dynamic attributes, descriptors and first concurrency examples

This commit is contained in:
Luciano Ramalho
2015-01-17 22:40:40 -02:00
parent 0618105a47
commit dd1a53ff71
27 changed files with 1151 additions and 216 deletions

201
concurrency/charfinder.py Executable file
View File

@@ -0,0 +1,201 @@
#!/usr/bin/env python3
"""
Unicode character finder utility:
find characters based on words in their official names.
This can be used from the command line, just pass words as arguments.
Here is the ``main`` function which makes it happen::
>>> main('rook') # doctest: +NORMALIZE_WHITESPACE
U+2656 ♖ WHITE CHESS ROOK
U+265C ♜ BLACK CHESS ROOK
(2 matches for 'rook')
>>> main('rook', 'black') # doctest: +NORMALIZE_WHITESPACE
U+265C ♜ BLACK CHESS ROOK
(1 match for 'rook black')
>>> main('white bishop') # doctest: +NORMALIZE_WHITESPACE
U+2657 ♗ WHITE CHESS BISHOP
(1 match for 'white bishop')
>>> main("jabberwocky's vest")
(No match for "jabberwocky's vest")
For exploring words that occur in the character names, there is the
``word_report`` function::
>>> index = UnicodeNameIndex(sample_chars)
>>> index.word_report()
3 SIGN
2 A
2 EURO
2 LATIN
2 LETTER
1 CAPITAL
1 CURRENCY
1 DOLLAR
1 SMALL
>>> index = UnicodeNameIndex()
>>> index.word_report(7)
13196 SYLLABLE
11735 HANGUL
7616 LETTER
2232 WITH
2180 SIGN
2122 SMALL
1709 CAPITAL
Note: for characters whose names start with the string
``'CJK UNIFIED IDEOGRAPH'``, only the words of that prefix are indexed.
The rest of such a name is not useful for searching, since it is just
the codepoint in hexadecimal.
"""
import sys
import re
import unicodedata
import pickle
import warnings
# Matches one word: a maximal run of "word" characters.  Written as a raw
# string so ``\w`` reaches the regex engine instead of relying on Python
# passing an unrecognized string escape through unchanged.
RE_WORD = re.compile(r'\w+')

# File in which the pickled index is cached (relative to the current
# working directory).
INDEX_NAME = 'charfinder_index.pickle'

# An index is written to INDEX_NAME only when it holds more words than this.
MINIMUM_SAVE_LEN = 10000

# Names starting with this prefix are indexed under the prefix alone.
CJK_PREFIX = 'CJK UNIFIED IDEOGRAPH'

# Small character set used to build a sample index in doctests and tests.
sample_chars = [
    '$',  # DOLLAR SIGN
    'A',  # LATIN CAPITAL LETTER A
    'a',  # LATIN SMALL LETTER A
    '\u20a0',  # EURO-CURRENCY SIGN
    '\u20ac',  # EURO SIGN
]
def tokenize(text):
    """Yield every word that ``RE_WORD`` finds in ``text``, in upper case."""
    yield from (found.group().upper() for found in RE_WORD.finditer(text))
class UnicodeNameIndex:
    """Inverted index from words in Unicode character names to code points.

    ``self.index`` is a dict whose keys are uppercased words and whose
    values are sets with the integer code points of the characters that
    have that word in their name.  Names starting with ``CJK_PREFIX`` are
    indexed under the words of the prefix only.
    """

    def __init__(self, chars=None):
        """Load or build the index; ``chars`` is passed on to ``load``."""
        self.load(chars)

    def load(self, chars=None):
        """Populate ``self.index``.

        When ``chars`` is None the pickled index in ``INDEX_NAME`` is used
        if it can be read.  Otherwise (or if the cache is missing or
        unreadable) the index is built with ``build_index(chars)``.  A
        freshly built index with more than ``MINIMUM_SAVE_LEN`` words is
        saved to ``INDEX_NAME``; a failure to save only emits a warning.
        """
        self.index = None
        if chars is None:
            try:
                with open(INDEX_NAME, 'rb') as fp:
                    # NOTE(security): unpickling can execute arbitrary code;
                    # the cache file must come from a trusted location.
                    self.index = pickle.load(fp)
            except (OSError, EOFError, pickle.UnpicklingError):
                # Missing, truncated or corrupt cache: rebuild below.
                self.index = None
        if self.index is not None:
            return  # just loaded from the cache, no need to rewrite it
        self.build_index(chars)
        if len(self.index) > MINIMUM_SAVE_LEN:
            try:
                self.save()
            except OSError as exc:
                warnings.warn('Could not save {!r}: {}'
                              .format(INDEX_NAME, exc))

    def save(self):
        """Pickle ``self.index`` into the file ``INDEX_NAME``."""
        with open(INDEX_NAME, 'wb') as fp:
            pickle.dump(self.index, fp)

    def build_index(self, chars=None):
        """Build ``self.index`` from ``chars``.

        ``chars`` is an iterable of one-character strings; when None, every
        code point from 32 up to and including ``sys.maxunicode`` is used.
        Characters without a name are skipped.
        """
        if chars is None:
            # +1 because range() excludes its upper bound.
            chars = (chr(i) for i in range(32, sys.maxunicode + 1))
        index = {}
        for char in chars:
            # A default avoids raising/catching ValueError for every
            # unnamed code point in this hot loop.
            name = unicodedata.name(char, None)
            if name is None:
                continue
            if name.startswith(CJK_PREFIX):
                name = CJK_PREFIX
            code = ord(char)
            for word in tokenize(name):
                index.setdefault(word, set()).add(code)
        self.index = index

    def __len__(self):
        """Return the number of distinct words in the index."""
        return len(self.index)

    def word_rank(self, top=None):
        """Return ``(count, word)`` pairs, most frequent words first.

        Ties are broken alphabetically.  When ``top`` is not None only the
        first ``top`` pairs are returned.
        """
        res = [(len(codes), word) for word, codes in self.index.items()]
        res.sort(key=lambda item: (-item[0], item[1]))
        return res if top is None else res[:top]

    def word_report(self, top=None):
        """
        Generate report with most frequent words

            >>> index = UnicodeNameIndex()
            >>> index.word_report(7)
            13196 SYLLABLE
            11735 HANGUL
             7616 LETTER
             2232 WITH
             2180 SIGN
             2122 SMALL
             1709 CAPITAL
        """
        for postings, key in self.word_rank(top):
            print('{:5} {}'.format(postings, key))

    def _codes_for_words(self, words):
        """Return a new set with the codes indexed under every word.

        The result is empty when ``words`` is empty or when any word is
        not in the index.  The sets stored in ``self.index`` are never
        modified: intersecting them in place would silently shrink the
        index after each multi-word query.
        """
        result = None
        for word in words:
            postings = self.index.get(word)
            if postings is None:  # shortcut: no such word
                return set()
            # set(...) copies the first posting set; ``&`` builds a new one.
            result = set(postings) if result is None else result & postings
        return set() if result is None else result

    def find_codes(self, query):
        """Yield, in ascending order, the codes matching all query words."""
        yield from sorted(self._codes_for_words(tokenize(query)))

    def describe(self, code):
        """Return ``'U+XXXX<TAB>char<TAB>name'`` for the code point ``code``."""
        code_str = 'U+{:04X}'.format(code)
        char = chr(code)
        name = unicodedata.name(char)
        return '{:7}\t{}\t{}'.format(code_str, char, name)

    def find_descriptions(self, query):
        """Yield the ``describe`` line of each code found for ``query``."""
        for code in self.find_codes(query):
            yield self.describe(code)
def main(*args):
    """Search for the words in ``args`` and print the matching characters.

    One description line is printed per match, followed by a summary in
    parentheses with the number of matches and the query.
    """
    query = ' '.join(args)
    index = UnicodeNameIndex()
    counter = 0
    for counter, line in enumerate(index.find_descriptions(query), 1):
        print(line)
    if counter > 1:
        msg = '{} matches'.format(counter)
    else:
        msg = '1 match' if counter else 'No match'
    print('({} for {!r})'.format(msg, query))
if __name__ == '__main__':
    # Command-line entry point: search for the words given as arguments,
    # or show a usage line when there are none.
    words = sys.argv[1:]
    if words:
        main(*words)
    else:
        print('Usage: {} word1 [word2]...'.format(sys.argv[0]))

View File

@@ -0,0 +1,69 @@
import asyncio
from aiohttp import web
from charfinder import UnicodeNameIndex
# HTML page returned for every request.  It is filled in with str.format
# using the fields ``query``, ``message`` and ``result``.
TEMPLATE = '''
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>title</title>
</head>
<body>
<form action="/">
<input type="search" name="query" value="{query}">
<input type="submit" value="find">
</form>
<p>{message}</p>
<hr>
<pre>
{result}
</pre>
</body>
</html>
'''
# Content type given to every web.Response.
CONTENT_TYPE = 'text/html; charset=UTF-8'
index = None # a UnicodeNameIndex instance; assigned when run as a script
@asyncio.coroutine
def handle(request):
    """Serve the search page, listing the characters matching ``query``.

    Reads the ``query`` parameter of the request, looks it up in the
    module-level ``index`` and returns a ``web.Response`` built from
    ``TEMPLATE``.

    Every value interpolated into the page is HTML-escaped: the query is
    user input, and the result lines contain the matched characters
    themselves, which may be ``<``, ``>``, ``&`` or quotes.
    """
    from html import escape  # local import keeps this block self-contained

    query = request.GET.get('query', '')
    print('Query: {!r}'.format(query))
    if query:
        lines = list(index.find_descriptions(query))
        res = '\n'.join(lines)
        # "match" is singular only for exactly one result ("0 matches").
        plural = '' if len(lines) == 1 else 'es'
        msg = '{} match{} for {!r}'.format(len(lines), plural, query)
    else:
        res = ''
        msg = 'Type words describing characters, e.g. chess.'
    text = TEMPLATE.format(query=escape(query),
                           result=escape(res),
                           message=escape(msg))
    return web.Response(content_type=CONTENT_TYPE, text=text)
@asyncio.coroutine
def init(loop):
    """Create the web application and start serving it on 127.0.0.1:8080.

    Routes ``GET /`` to ``handle`` and prints the address being served.
    """
    application = web.Application(loop=loop)
    application.router.add_route('GET', '/', handle)
    protocol_factory = application.make_handler()
    srv = yield from loop.create_server(protocol_factory, '127.0.0.1', 8080)
    address = srv.sockets[0].getsockname()
    print('Serving on {}. Hit CTRL-C to stop.'.format(address))
def main():
    """Start the HTTP server and run the event loop until CTRL-C.

    The startup banner tells the user to hit CTRL-C, so KeyboardInterrupt
    is treated as a normal shutdown instead of surfacing as a traceback,
    and the loop is always closed on the way out.
    """
    loop = asyncio.get_event_loop()
    loop.run_until_complete(init(loop))
    try:
        loop.run_forever()
    except KeyboardInterrupt:  # CTRL+C pressed
        pass
    finally:
        loop.close()
if __name__ == '__main__':
    # Create the index once before serving; ``handle`` reads it through
    # the module-level name ``index``.
    index = UnicodeNameIndex()
    main()

70
concurrency/tcp_charserver.py Executable file
View File

@@ -0,0 +1,70 @@
#!/usr/bin/env python3
import asyncio
from charfinder import UnicodeNameIndex
# Line terminator appended to every line sent to a client.
CRLF = b'\r\n'
# Prompt written to the client before each query is read.
PROMPT = b'?> '
index = None # a UnicodeNameIndex instance; assigned when run as a script
def writeln(writer, arg):
    """Write ``arg`` to ``writer``, terminating each line with ``CRLF``.

    ``arg`` is either a single ``str`` or an iterable of ``str``.  Each
    line is encoded with ``str.encode()`` before being written.
    """
    lines = [arg] if isinstance(arg, str) else arg
    writer.writelines(line.encode() + CRLF for line in lines)
@asyncio.coroutine
def handle_queries(reader, writer):
    """Serve one client: prompt, read a query, send the matches, repeat.

    The session ends when the client closes the connection, or sends a
    line that starts with a control character or cannot be decoded.  A
    blank line just gets a new prompt.  The client socket is closed when
    the session ends, even if an error interrupts it.
    """
    try:
        while True:
            writer.write(PROMPT)
            yield from writer.drain()
            data = yield from reader.readline()
            if not data:  # EOF: the client closed the connection
                break
            try:
                query = data.decode().strip()
            except UnicodeDecodeError:
                query = '\x00'  # handled as a control character below
            if not query:
                # Blank line: nothing to search for.  (Calling ord() on
                # an empty string would raise TypeError.)
                continue
            if ord(query[0]) < 32:
                break
            client = writer.get_extra_info('peername')
            print('Received from {}: {}'.format(client, query))
            lines = list(index.find_descriptions(query))
            if lines:
                writeln(writer, lines)
                plural = 'es' if len(lines) > 1 else ''
                msg = '({} match{} for {!r})'.format(len(lines), plural,
                                                     query)
                writeln(writer, msg)
                print('Sent: {} lines + total'.format(len(lines)))
            else:
                writeln(writer, '(No match for {!r})'.format(query))
                print('Sent: 1 line, no match')
            yield from writer.drain()
    finally:
        print('Close the client socket')
        writer.close()
def main():
    """Run the TCP server on 127.0.0.1:8888 until CTRL-C is pressed.

    After the interrupt the server is closed, the loop waits for it to
    finish closing, and then the loop itself is closed.
    """
    loop = asyncio.get_event_loop()
    server = loop.run_until_complete(
        asyncio.start_server(handle_queries, '127.0.0.1', 8888, loop=loop))
    address = server.sockets[0].getsockname()
    print('Serving on {}. Hit CTRL-C to stop.'.format(address))
    try:
        loop.run_forever()
    except KeyboardInterrupt:  # CTRL+C pressed
        pass
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()
if __name__ == '__main__':
    # Create the index once before serving; ``handle_queries`` reads it
    # through the module-level name ``index``.
    index = UnicodeNameIndex()
    main()

View File

@@ -0,0 +1,86 @@
import pytest
from charfinder import UnicodeNameIndex, tokenize, sample_chars
from unicodedata import name
@pytest.fixture
def sample_index():
    """Fresh index built from ``sample_chars`` for each test."""
    index = UnicodeNameIndex(sample_chars)
    return index
@pytest.fixture(scope="module")
def full_index():
    """Index created without a character list, shared by the whole module."""
    index = UnicodeNameIndex()
    return index
def test_tokenize():
    """tokenize splits text into words and uppercases each of them."""
    cases = [
        ('', []),
        ('a b', ['A', 'B']),
        ('a-b', ['A', 'B']),
        ('abc', ['ABC']),
        ('café', ['CAFÉ']),
    ]
    for text, expected in cases:
        assert list(tokenize(text)) == expected
def test_index():
    """The index of ``sample_chars`` holds nine distinct words."""
    word_count = len(UnicodeNameIndex(sample_chars))
    assert word_count == 9
def test_find_word_no_match(sample_index):
    """A word absent from the index yields no codes."""
    found = list(sample_index.find_codes('qwertyuiop'))
    assert found == []
def test_find_word_1_match(sample_index):
    """'currency' matches only EURO-CURRENCY SIGN in the sample index."""
    codes = sample_index.find_codes('currency')
    named = [(code, name(chr(code))) for code in codes]
    assert named == [(8352, 'EURO-CURRENCY SIGN')]
def test_find_word_2_matches(sample_index):
    """'Euro' matches both euro signs, in code point order."""
    expected = [
        (8352, 'EURO-CURRENCY SIGN'),
        (8364, 'EURO SIGN'),
    ]
    codes = sample_index.find_codes('Euro')
    named = [(code, name(chr(code))) for code in codes]
    assert named == expected
def test_find_2_words_no_matches(sample_index):
    """Two indexed words that share no character yield no codes."""
    found = list(sample_index.find_codes('Euro letter'))
    assert found == []
def test_find_2_words_no_matches_because_one_not_found(sample_index):
    """A query with one unknown word yields no codes."""
    found = list(sample_index.find_codes('letter qwertyuiop'))
    assert found == []
def test_find_2_words_1_match(sample_index):
    """'sign dollar' matches exactly one character of the sample index."""
    match_count = len(list(sample_index.find_codes('sign dollar')))
    assert match_count == 1
def test_find_2_words_2_matches(sample_index):
    """'latin letter' matches exactly two characters of the sample index."""
    match_count = len(list(sample_index.find_codes('latin letter')))
    assert match_count == 2
def test_find_codes_many_matches_full(full_index):
    """'letter' matches more than 7000 characters in the full index."""
    match_count = len(list(full_index.find_codes('letter')))
    assert match_count > 7000
def test_find_1_word_1_match_full(full_index):
    """'registered' matches only REGISTERED SIGN in the full index."""
    codes = full_index.find_codes('registered')
    named = [(code, name(chr(code))) for code in codes]
    assert named == [(174, 'REGISTERED SIGN')]
def test_find_1_word_2_matches_full(full_index):
    """'rook' matches exactly two characters in the full index."""
    match_count = len(list(full_index.find_codes('rook')))
    assert match_count == 2
def test_find_3_words_no_matches_full(full_index):
    """'no such character' yields no codes in the full index."""
    found = list(full_index.find_codes('no such character'))
    assert found == []