renumbering chapters >= 19

This commit is contained in:
Luciano Ramalho
2021-09-10 12:34:39 -03:00
parent cbd13885fc
commit 4ae4096c4c
154 changed files with 7 additions and 1134 deletions

4
21-async/README.rst Normal file
View File

@@ -0,0 +1,4 @@
Sample code for Chapter 22 - "Asynchronous programming"
From the book "Fluent Python, Second Edition" by Luciano Ramalho (O'Reilly, 2021)
https://learning.oreilly.com/library/view/fluent-python-2nd/9781492056348/

View File

@@ -0,0 +1,28 @@
domainlib demonstration
=======================
Run Python's async console (requires Python ≥ 3.8)::
$ python3 -m asyncio
I'll see ``asyncio`` imported automatically::
>>> import asyncio
Now you can experiment with ``domainlib``.
At the `>>>` prompt, type these commands::
>>> from domainlib import *
>>> await probe('python.org')
Note the result.
Next::
>>> names = 'python.org rust-lang.org golang.org n05uch1an9.org'.split()
>>> async for result in multi_probe(names):
... print(*result, sep='\t')
Note that if you run the last two lines again,
the results are likely to appear in a different order.

View File

@@ -0,0 +1,29 @@
#!/usr/bin/env python3
import asyncio
import socket
from keyword import kwlist
MAX_KEYWORD_LEN = 4 # <1>
async def probe(domain: str) -> tuple[str, bool]: # <2>
loop = asyncio.get_running_loop() # <3>
try:
await loop.getaddrinfo(domain, None) # <4>
except socket.gaierror:
return (domain, False)
return (domain, True)
async def main() -> None: # <5>
names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN) # <6>
domains = (f'{name}.dev'.lower() for name in names) # <7>
coros = [probe(domain) for domain in domains] # <8>
for coro in asyncio.as_completed(coros): # <9>
domain, found = await coro # <10>
mark = '+' if found else ' '
print(f'{mark} {domain}')
if __name__ == '__main__':
asyncio.run(main()) # <11>

View File

@@ -0,0 +1,24 @@
#!/usr/bin/env python3
import asyncio
import sys
from keyword import kwlist
from domainlib import multi_probe
async def main(tld: str) -> None:
tld = tld.strip('.')
names = (kw for kw in kwlist if len(kw) <= 4) # <1>
domains = (f'{name}.{tld}'.lower() for name in names) # <2>
print('FOUND\t\tNOT FOUND') # <3>
print('=====\t\t=========')
async for domain, found in multi_probe(domains): # <4>
indent = '' if found else '\t\t' # <5>
print(f'{indent}{domain}')
if __name__ == '__main__':
if len(sys.argv) == 2:
asyncio.run(main(sys.argv[1])) # <6>
else:
print('Please provide a TLD.', f'Example: {sys.argv[0]} COM.BR')

View File

@@ -0,0 +1,30 @@
import asyncio
import socket
from collections.abc import Iterable, AsyncIterator
from typing import NamedTuple, Optional
class Result(NamedTuple): # <1>
domain: str
found: bool
OptionalLoop = Optional[asyncio.AbstractEventLoop] # <2>
async def probe(domain: str, loop: OptionalLoop = None) -> Result: # <3>
if loop is None:
loop = asyncio.get_running_loop()
try:
await loop.getaddrinfo(domain, None)
except socket.gaierror:
return Result(domain, False)
return Result(domain, True)
async def multi_probe(domains: Iterable[str]) -> AsyncIterator[Result]: # <4>
loop = asyncio.get_running_loop()
coros = [probe(domain, loop) for domain in domains] # <5>
for coro in asyncio.as_completed(coros): # <6>
result = await coro # <7>
yield result # <8>

View File

@@ -0,0 +1,28 @@
#!/usr/bin/env python3
from curio import run, TaskGroup
import curio.socket as socket
from keyword import kwlist
MAX_KEYWORD_LEN = 4
async def probe(domain: str) -> tuple[str, bool]: # <1>
try:
await socket.getaddrinfo(domain, None) # <2>
except socket.gaierror:
return (domain, False)
return (domain, True)
async def main() -> None:
names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN)
domains = (f'{name}.dev'.lower() for name in names)
async with TaskGroup() as group: # <3>
for domain in domains:
await group.spawn(probe, domain) # <4>
async for task in group: # <5>
domain, found = task.result
mark = '+' if found else ' '
print(f'{mark} {domain}')
if __name__ == '__main__':
run(main()) # <6>

View File

@@ -0,0 +1,24 @@
#!/usr/bin/env python3
import curio
import sys
from keyword import kwlist
from domainlib import multi_probe
async def main(tld: str) -> None:
tld = tld.strip('.')
names = (kw for kw in kwlist if len(kw) <= 4)
domains = (f'{name}.{tld}'.lower() for name in names)
print('FOUND\t\tNOT FOUND')
print('=====\t\t=========')
async for domain, found in multi_probe(domains):
indent = '' if found else '\t\t'
print(f'{indent}{domain}')
if __name__ == '__main__':
if len(sys.argv) == 2:
curio.run(main(sys.argv[1]))
else:
print('Please provide a TLD.', f'Example: {sys.argv[0]} COM.BR')

View File

@@ -0,0 +1,26 @@
from collections.abc import Iterable, AsyncIterator
from typing import NamedTuple
from curio import TaskGroup
import curio.socket as socket
class Result(NamedTuple):
domain: str
found: bool
async def probe(domain: str) -> Result:
try:
await socket.getaddrinfo(domain, None)
except socket.gaierror:
return Result(domain, False)
return Result(domain, True)
async def multi_probe(domains: Iterable[str]) -> AsyncIterator[Result]:
async with TaskGroup() as group:
for domain in domains:
await group.spawn(probe, domain)
async for task in group:
yield task.result

View File

@@ -0,0 +1 @@
curio==1.5

View File

@@ -0,0 +1,41 @@
# Mojifinder: Unicode character search examples
Examples from _Fluent Python, Second Edition_—Chapter 22, _Asynchronous Programming_.
## How to run `web_mojifinder.py`
`web_mojifinder.py` is a Web application built with _[FastAPI](https://fastapi.tiangolo.com/)_.
To run it, first install _FastAPI_ and an ASGI server.
The application was tested with _[Uvicorn](https://www.uvicorn.org/)_.
```
$ pip install fastapi uvicorn
```
Now you can use `uvicorn` to run the app.
```
$ uvicorn web_mojifinder:app
```
Finally, visit http://127.0.0.1:8000/ with your browser to see the search form.
## Directory contents
These files can be run as scripts directly from the command line:
- `charindex.py`: libray used by the Mojifinder examples. Also works as CLI search script.
- `tcp_mojifinder.py`: TCP/IP Unicode search server. Depends only on the Python 3.9 standard library. Use a telnet application as client.
- `web_mojifinder_bottle.py`: Unicode Web service. Depends on `bottle.py` and `static/form.html`. Use an HTTP browser as client.
This program requires an ASGI server to run it:
- `web_mojifinder.py`: Unicode Web service. Depends on _[FastAPI](https://fastapi.tiangolo.com/)_ and `static/form.html`.
Support files:
- `bottle.py`: local copy of the single-file _[Bottle](https://bottlepy.org/)_ Web framework.
- `requirements.txt`: list of dependencies for `web_mojifinder.py`.
- `static/form.html`: HTML form used by the `web_*` examples.
- `README.md`: this file 🤓

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,88 @@
#!/usr/bin/env python
"""
Class ``InvertedIndex`` builds an inverted index mapping each word to
the set of Unicode characters which contain that word in their names.
Optional arguments to the constructor are ``first`` and ``last+1``
character codes to index, to make testing easier. In the examples
below, only the ASCII range was indexed.
The `entries` attribute is a `defaultdict` with uppercased single
words as keys::
>>> idx = InvertedIndex(32, 128)
>>> idx.entries['DOLLAR']
{'$'}
>>> sorted(idx.entries['SIGN'])
['#', '$', '%', '+', '<', '=', '>']
>>> idx.entries['A'] & idx.entries['SMALL']
{'a'}
>>> idx.entries['BRILLIG']
set()
The `.search()` method takes a string, uppercases it, splits it into
words, and returns the intersection of the entries for each word::
>>> idx.search('capital a')
{'A'}
"""
import sys
import unicodedata
from collections import defaultdict
from collections.abc import Iterator
STOP_CODE: int = sys.maxunicode + 1
Char = str
Index = defaultdict[str, set[Char]]
def tokenize(text: str) -> Iterator[str]:
"""return iterator of uppercased words"""
for word in text.upper().replace('-', ' ').split():
yield word
class InvertedIndex:
entries: Index
def __init__(self, start: int = 32, stop: int = STOP_CODE):
entries: Index = defaultdict(set)
for char in (chr(i) for i in range(start, stop)):
name = unicodedata.name(char, '')
if name:
for word in tokenize(name):
entries[word].add(char)
self.entries = entries
def search(self, query: str) -> set[Char]:
if words := list(tokenize(query)):
found = self.entries[words[0]]
return found.intersection(*(self.entries[w] for w in words[1:]))
else:
return set()
def format_results(chars: set[Char]) -> Iterator[str]:
for char in sorted(chars):
name = unicodedata.name(char)
code = ord(char)
yield f'U+{code:04X}\t{char}\t{name}'
def main(words: list[str]) -> None:
if not words:
print('Please give one or more words to search.')
sys.exit(2) # command line usage error
index = InvertedIndex()
chars = index.search(' '.join(words))
for line in format_results(chars):
print(line)
print('' * 66, f'{len(chars)} found')
if __name__ == '__main__':
main(sys.argv[1:])

View File

@@ -0,0 +1,7 @@
click==7.1.2
fastapi==0.65.2
h11==0.12.0
pydantic==1.8.2
starlette==0.13.6
typing-extensions==3.7.4.3
uvicorn==0.13.4

View File

@@ -0,0 +1,81 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Mojifinder</title>
<style>
body {font-family: "Lucida Sans Unicode", "Lucida Grande", sans-serif;}
table {font-family: "Lucida Console", "Monaco", monospace;
text-align: left; min-width: 300px}
td.code {min-width: 40px; text-align: right;}
td.char {min-width: 50px; text-align: center;}
caption {background: lightgray; }
</style>
<script>
"use strict";
function appendCell(row, text, class_) {
let cell = document.createElement('td');
cell.appendChild(document.createTextNode(text));
if (class_ !== undefined) {
cell.setAttribute('class', class_);
}
row.appendChild(cell);
}
function fillTable(results) {
const table = document.querySelector('table');
while (table.lastElementChild.tagName === 'TR') {
table.removeChild(table.lastElementChild);
}
let count = 0;
results.forEach((item) => {
let row = document.createElement('tr');
let code = item.char.codePointAt(0);
let uCode = 'U+' + code.toString(16).toUpperCase().padStart(4, '0');
appendCell(row, uCode, 'code');
appendCell(row, item.char, 'char');
appendCell(row, item.name);
table.appendChild(row);
count++;
});
let plural = "s";
if (count===1) plural = "";
let msg = `${count} character${plural} found`;
document.querySelector('caption').textContent = msg;
}
async function fetchResults(query) {
let url = location.href.replace(location.search, '');
const response = await fetch(`${url}search?q=${query}`);
if (response.ok) {
return response.json();
} else {
throw new Error(`HTTP error! status: ${response.status}`);
}
}
function updateTable(event) {
const input = document.getElementById('query');
fetchResults(input.value)
.then(fillTable)
.catch(error => console.log(error));
}
window.addEventListener('DOMContentLoaded', (event) => {
const input = document.getElementById('query');
input.addEventListener('change', updateTable);
});
</script>
</head>
<body>
<div>
<input id="query" type="search" name="q" value="">
<button onClick="updateTable()">Search</button>
</div>
<table>
<caption></caption>
</table>
</body>
</html>

View File

@@ -0,0 +1,76 @@
#!/usr/bin/env python3
# tag::TCP_MOJIFINDER_TOP[]
import asyncio
import functools
import sys
from asyncio.trsock import TransportSocket
from typing import cast
from charindex import InvertedIndex, format_results # <1>
CRLF = b'\r\n'
PROMPT = b'?> '
async def finder(index: InvertedIndex, # <2>
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
client = writer.get_extra_info('peername') # <3>
while True: # <4>
writer.write(PROMPT) # can't await! # <5>
await writer.drain() # must await! # <6>
data = await reader.readline() # <7>
try:
query = data.decode().strip() # <8>
except UnicodeDecodeError: # <9>
query = '\x00'
print(f' From {client}: {query!r}') # <10>
if query:
if ord(query[:1]) < 32: # <11>
break
results = await search(query, index, writer) # <12>
print(f' To {client}: {results} results.') # <13>
writer.close() # <14>
await writer.wait_closed() # <15>
print(f'Close {client}.') # <16>
# end::TCP_MOJIFINDER_TOP[]
# tag::TCP_MOJIFINDER_SEARCH[]
async def search(query: str, # <1>
index: InvertedIndex,
writer: asyncio.StreamWriter) -> int:
chars = index.search(query) # <2>
lines = (line.encode() + CRLF for line # <3>
in format_results(chars))
writer.writelines(lines) # <4>
await writer.drain() # <5>
status_line = f'{"" * 66} {len(chars)} found' # <6>
writer.write(status_line.encode() + CRLF)
await writer.drain()
return len(chars)
# end::TCP_MOJIFINDER_SEARCH[]
# tag::TCP_MOJIFINDER_MAIN[]
async def supervisor(index: InvertedIndex, host: str, port: int):
server = await asyncio.start_server( # <1>
functools.partial(finder, index), # <2>
host, port) # <3>
socket_list = cast(tuple[TransportSocket, ...], server.sockets) # <4>
addr = socket_list[0].getsockname()
print(f'Serving on {addr}. Hit CTRL-C to stop.') # <5>
await server.serve_forever() # <6>
def main(host: str = '127.0.0.1', port_arg: str = '2323'):
port = int(port_arg)
print('Building index.')
index = InvertedIndex() # <7>
try:
asyncio.run(supervisor(index, host, port)) # <8>
except KeyboardInterrupt: # <9>
print('\nServer shut down.')
if __name__ == '__main__':
main(*sys.argv[1:])
# end::TCP_MOJIFINDER_MAIN[]

View File

@@ -0,0 +1,35 @@
from pathlib import Path
from unicodedata import name
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from charindex import InvertedIndex
app = FastAPI( # <1>
title='Mojifinder Web',
description='Search for Unicode characters by name.',
)
class CharName(BaseModel): # <2>
char: str
name: str
def init(app): # <3>
app.state.index = InvertedIndex()
static = Path(__file__).parent.absolute() / 'static' # <4>
app.state.form = (static / 'form.html').read_text()
init(app) # <5>
@app.get('/search', response_model=list[CharName]) # <6>
async def search(q: str): # <7>
chars = app.state.index.search(q)
return ({'char': c, 'name': name(c)} for c in chars) # <8>
@app.get('/', response_class=HTMLResponse, include_in_schema=False)
def form(): # <9>
return app.state.form
# no main funcion # <10>

View File

@@ -0,0 +1,35 @@
#!/usr/bin/env python3
import json
import unicodedata
from bottle import route, request, run, static_file
from charindex import InvertedIndex
index = {}
@route('/')
def form():
return static_file('form.html', root='static/')
@route('/search')
def search():
query = request.query['q']
chars = index.search(query)
results = []
for char in chars:
name = unicodedata.name(char)
results.append({'char': char, 'name': name})
return json.dumps(results).encode('UTF-8')
def main(port):
global index
index = InvertedIndex()
run(host='localhost', port=port, debug=True)
if __name__ == '__main__':
main(8000)