4.6 KiB
4.6 KiB
Exercise 8.6 - Solution
(a) Receiving Messages
# coticker.py
from structure import Structure
from validate import String, Integer, Float
class Ticker(Structure):
    """One record of stock-ticker data with a validator attached to each field.

    ``create_ticker`` builds instances with ``Ticker.from_row(row)``; the
    declaration order below is presumably the column order ``from_row``
    expects -- confirm against ``Structure`` and the data file.
    """
    name = String()
    price = Float()
    date = String()
    time = String()
    change = Float()
    open = Float()
    high = Float()
    low = Float()
    volume = Integer()
from cofollow import consumer, follow, receive
from tableformat import create_formatter
import csv
@consumer
def to_csv(target):
    """Parse each received line of CSV text and send the parsed row to ``target``.

    ``csv.reader`` pulls its input from an iterable, whereas this coroutine
    has lines pushed into it.  The inner ``producer`` generator bridges the
    two: it yields whatever ``line`` currently holds, so asking the reader
    for its next row parses the line that was just received.

    Args:
        target: Downstream consumer; receives one parsed row per input line.
    """
    def producer():
        while True:
            # Closure over ``line``, which the receive loop below rebinds
            # before every ``next(reader)`` call.
            yield line

    reader = csv.reader(producer())
    while True:
        line = yield from receive(str)
        target.send(next(reader))
@consumer
def create_ticker(target):
    """Convert each received row (a list) into a ``Ticker`` and send it on.

    Args:
        target: Downstream consumer; receives one ``Ticker`` per row.
    """
    while True:
        row = yield from receive(list)
        target.send(Ticker.from_row(row))
@consumer
def negchange(target):
    """Forward only the ``Ticker`` records whose ``change`` is negative.

    Args:
        target: Downstream consumer; receives the records that pass the filter.
    """
    while True:
        record = yield from receive(Ticker)
        if record.change < 0:
            target.send(record)
@consumer
def ticker(fmt, fields):
    """Print received ``Ticker`` records as rows of a formatted table.

    Args:
        fmt: Format name handed to ``create_formatter`` (the script below
            passes ``'text'``).
        fields: Attribute names to show, in column order.
    """
    # Honour the caller's format instead of a hard-coded 'text'.
    formatter = create_formatter(fmt)
    formatter.headings(fields)
    while True:
        rec = yield from receive(Ticker)
        row = [getattr(rec, name) for name in fields]
        formatter.row(row)
if __name__ == '__main__':
    # Pipeline: follow -> to_csv -> create_ticker -> negchange -> ticker.
    # Each stage is handed the stage it sends its output to.
    follow('Data/stocklog.csv',
           to_csv(
               create_ticker(
                   negchange(
                       ticker('text', ['name', 'price', 'change'])))))
(b) Wrapping a Socket
# server.py
...
class GenSocket:
    """Socket wrapper whose blocking operations are generators.

    ``accept``, ``recv`` and ``send`` first yield a ``(reason, socket)``
    pair and only perform the real socket call once they are resumed.
    They are meant to be driven with ``yield from`` by a scheduler that
    resumes the task when the socket is ready.  Every other attribute is
    delegated to the wrapped socket.
    """

    def __init__(self, sock):
        self.sock = sock

    def accept(self):
        """Wait for readability, then return ``(GenSocket(client), addr)``."""
        yield 'recv', self.sock
        client, addr = self.sock.accept()
        return GenSocket(client), addr

    def recv(self, maxsize):
        """Wait for readability, then return the result of ``sock.recv(maxsize)``."""
        yield 'recv', self.sock
        return self.sock.recv(maxsize)

    def send(self, data):
        """Wait for writability, then return the result of ``sock.send(data)``."""
        yield 'send', self.sock
        return self.sock.send(data)

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup fails.  If ``sock`` itself
        # is missing (instance built without __init__), looking it up here
        # would re-enter __getattr__ forever, so fail cleanly instead.
        if name == 'sock':
            raise AttributeError(name)
        return getattr(self.sock, name)
def tcp_server(address, handler):
    """Generator task: accept connections forever, one handler task per client.

    Args:
        address: ``(host, port)`` pair to bind to.
        handler: Called as ``handler(client, addr)``; the object it returns
            is appended to the scheduler's ``tasks`` queue.
    """
    sock = GenSocket(socket(AF_INET, SOCK_STREAM))
    # setsockopt/bind/listen are not defined on GenSocket; they reach the
    # real socket through GenSocket.__getattr__.
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        client, addr = yield from sock.accept()
        tasks.append(handler(client, addr))
def echo_handler(client, address):
    """Generator task: echo each received chunk back, prefixed with ``b'GOT:'``.

    Args:
        client: Connection object whose ``recv``/``send`` are generators
            (as on ``GenSocket``) and which has a ``close`` method.
        address: Peer address; only used in the log message.
    """
    print('Connection from', address)
    try:
        while True:
            data = yield from client.recv(1000)
            if not data:
                # An empty read ends the session (for a socket, the peer
                # has closed its end).
                break
            yield from client.send(b'GOT:' + data)
    finally:
        # This task is the only owner of the connection, so release it
        # here, including when the task dies with an exception.
        client.close()
    print('Connection closed')
if __name__ == '__main__':
    # Queue the listening server as the first task, then start the scheduler.
    tasks.append(tcp_server(('', 25000), echo_handler))
    run()
(c) Async/Await
# server.py
from socket import *
from select import select
from collections import deque
from types import coroutine
tasks = deque()    # tasks that are ready to be resumed
recv_wait = {}     # sock -> task waiting for the socket to become readable
send_wait = {}     # sock -> task waiting for the socket to become writable


def run():
    """Run tasks until none are ready and none are waiting on a socket.

    Each task is resumed with ``send(None)`` and must yield a
    ``(reason, resource)`` pair: ``'recv'`` parks it in ``recv_wait`` and
    ``'send'`` parks it in ``send_wait``, keyed by ``resource``.  A task
    that finishes (``StopIteration``) is dropped.

    Raises:
        RuntimeError: If a task yields a reason other than 'recv' or 'send'.
    """
    while any([tasks, recv_wait, send_wait]):
        while not tasks:
            # Nothing is runnable: block in select() until a parked socket
            # is ready, then move its task back onto the ready queue.
            can_recv, can_send, _ = select(recv_wait, send_wait, [])
            for s in can_recv:
                tasks.append(recv_wait.pop(s))
            for s in can_send:
                tasks.append(send_wait.pop(s))
        task = tasks.popleft()
        try:
            reason, resource = task.send(None)
            if reason == 'recv':
                recv_wait[resource] = task
            elif reason == 'send':
                send_wait[resource] = task
            else:
                raise RuntimeError('Unknown reason %r' % reason)
        except StopIteration:
            print('Task done')
class GenSocket:
    """Socket wrapper whose blocking operations can be awaited.

    ``accept``, ``recv`` and ``send`` are generators wrapped with
    ``types.coroutine`` so that ``async def`` code can ``await`` them.
    Each first yields a ``(reason, socket)`` pair and only performs the
    real socket call once it is resumed.  Every other attribute is
    delegated to the wrapped socket.
    """

    def __init__(self, sock):
        self.sock = sock

    @coroutine
    def accept(self):
        """Wait for readability, then return ``(GenSocket(client), addr)``."""
        yield 'recv', self.sock
        client, addr = self.sock.accept()
        return GenSocket(client), addr

    @coroutine
    def recv(self, maxsize):
        """Wait for readability, then return the result of ``sock.recv(maxsize)``."""
        yield 'recv', self.sock
        return self.sock.recv(maxsize)

    @coroutine
    def send(self, data):
        """Wait for writability, then return the result of ``sock.send(data)``."""
        yield 'send', self.sock
        return self.sock.send(data)

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup fails.  If ``sock`` itself
        # is missing (instance built without __init__), looking it up here
        # would re-enter __getattr__ forever, so fail cleanly instead.
        if name == 'sock':
            raise AttributeError(name)
        return getattr(self.sock, name)
async def tcp_server(address, handler):
    """Coroutine task: accept connections forever, one handler task per client.

    Args:
        address: ``(host, port)`` pair to bind to.
        handler: Called as ``handler(client, addr)``; the object it returns
            is appended to the scheduler's ``tasks`` queue.
    """
    sock = GenSocket(socket(AF_INET, SOCK_STREAM))
    # setsockopt/bind/listen are not defined on GenSocket; they reach the
    # real socket through GenSocket.__getattr__.
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        client, addr = await sock.accept()
        tasks.append(handler(client, addr))
async def echo_handler(client, address):
    """Coroutine task: echo each received chunk back, prefixed with ``b'GOT:'``.

    Args:
        client: Connection object whose ``recv``/``send`` are awaitable
            (as on ``GenSocket``) and which has a ``close`` method.
        address: Peer address; only used in the log message.
    """
    print('Connection from', address)
    try:
        while True:
            data = await client.recv(1000)
            if not data:
                # An empty read ends the session (for a socket, the peer
                # has closed its end).
                break
            await client.send(b'GOT:' + data)
    finally:
        # This task is the only owner of the connection, so release it
        # here, including when the task dies with an exception.
        client.close()
    print('Connection closed')
if __name__ == '__main__':
    # Queue the listening server as the first task, then start the scheduler.
    tasks.append(tcp_server(('', 25000), echo_handler))
    run()