ch22 example files

This commit is contained in:
Luciano Ramalho 2021-02-24 18:55:55 -03:00
parent 47cafc801a
commit 1702717182
7 changed files with 4096 additions and 0 deletions

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,89 @@
#!/usr/bin/env python
"""
Class ``InvertedIndex`` builds an inverted index mapping each word to
the set of Unicode characters which contain that word in their names.
Optional arguments to the constructor are ``first`` and ``last+1`` character
codes to index, to make testing easier.
In the example below, only the ASCII range was indexed::
>>> idx = InvertedIndex(32, 128)
>>> sorted(idx.entries['SIGN'])
['#', '$', '%', '+', '<', '=', '>']
>>> sorted(idx.entries['DIGIT'])
['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
>>> idx.entries['DIGIT'] & idx.entries['EIGHT']
{'8'}
>>> idx.search('digit')
['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
>>> idx.search('eight digit')
['8']
>>> idx.search('a letter')
['A', 'a']
>>> idx.search('a letter capital')
['A']
>>> idx.search('borogove')
[]
"""
import sys
import unicodedata
from collections import defaultdict
from collections.abc import Iterator
# Default exclusive upper bound for the code points indexed by InvertedIndex:
# one past the highest code point Python supports.
STOP_CODE: int = sys.maxunicode + 1
# Alias used in annotations for strings holding a character.
Char = str
# Type of the index: maps a word to the set of characters filed under it.
Index = defaultdict[str, set[Char]]
def tokenize(text: str) -> Iterator[str]:
    """Yield the words of ``text`` in uppercase.

    Hyphens count as word separators, so ``'hyphen-minus'`` produces
    ``'HYPHEN'`` and ``'MINUS'``.
    """
    normalized = text.upper().replace('-', ' ')
    yield from normalized.split()
class InvertedIndex:
    """Inverted index from words to the characters named with them.

    ``entries`` maps each uppercased word to the set of characters whose
    Unicode names contain that word.
    """
    entries: Index

    def __init__(self, start: int = 32, stop: int = STOP_CODE):
        """Index the characters with code points in ``range(start, stop)``.

        Code points without a Unicode name are skipped.
        """
        entries: Index = defaultdict(set)
        for char in (chr(i) for i in range(start, stop)):
            name = unicodedata.name(char, '')
            if name:
                for word in tokenize(name):
                    entries[word].add(char)
        self.entries = entries

    def search(self, query: str) -> list[Char]:
        """Return the sorted characters whose names contain every query word.

        An empty query, or a query containing a word that is not in the
        index, returns an empty list.
        """
        words = list(tokenize(query))
        if not words:
            return []
        # Look words up with .get() instead of subscripting: ``entries`` is
        # a defaultdict, so ``entries[word]`` would insert an empty set for
        # each unknown word and the index would grow with every failed
        # search.
        first, *rest = (self.entries.get(word, set()) for word in words)
        return sorted(first.intersection(*rest))
def format_results(chars: list[Char]) -> Iterator[str]:
    """Yield one tab-separated line per character in ``chars``.

    A line holds the code point as ``U+`` plus at least four uppercase hex
    digits, the character itself, and its Unicode name.
    """
    for symbol in chars:
        label = unicodedata.name(symbol)
        codepoint = ord(symbol)
        yield f'U+{codepoint:04X}\t{symbol}\t{label}'
def main(words: list[str]) -> None:
    """Search the full index for ``words`` and print the matches.

    Prints one line per matching character, then a summary line with the
    number of matches. With no words, prints a usage hint and exits.
    """
    if not words:
        print('Please give one or more words to search.')
        sys.exit()
    index = InvertedIndex()
    chars = index.search(' '.join(words))
    for line in format_results(chars):
        print(line)
    # 66-column horizontal rule separating the results from the count.
    print('-' * 66, f'{len(chars)} found')


if __name__ == '__main__':
    main(sys.argv[1:])

View File

@ -0,0 +1,6 @@
click==7.1.2
fastapi==0.63.0
h11==0.12.0
pydantic==1.7.3
starlette==0.13.6
uvicorn==0.13.4

View File

@ -0,0 +1,83 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Mojifinder</title>
<style>
body {font-family: "Lucida Sans Unicode", "Lucida Grande", sans-serif;}
table {font-family: "Lucida Console", "Monaco", monospace;
text-align: left; min-width: 300px}
td.code {min-width: 40px; text-align: right;}
td.char {min-width: 50px; text-align: center;}
caption {background: lightgray; }
</style>
<script>
"use strict";
// Append a <td> containing `text` to `row`. When `class_` is supplied it is
// set as the cell's class attribute.
function appendCell(row, text, class_) {
  const td = document.createElement('td');
  const label = document.createTextNode(text);
  td.appendChild(label);
  if (class_ !== undefined) {
    td.setAttribute('class', class_);
  }
  row.appendChild(td);
}
// Replace the rows of the table with one row per search result and show the
// number of results in the table caption.
function fillTable(results) {
  const table = document.querySelector('table');
  // Remove the rows left by the previous search: keep dropping the last
  // child element for as long as it is a <tr>.
  while (table.lastElementChild.tagName === 'TR') {
    table.lastElementChild.remove();
  }
  let count = 0;
  results.forEach((item) => {
    const row = document.createElement('tr');
    const hex = item.char.codePointAt(0).toString(16).toUpperCase();
    appendCell(row, 'U+' + hex.padStart(4, '0'), 'code');
    appendCell(row, item.char, 'char');
    appendCell(row, item.name);
    table.appendChild(row);
    count += 1;
  });
  const plural = (count === 1) ? "" : "s";
  const caption = document.querySelector('caption');
  caption.textContent = `${count} character${plural} found`;
}
// Request the characters matching `query` from the `search` endpoint, which
// is addressed relative to the current page URL without its query string.
// Resolves to the parsed JSON body; rejects with an Error when the HTTP
// status is not OK.
async function fetchResults(query) {
  const url = location.href.replace(location.search, '');
  // Percent-encode the query so characters such as '&', '#' or '+' arrive
  // as part of the `q` parameter instead of altering the URL.
  const q = encodeURIComponent(query);
  const response = await fetch(`${url}search?q=${q}`);
  if (!response.ok) {
    throw new Error(`HTTP error! status: ${response.status}`);
  }
  return response.json();
}
// Search for the current text of the #query input and show the results in
// the table. Failures are logged to the console.
function updateTable(event) {
  const {value} = document.getElementById('query');
  fetchResults(value)
    .then((results) => fillTable(results))
    .catch((error) => console.log(error));
}
// After the document has been parsed, run a search every time the #query
// input fires a 'change' event.
window.addEventListener('DOMContentLoaded', (event) => {
  const input = document.getElementById('query');
  input.addEventListener('change', updateTable);
});
</script>
</head>
<body>
<div>
<input id="query" type="search" name="q" value="">
<button onClick="updateTable()">Search</button>
</div>
<table>
<caption></caption>
</table>
</body>
</html>

View File

@ -0,0 +1,72 @@
#!/usr/bin/env python3
# tag::TCP_MOJIFINDER_TOP[]
import sys
import asyncio
import functools
from charindex import InvertedIndex, format_results # <1>
CRLF = b'\r\n'  # terminator appended to every line sent to a client
PROMPT = b'?> '  # written to the client before each query is read
async def finder(index: InvertedIndex,          # <2>
                 reader: asyncio.StreamReader,
                 writer: asyncio.StreamWriter):
    """Serve one client: prompt, read a query, send the results, repeat.

    The session ends when the client closes the connection, sends a line
    starting with a control character, or sends bytes that cannot be
    decoded. The writer is closed before returning.
    """
    client = writer.get_extra_info('peername')  # <3>
    while True:  # <4>
        writer.write(PROMPT)  # can't await!  # <5>
        await writer.drain()  # must await!  # <6>
        data = await reader.readline()  # <7>
        if not data:
            # readline() returns b'' only at EOF, i.e. the client has
            # disconnected. A blank line still carries its newline, so it
            # is not mistaken for EOF and simply gets a new prompt.
            break
        try:
            query = data.decode().strip()  # <8>
        except UnicodeDecodeError:  # <9>
            query = '\x00'
        print(f' From {client}: {query!r}')  # <10>
        if query:
            if ord(query[:1]) < 32:  # <11>
                break
            results = await search(query, index, writer)  # <12>
            print(f' To {client}: {results} results.')  # <13>
    writer.close()  # <14>
    await writer.wait_closed()  # <15>
    print(f'Close {client}.')  # <16>
# end::TCP_MOJIFINDER_TOP[]
# tag::TCP_MOJIFINDER_SEARCH[]
async def search(query: str,  # <1>
                 index: InvertedIndex,
                 writer: asyncio.StreamWriter) -> int:
    """Send the characters matching ``query`` to the client.

    Writes one CRLF-terminated line per match, then a status line made of
    a 66-column horizontal rule and the number of matches. Returns that
    number.
    """
    chars = index.search(query)  # <2>
    lines = (line.encode() + CRLF for line  # <3>
             in format_results(chars))
    writer.writelines(lines)  # <4>
    await writer.drain()  # <5>
    status_line = f'{"-" * 66} {len(chars)} found'  # <6>
    writer.write(status_line.encode() + CRLF)
    await writer.drain()
    return len(chars)
# end::TCP_MOJIFINDER_SEARCH[]
# tag::TCP_MOJIFINDER_MAIN[]
async def supervisor(index: InvertedIndex, host: str, port: int):
    """Start the TCP server on ``host``:``port`` and serve connections.

    Each connection is handled by ``finder`` with ``index`` bound as its
    first argument.
    """
    server = await asyncio.start_server(  # <1>
        functools.partial(finder, index),  # <2>
        host, port)  # <3>
    # Report the address of the first listening socket.
    addr = server.sockets[0].getsockname()  # type: ignore # <4>
    print(f'Serving on {addr}. Hit CTRL-C to stop.')
    await server.serve_forever()  # <5>
def main(host: str = '127.0.0.1', port_arg: str = '2323'):
    """Build the index, then run the server until interrupted.

    ``port_arg`` is a string because it is taken from the command line;
    it is converted with ``int``.
    """
    port = int(port_arg)
    print('Building index.')
    index = InvertedIndex()  # <6>
    try:
        asyncio.run(supervisor(index, host, port))  # <7>
    except KeyboardInterrupt:  # <8>
        print('\nServer shut down.')


# Optional command-line arguments: host, then port.
if __name__ == '__main__':
    main(*sys.argv[1:])
# end::TCP_MOJIFINDER_MAIN[]

View File

@ -0,0 +1,38 @@
"""
uvicorn main:app --reload
"""
import pathlib
from unicodedata import name
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from charindex import InvertedIndex
# Application object; the module docstring shows how to serve it with
# uvicorn (``main:app``).
app = FastAPI(
    title='Mojifinder Web',
    description='Search for Unicode characters by name.',
)
# Shape of one search result: a character together with its Unicode name.
class CharName(BaseModel):
    char: str  # the matching character
    name: str  # its name as returned by unicodedata.name
def init(app):
    """Attach the character index and the HTML search form to ``app.state``.

    The form is read from ``static/form.html`` in the directory of this
    module.
    """
    app.state.index = InvertedIndex()
    static = pathlib.Path(__file__).parent.absolute() / 'static'
    # Decode explicitly as UTF-8 (the charset the page declares) instead of
    # relying on the platform's default encoding.
    with open(static / 'form.html', encoding='utf-8') as fp:
        app.state.form = fp.read()


init(app)
@app.get('/', response_class=HTMLResponse, include_in_schema=False)
def form():
    # Serve the search page text stored on app.state.
    page = app.state.form
    return page
@app.get('/search', response_model=list[CharName])
async def search(q: str):
    # Look the query up in the index stored on app.state, then pair each
    # matching character with its Unicode name.
    found = app.state.index.search(q)
    return [{'char': char, 'name': name(char)} for char in found]

View File

@ -0,0 +1,37 @@
#!/usr/bin/env python3
import json
import unicodedata
from bottle import route, request, run, static_file
from charindex import InvertedIndex
index = {}  # placeholder; main() rebinds this global to an InvertedIndex
@route('/')
def form():
    """Serve the search page, ``form.html`` from the ``static/`` directory."""
    page = static_file('form.html', root='static/')
    return page
@route('/search')
def search():
    """Return the characters matching the ``q`` query parameter.

    The response body is a UTF-8 encoded JSON array of objects with the
    keys ``char`` and ``name``.
    """
    query = request.query['q']
    results = [
        {'char': char, 'name': unicodedata.name(char)}
        for char in index.search(query)
    ]
    payload = json.dumps(results)
    return payload.encode('UTF-8')
def main(port):
    """Build the character index and run the server on ``port``.

    The index is stored in the module-level ``index`` read by ``search``.
    """
    global index
    index = InvertedIndex()
    host = 'localhost'
    run(host=host, port=port, debug=True)


if __name__ == '__main__':
    main(8000)