files for chapter 18B - async/await

This commit is contained in:
Luciano Ramalho 2015-07-13 22:54:31 -03:00
parent 5e3d37936e
commit daa096e3d2
10 changed files with 662 additions and 0 deletions

View File

@ -0,0 +1,4 @@
Sample code for Chapter 18 - "Concurrency with asyncio"
From the book "Fluent Python" by Luciano Ramalho (O'Reilly, 2015)
http://shop.oreilly.com/product/0636920032519.do

View File

@ -0,0 +1,223 @@
#!/usr/bin/env python3
"""
Unicode character finder utility:
find characters based on words in their official names.
This can be used from the command line, just pass words as arguments.
Here is the ``main`` function which makes it happen::
>>> main('rook') # doctest: +NORMALIZE_WHITESPACE
U+2656 WHITE CHESS ROOK
U+265C BLACK CHESS ROOK
(2 matches for 'rook')
>>> main('rook', 'black') # doctest: +NORMALIZE_WHITESPACE
U+265C BLACK CHESS ROOK
(1 match for 'rook black')
>>> main('white bishop') # doctest: +NORMALIZE_WHITESPACE
U+2657 WHITE CHESS BISHOP
(1 match for 'white bishop')
>>> main("jabberwocky's vest")
(No match for "jabberwocky's vest")
For exploring words that occur in the character names, there is the
``word_report`` function::
>>> index = UnicodeNameIndex(sample_chars)
>>> index.word_report()
3 SIGN
2 A
2 EURO
2 LATIN
2 LETTER
1 CAPITAL
1 CURRENCY
1 DOLLAR
1 SMALL
>>> index = UnicodeNameIndex()
>>> index.word_report(10)
75821 CJK
75761 IDEOGRAPH
74656 UNIFIED
13196 SYLLABLE
11735 HANGUL
7616 LETTER
2232 WITH
2180 SIGN
2122 SMALL
1709 CAPITAL
Note: characters with names starting with 'CJK UNIFIED IDEOGRAPH'
are indexed with those three words only, excluding the hexadecimal
codepoint at the end of the name.
"""
import sys
import re
import unicodedata
import pickle
import warnings
import itertools
import functools
from collections import namedtuple
# Patterns are raw strings so that ``\w`` and ``\+`` reach the regex engine
# untouched instead of being parsed as (invalid) string escape sequences.
RE_WORD = re.compile(r'\w+')  # one run of word characters
RE_UNICODE_NAME = re.compile(r'^[A-Z0-9 -]+$')  # uppercase, digits, space, hyphen
RE_CODEPOINT = re.compile(r'U\+([0-9A-F]{4,6})')  # captures the hex digits

INDEX_NAME = 'charfinder_index.pickle'  # file used to cache the index
MINIMUM_SAVE_LEN = 10000  # only indexes with more words than this are saved
CJK_UNI_PREFIX = 'CJK UNIFIED IDEOGRAPH'
CJK_CMP_PREFIX = 'CJK COMPATIBILITY IDEOGRAPH'

# Small character set used by the doctests and the unit tests.
sample_chars = [
    '$',  # DOLLAR SIGN
    'A',  # LATIN CAPITAL LETTER A
    'a',  # LATIN SMALL LETTER A
    '\u20a0',  # EURO-CURRENCY SIGN
    '\u20ac',  # EURO SIGN
]

CharDescription = namedtuple('CharDescription', 'code_str char name')
QueryResult = namedtuple('QueryResult', 'count items')
def tokenize(text):
    """Yield every word found in ``text``, converted to uppercase."""
    for word in RE_WORD.findall(text):
        yield word.upper()
def query_type(text):
    """Classify ``text`` as ``'CODEPOINT'``, ``'NAME'`` or ``'CHARACTERS'``.

    The check is case-insensitive: text containing ``U+`` is a code point
    query; text made only of the characters accepted by ``RE_UNICODE_NAME``
    is a name query; anything else is treated as literal characters.
    """
    upper = text.upper()
    if 'U+' in upper:
        return 'CODEPOINT'
    if RE_UNICODE_NAME.match(upper):
        return 'NAME'
    return 'CHARACTERS'
class UnicodeNameIndex:
    """Inverted index from words in Unicode character names to characters.

    ``self.index`` is a dict whose keys are uppercased words and whose
    values are sets of the characters that have that word in their name.
    """

    def __init__(self, chars=None):
        """Create the index; see ``load`` for the meaning of ``chars``."""
        self.load(chars)

    def load(self, chars=None):
        """Populate ``self.index``.

        When ``chars`` is None, try to read the index cached in the
        ``INDEX_NAME`` pickle file. If that is not possible, or if ``chars``
        is given, build the index and save it when it holds more than
        ``MINIMUM_SAVE_LEN`` words. A failure to save only emits a warning.
        """
        self.index = None
        if chars is None:
            try:
                # NOTE(security): unpickling can execute arbitrary code, so
                # the cache file must only ever come from a trusted location.
                with open(INDEX_NAME, 'rb') as fp:
                    self.index = pickle.load(fp)
            except (OSError, EOFError, pickle.UnpicklingError):
                # Missing, unreadable, truncated or corrupt cache: rebuild.
                self.index = None
        if self.index is None:
            self.build_index(chars)
            # Only a freshly built index needs to be written out.
            if len(self.index) > MINIMUM_SAVE_LEN:
                try:
                    self.save()
                except OSError as exc:
                    warnings.warn('Could not save {!r}: {}'
                                  .format(INDEX_NAME, exc))

    def save(self):
        """Pickle ``self.index`` to the ``INDEX_NAME`` file."""
        with open(INDEX_NAME, 'wb') as fp:
            pickle.dump(self.index, fp)

    def build_index(self, chars=None):
        """Build ``self.index`` from ``chars`` (default: all code points >= 32).

        Characters without a name are skipped. Names starting with one of
        the CJK prefixes are indexed by the words of the prefix only.
        """
        if chars is None:
            # ``+ 1`` so that the last code point is visited as well.
            chars = (chr(i) for i in range(32, sys.maxunicode + 1))
        index = {}
        for char in chars:
            try:
                name = unicodedata.name(char)
            except ValueError:  # this character has no name
                continue
            if name.startswith(CJK_UNI_PREFIX):
                name = CJK_UNI_PREFIX
            elif name.startswith(CJK_CMP_PREFIX):
                name = CJK_CMP_PREFIX
            for word in tokenize(name):
                index.setdefault(word, set()).add(char)
        self.index = index

    def word_rank(self, top=None):
        """Return ``(count, word)`` pairs, most frequent words first.

        Ties are ordered alphabetically. When ``top`` is given only the
        first ``top`` pairs are returned.
        """
        res = sorted(((len(chars), word)
                      for word, chars in self.index.items()),
                     key=lambda item: (-item[0], item[1]))
        return res if top is None else res[:top]

    def word_report(self, top=None):
        """Print the ranking produced by ``word_rank``, one word per line."""
        for postings, key in self.word_rank(top):
            print('{:5} {}'.format(postings, key))

    def find_chars(self, query, start=0, stop=None):
        """Find the characters whose names contain every word in ``query``.

        Returns a ``QueryResult``: ``count`` is the total number of matches
        and ``items`` iterates over the sorted matches from position
        ``start`` up to (not including) ``stop``.
        """
        stop = sys.maxsize if stop is None else stop
        result_sets = []
        for word in tokenize(query):
            chars = self.index.get(word)
            if chars is None:  # shortcut: no such word
                return QueryResult(0, ())
            result_sets.append(chars)
        if not result_sets:  # the query had no words at all
            return QueryResult(0, ())
        # must sort to support start, stop
        result = sorted(set.intersection(*result_sets))
        return QueryResult(len(result),
                           itertools.islice(result, start, stop))

    def describe(self, char):
        """Return a ``CharDescription`` (code point string, char, name)."""
        code_str = 'U+{:04X}'.format(ord(char))
        name = unicodedata.name(char)
        return CharDescription(code_str, char, name)

    def find_descriptions(self, query, start=0, stop=None):
        """Yield a ``CharDescription`` for each match of ``query``."""
        for char in self.find_chars(query, start, stop).items:
            yield self.describe(char)

    def get_descriptions(self, chars):
        """Yield a ``CharDescription`` for each character in ``chars``."""
        for char in chars:
            yield self.describe(char)

    def describe_str(self, char):
        """Return the description of ``char`` as one tab-separated line."""
        return '{:7}\t{}\t{}'.format(*self.describe(char))

    def find_description_strs(self, query, start=0, stop=None):
        """Yield one tab-separated description line per match of ``query``."""
        for char in self.find_chars(query, start, stop).items:
            yield self.describe_str(char)

    @staticmethod  # not an instance method due to concurrency
    def status(query, counter):
        """Return a summary such as ``"2 matches for 'rook'"``."""
        if counter == 0:
            msg = 'No match'
        elif counter == 1:
            msg = '1 match'
        else:
            msg = '{} matches'.format(counter)
        return '{} for {!r}'.format(msg, query)
def main(*args):
    """Print every character matching the words in ``args``, then a summary."""
    finder = UnicodeNameIndex()
    query = ' '.join(args)
    matches = 0
    for line in finder.find_description_strs(query):
        print(line)
        matches += 1
    print('({})'.format(finder.status(query, matches)))


if __name__ == '__main__':
    words = sys.argv[1:]
    if words:
        main(*words)
    else:
        print('Usage: {} word1 [word2]...'.format(sys.argv[0]))

View File

@ -0,0 +1,19 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Charfinder</title>
</head>
<body>
Examples: {links}
<form action="/">
<p>
<input type="search" name="query" value="{query}">
<input type="submit" value="find"> {message}
</p>
</form>
<table>
{result}
</table>
</body>
</html>

View File

@ -0,0 +1,72 @@
#!/usr/bin/env python3
import sys
import asyncio
from aiohttp import web
from charfinder import UnicodeNameIndex
TEMPLATE_NAME = 'http_charfinder.html'
CONTENT_TYPE = 'text/html; charset=UTF-8'

# Words offered as ready-made example queries at the top of the page.
SAMPLE_WORDS = ('bismillah chess cat circled Malayalam digit'
                ' Roman face Ethiopic black mark symbol dot'
                ' operator Braille hexagram').split()

ROW_TPL = '<tr><td>{code_str}</td><th>{char}</th><td>{name}</td></tr>'
LINK_TPL = '<a href="/?query={0}" title="find &quot;{0}&quot;">{0}</a>'
LINKS_HTML = ', '.join(LINK_TPL.format(word) for word in
                       sorted(SAMPLE_WORDS, key=str.upper))

index = UnicodeNameIndex()

# The page is served as UTF-8 (see CONTENT_TYPE), so decode the template
# explicitly instead of relying on the platform's default encoding.
with open(TEMPLATE_NAME, encoding='utf-8') as tpl:
    template = tpl.read()
# The example links never change, so fill that placeholder once, up front.
template = template.replace('{links}', LINKS_HTML)
# BEGIN HTTP_CHARFINDER_HOME
def home(request):  # <1>
    """Handle ``GET /``: run the query, if any, and return the HTML page.

    The ``query`` parameter is read from the request. When it is not empty,
    one table row is rendered per matching character and the status message
    reports the number of matches; otherwise a hint is shown. Text that
    originates from the request or from the matched characters is
    HTML-escaped before being inserted into the page.
    """
    # Function-scope import: ``html`` is not imported at the top of the file.
    from html import escape

    query = request.GET.get('query', '').strip()  # <2>
    print('Query: {!r}'.format(query))  # <3>
    if query:  # <4>
        descriptions = list(index.find_descriptions(query))
        # Fields are passed by name instead of ``**vars(descr)``: ``vars``
        # needs an instance ``__dict__``, which namedtuples do not provide
        # on current Python versions. The character itself is escaped
        # because it may be ``<``, ``>`` or ``&``.
        res = '\n'.join(ROW_TPL.format(code_str=descr.code_str,
                                       char=escape(descr.char),
                                       name=descr.name)
                        for descr in descriptions)
        msg = index.status(query, len(descriptions))
    else:
        descriptions = []
        res = ''
        msg = 'Enter words describing characters.'

    # ``query`` lands inside an attribute value and ``msg`` echoes the
    # query, so both are escaped (quotes included) to prevent HTML injection.
    page = template.format(query=escape(query), result=res,  # <5>
                           message=escape(msg))
    print('Sending {} results'.format(len(descriptions)))  # <6>
    return web.Response(content_type=CONTENT_TYPE, text=page)  # <7>
# END HTTP_CHARFINDER_HOME
# BEGIN HTTP_CHARFINDER_SETUP
@asyncio.coroutine
def init(loop, address, port):  # <1>
    """Start the HTTP server on ``loop`` and return the bound socket address.

    Builds the aiohttp application, routes ``GET /`` to ``home`` and asks
    the loop to create a server listening on ``address``:``port``.
    """
    application = web.Application(loop=loop)  # <2>
    application.router.add_route('GET', '/', home)  # <3>
    request_handler = application.make_handler()  # <4>
    listening = yield from loop.create_server(request_handler,
                                              address, port)  # <5>
    first_socket = listening.sockets[0]
    return first_socket.getsockname()  # <6>
def main(address="127.0.0.1", port=8888):
    """Serve the character finder over HTTP until CTRL-C is pressed.

    ``port`` may be given as a string (as it is when it comes from the
    command line); it is converted with ``int``.
    """
    port_number = int(port)
    event_loop = asyncio.get_event_loop()
    startup = init(event_loop, address, port_number)
    host = event_loop.run_until_complete(startup)  # <7>
    print('Serving on {}. Hit CTRL-C to stop.'.format(host))
    try:
        event_loop.run_forever()  # <8>
    except KeyboardInterrupt:  # CTRL+C pressed
        pass
    print('Server shutting down.')
    event_loop.close()  # <9>


if __name__ == '__main__':
    cli_args = sys.argv[1:]
    main(*cli_args)
# END HTTP_CHARFINDER_SETUP

View File

@ -0,0 +1,64 @@
#!/usr/bin/env python3
# BEGIN TCP_CHARFINDER_TOP
import sys
import asyncio
from charfinder import UnicodeNameIndex # <1>
# Line terminator appended to every line sent to a client.
CRLF = b'\r\n'
# Prompt written to the client before each query is read.
PROMPT = b'?> '
# Created once, when the module is imported, and used by handle_queries.
index = UnicodeNameIndex() # <2>
@asyncio.coroutine
def handle_queries(reader, writer):  # <3>
    """Serve one client: prompt, read a query, send the matches, repeat.

    The session ends when the client closes the connection (``readline``
    returns empty bytes) or sends a line that starts with a control
    character. A line that cannot be decoded is treated as a control
    character. The writer is closed however the session ends, including
    when an exception is raised.
    """
    try:
        while True:  # <4>
            writer.write(PROMPT)  # can't yield from!  # <5>
            yield from writer.drain()  # must yield from!  # <6>
            data = yield from reader.readline()  # <7>
            if not data:
                # EOF: the client is gone. Without this check the loop
                # would keep prompting a connection that is already closed.
                break
            try:
                query = data.decode().strip()
            except UnicodeDecodeError:  # <8>
                query = '\x00'
            client = writer.get_extra_info('peername')  # <9>
            print('Received from {}: {!r}'.format(client, query))  # <10>
            if query:
                if ord(query[:1]) < 32:  # <11>
                    break
                lines = list(index.find_description_strs(query))  # <12>
                if lines:
                    writer.writelines(line.encode() + CRLF
                                      for line in lines)  # <13>
                writer.write(index.status(query, len(lines)).encode()
                             + CRLF)  # <14>
                yield from writer.drain()  # <15>
                print('Sent {} results'.format(len(lines)))  # <16>
    finally:
        print('Close the client socket')  # <17>
        writer.close()  # <18>
# END TCP_CHARFINDER_TOP
# BEGIN TCP_CHARFINDER_MAIN
def main(address='127.0.0.1', port=2323):  # <1>
    """Serve the character finder over TCP until CTRL-C is pressed.

    ``port`` may be given as a string (as it is when it comes from the
    command line); it is converted with ``int``.
    """
    port_number = int(port)
    event_loop = asyncio.get_event_loop()
    starting = asyncio.start_server(handle_queries, address, port_number,
                                    loop=event_loop)  # <2>
    server = event_loop.run_until_complete(starting)  # <3>
    listening_on = server.sockets[0].getsockname()  # <4>
    print('Serving on {}. Hit CTRL-C to stop.'.format(listening_on))  # <5>
    try:
        event_loop.run_forever()  # <6>
    except KeyboardInterrupt:  # CTRL+C pressed
        pass
    print('Server shutting down.')
    server.close()  # <7>
    event_loop.run_until_complete(server.wait_closed())  # <8>
    event_loop.close()  # <9>


if __name__ == '__main__':
    cli_args = sys.argv[1:]
    main(*cli_args)  # <10>
# END TCP_CHARFINDER_MAIN

View File

@ -0,0 +1,115 @@
import pytest
from charfinder import UnicodeNameIndex, tokenize, sample_chars, query_type
from unicodedata import name
@pytest.fixture
def sample_index():
    """Provide an index built from ``sample_chars`` only."""
    index = UnicodeNameIndex(sample_chars)
    return index
@pytest.fixture(scope="module")
def full_index():
    """Provide the default index, created once for the whole test module."""
    index = UnicodeNameIndex()
    return index
def test_query_type():
    """A plain lowercase word is classified as a name query."""
    kind = query_type('blue')
    assert kind == 'NAME'
def test_tokenize():
    """Words are split on non-word characters and uppercased."""
    cases = [
        ('', []),
        ('a b', ['A', 'B']),
        ('a-b', ['A', 'B']),
        ('abc', ['ABC']),
        ('café', ['CAFÉ']),
    ]
    for text, expected in cases:
        assert list(tokenize(text)) == expected
def test_index():
    """The sample characters produce nine distinct index words."""
    words = UnicodeNameIndex(sample_chars).index
    assert len(words) == 9
def test_find_word_no_match(sample_index):
    """An unknown word yields no items."""
    found = sample_index.find_chars('qwertyuiop').items
    assert len(found) == 0
def test_find_word_1_match(sample_index):
    """A word occurring in a single name yields that one character."""
    matches = sample_index.find_chars('currency').items
    found = [(ord(char), name(char)) for char in matches]
    assert found == [(0x20A0, 'EURO-CURRENCY SIGN')]
def test_find_word_1_match_character_result(sample_index):
    """The items returned are the characters themselves."""
    matches = sample_index.find_chars('currency').items
    names = list(map(name, matches))
    assert names == ['EURO-CURRENCY SIGN']
def test_find_word_2_matches(sample_index):
    """Matching is case-insensitive and results come in code point order."""
    matches = sample_index.find_chars('Euro').items
    found = [(ord(char), name(char)) for char in matches]
    expected = [
        (0x20A0, 'EURO-CURRENCY SIGN'),
        (0x20AC, 'EURO SIGN'),
    ]
    assert found == expected
def test_find_2_words_no_matches(sample_index):
    """Two known words that never share a name give zero matches."""
    assert sample_index.find_chars('Euro letter').count == 0
def test_find_2_words_no_matches_because_one_not_found(sample_index):
    """One unknown word is enough to make the whole query fail."""
    assert sample_index.find_chars('letter qwertyuiop').count == 0
def test_find_2_words_1_match(sample_index):
    """Word order in the query does not matter."""
    assert sample_index.find_chars('sign dollar').count == 1
def test_find_2_words_2_matches(sample_index):
    """Both sample letters match 'latin letter'."""
    assert sample_index.find_chars('latin letter').count == 2
def test_find_chars_many_matches_full(full_index):
    """The full index has thousands of characters named '... LETTER ...'."""
    total = full_index.find_chars('letter').count
    assert total > 7000
def test_find_1_word_1_match_full(full_index):
    """'registered' matches exactly one character in the full index."""
    matches = full_index.find_chars('registered').items
    found = [(ord(char), name(char)) for char in matches]
    assert found == [(0xAE, 'REGISTERED SIGN')]
def test_find_1_word_2_matches_full(full_index):
    """There are two chess rooks in the full index."""
    assert full_index.find_chars('rook').count == 2
def test_find_3_words_no_matches_full(full_index):
    """No character name contains all three of these words."""
    assert full_index.find_chars('no such character').count == 0
def test_find_with_start(sample_index):
    """``start`` skips that many leading results."""
    matches = sample_index.find_chars('sign', 1).items
    found = [(ord(char), name(char)) for char in matches]
    expected = [
        (0x20A0, 'EURO-CURRENCY SIGN'),
        (0x20AC, 'EURO SIGN'),
    ]
    assert found == expected
def test_find_with_stop(sample_index):
    """``stop`` is the exclusive end position of the results."""
    matches = sample_index.find_chars('sign', 0, 2).items
    found = [(ord(char), name(char)) for char in matches]
    expected = [
        (0x24, 'DOLLAR SIGN'),
        (0x20A0, 'EURO-CURRENCY SIGN'),
    ]
    assert found == expected
def test_find_with_start_stop(sample_index):
    """``start`` and ``stop`` together select a slice of the results."""
    matches = sample_index.find_chars('sign', 1, 2).items
    found = [(ord(char), name(char)) for char in matches]
    assert found == [(0x20A0, 'EURO-CURRENCY SIGN')]

View File

@ -0,0 +1,3 @@
The ``asyncio`` flag download examples are in the
``../../17-futures/countries/`` directory together
with the sequential and threadpool examples.

View File

@ -0,0 +1,53 @@
# spinner_asyncio.py
# credits: Example by Luciano Ramalho inspired by
# Michele Simionato's multiprocessing example in the python-list:
# https://mail.python.org/pipermail/python-list/2009-February/538048.html
# BEGIN SPINNER_ASYNCIO
import asyncio
import itertools
import sys
@asyncio.coroutine  # <1>
def spin(msg):  # <2>
    """Animate a text spinner followed by ``msg`` until cancelled.

    Each frame is written to standard output and the cursor is moved back
    with backspaces so that the next frame overwrites it. On cancellation
    the status line is blanked out.
    """
    out = sys.stdout
    frames = itertools.cycle('|/-\\')
    while True:
        status = next(frames) + ' ' + msg
        out.write(status)
        out.flush()
        out.write('\x08' * len(status))  # move the cursor back to the start
        try:
            yield from asyncio.sleep(.1)  # <3>
        except asyncio.CancelledError:  # <4>
            break
    # overwrite the last frame with spaces, then move the cursor back again
    out.write(' ' * len(status) + '\x08' * len(status))
@asyncio.coroutine
def slow_function():  # <5>
    """Stand in for a slow I/O operation: wait three seconds, return 42."""
    answer = 42
    # pretend waiting a long time for I/O
    yield from asyncio.sleep(3)  # <6>
    return answer
@asyncio.coroutine
def supervisor():  # <7>
    """Show the spinner while ``slow_function`` runs; return its result.

    The spinner task is cancelled when ``slow_function`` finishes, whether
    it returns normally or raises.
    """
    # ``asyncio.async`` was renamed ``ensure_future``, and the old spelling
    # is a syntax error once ``async`` is a keyword. Looking both up by name
    # keeps this module importable on old and new interpreters alike.
    schedule = (getattr(asyncio, 'ensure_future', None)
                or getattr(asyncio, 'async'))
    spinner = schedule(spin('thinking!'))  # <8>
    print('spinner object:', spinner)  # <9>
    try:
        result = yield from slow_function()  # <10>
    finally:
        spinner.cancel()  # <11>
    return result
def main():
    """Run ``supervisor`` to completion on the event loop; print its answer."""
    event_loop = asyncio.get_event_loop()  # <12>
    answer = event_loop.run_until_complete(supervisor())  # <13>
    event_loop.close()
    print('Answer:', answer)


if __name__ == '__main__':
    main()
# END SPINNER_ASYNCIO

View File

@ -0,0 +1,53 @@
# spinner_asyncio.py
# credits: Example by Luciano Ramalho inspired by
# Michele Simionato's multiprocessing example in the python-list:
# https://mail.python.org/pipermail/python-list/2009-February/538048.html
# BEGIN SPINNER_ASYNCIO
import asyncio
import itertools
import sys
@asyncio.coroutine  # <1>
def spin(msg):  # <2>
    """Animate a text spinner followed by ``msg`` until cancelled.

    Each frame is written to standard output and the cursor is moved back
    with backspaces so that the next frame overwrites it. On cancellation
    the status line is blanked out.
    """
    out = sys.stdout
    frames = itertools.cycle('|/-\\')
    while True:
        status = next(frames) + ' ' + msg
        out.write(status)
        out.flush()
        out.write('\x08' * len(status))  # move the cursor back to the start
        try:
            yield from asyncio.sleep(.1)  # <3>
        except asyncio.CancelledError:  # <4>
            break
    # overwrite the last frame with spaces, then move the cursor back again
    out.write(' ' * len(status) + '\x08' * len(status))
@asyncio.coroutine
def slow_function():  # <5>
    """Stand in for a slow I/O operation: wait three seconds, return 42."""
    answer = 42
    # pretend waiting a long time for I/O
    yield from asyncio.sleep(3)  # <6>
    return answer
@asyncio.coroutine
def supervisor():  # <7>
    """Show the spinner while ``slow_function`` runs; return its result.

    The spinner task is cancelled when ``slow_function`` finishes, whether
    it returns normally or raises.
    """
    # ``asyncio.async`` was renamed ``ensure_future``, and the old spelling
    # is a syntax error once ``async`` is a keyword. Looking both up by name
    # keeps this module importable on old and new interpreters alike.
    schedule = (getattr(asyncio, 'ensure_future', None)
                or getattr(asyncio, 'async'))
    spinner = schedule(spin('thinking!'))  # <8>
    print('spinner object:', spinner)  # <9>
    try:
        result = yield from slow_function()  # <10>
    finally:
        spinner.cancel()  # <11>
    return result
def main():
    """Run ``supervisor`` to completion on the event loop; print its answer."""
    event_loop = asyncio.get_event_loop()  # <12>
    answer = event_loop.run_until_complete(supervisor())  # <13>
    event_loop.close()
    print('Answer:', answer)


if __name__ == '__main__':
    main()
# END SPINNER_ASYNCIO

View File

@ -0,0 +1,56 @@
# spinner_thread.py
# credits: Adapted from Michele Simionato's
# multiprocessing example in the python-list:
# https://mail.python.org/pipermail/python-list/2009-February/538048.html
# BEGIN SPINNER_THREAD
import threading
import itertools
import time
import sys
class Signal: # <1>
    """Flag object handed to ``spin``; clearing ``go`` asks it to stop."""
    # True while the spinner should keep running.
    go = True
def spin(msg, signal):  # <2>
    """Animate a text spinner followed by ``msg`` until ``signal.go`` is false.

    Each frame is written to standard output and the cursor is moved back
    with backspaces so that the next frame overwrites it. When the loop
    ends the status line is blanked out.
    """
    out = sys.stdout
    frames = itertools.cycle('|/-\\')  # <3>
    while True:
        status = next(frames) + ' ' + msg
        out.write(status)
        out.flush()
        out.write('\x08' * len(status))  # <4>
        time.sleep(.1)
        if not signal.go:  # <5>
            break
    out.write(' ' * len(status) + '\x08' * len(status))  # <6>
def slow_function(delay=3):  # <7>
    """Stand in for a slow, blocking I/O operation.

    Sleeps for ``delay`` seconds (three by default) and then returns 42.
    """
    # pretend waiting a long time for I/O
    time.sleep(delay)  # <8>
    return 42
def supervisor():  # <9>
    """Show the spinner in a thread while ``slow_function`` runs.

    Returns the result of ``slow_function``. The spinner thread is told to
    stop and is joined even when ``slow_function`` raises (for example on
    CTRL-C); otherwise it would keep spinning and keep the process alive.
    """
    signal = Signal()
    spinner = threading.Thread(target=spin,
                               args=('thinking!', signal))
    print('spinner object:', spinner)  # <10>
    spinner.start()  # <11>
    try:
        result = slow_function()  # <12>
    finally:
        signal.go = False  # <13>
        spinner.join()  # <14>
    return result
def main():
    """Run ``supervisor`` and print the answer it returns."""
    answer = supervisor()  # <15>
    print('Answer:', answer)


if __name__ == '__main__':
    main()
# END SPINNER_THREAD