update from O'Reilly repo

This commit is contained in:
Luciano Ramalho
2021-03-22 12:24:21 -03:00
parent e1cd63aa04
commit 2f8bf06270
28 changed files with 470 additions and 125 deletions

View File

@@ -1,4 +1,4 @@
from typing import Protocol, Any from typing import Protocol, Any
class Comparable(Protocol): # <1> class SupportsLessThan(Protocol): # <1>
def __lt__(self, other: Any) -> bool: ... # <2> def __lt__(self, other: Any) -> bool: ... # <2>

View File

@@ -1,35 +1,35 @@
# tag::MYMAX_TYPES[] # tag::MYMAX_TYPES[]
from typing import Protocol, Any, TypeVar, overload, Callable, Iterable, Union from typing import Protocol, Any, TypeVar, overload, Callable, Iterable, Union
class _Comparable(Protocol): class SupportsLessThan(Protocol):
def __lt__(self, other: Any) -> bool: ... def __lt__(self, other: Any) -> bool: ...
_T = TypeVar('_T') T = TypeVar('T')
_CT = TypeVar('_CT', bound=_Comparable) LT = TypeVar('LT', bound=SupportsLessThan)
_DT = TypeVar('_DT') DT = TypeVar('DT')
MISSING = object() MISSING = object()
EMPTY_MSG = 'max() arg is an empty sequence' EMPTY_MSG = 'max() arg is an empty sequence'
@overload @overload
def max(__arg1: _CT, __arg2: _CT, *_args: _CT, key: None = ...) -> _CT: def max(__arg1: LT, __arg2: LT, *_args: LT, key: None = ...) -> LT:
... ...
@overload @overload
def max(__arg1: _T, __arg2: _T, *_args: _T, key: Callable[[_T], _CT]) -> _T: def max(__arg1: T, __arg2: T, *_args: T, key: Callable[[T], LT]) -> T:
... ...
@overload @overload
def max(__iterable: Iterable[_CT], *, key: None = ...) -> _CT: def max(__iterable: Iterable[LT], *, key: None = ...) -> LT:
... ...
@overload @overload
def max(__iterable: Iterable[_T], *, key: Callable[[_T], _CT]) -> _T: def max(__iterable: Iterable[T], *, key: Callable[[T], LT]) -> T:
... ...
@overload @overload
def max(__iterable: Iterable[_CT], *, key: None = ..., def max(__iterable: Iterable[LT], *, key: None = ...,
default: _DT) -> Union[_CT, _DT]: default: DT) -> Union[LT, DT]:
... ...
@overload @overload
def max(__iterable: Iterable[_T], *, key: Callable[[_T], _CT], def max(__iterable: Iterable[T], *, key: Callable[[T], LT],
default: _DT) -> Union[_T, _DT]: default: DT) -> Union[T, DT]:
... ...
# end::MYMAX_TYPES[] # end::MYMAX_TYPES[]
# tag::MYMAX[] # tag::MYMAX[]
@@ -57,4 +57,4 @@ def max(first, *args, key=None, default=MISSING):
candidate = current candidate = current
candidate_key = current_key candidate_key = current_key
return candidate return candidate
# end::MYMAX[] # end::MYMAX[]

View File

@@ -116,6 +116,7 @@ def error_single_arg_not_iterable() -> None:
except TypeError as exc: except TypeError as exc:
print(exc) print(exc)
###################################### run demo and error functions
def main(): def main():
for name, val in globals().items(): for name, val in globals().items():

View File

@@ -21,10 +21,10 @@ Example:
# tag::TOP[] # tag::TOP[]
from typing import TypeVar, Iterable, List from typing import TypeVar, Iterable, List
from comparable import Comparable from comparable import SupportsLessThan
CT = TypeVar('CT', bound=Comparable) LT = TypeVar('LT', bound=SupportsLessThan)
def top(series: Iterable[CT], length: int) -> List[CT]: def top(series: Iterable[LT], length: int) -> List[LT]:
return sorted(series, reverse=True)[:length] return sorted(series, reverse=True)[:length]
# end::TOP[] # end::TOP[]

View File

@@ -29,6 +29,7 @@ def test_top_tuples() -> None:
reveal_type(result) reveal_type(result)
assert result == expected assert result == expected
# intentional type error
def test_top_objects_error() -> None: def test_top_objects_error() -> None:
series = [object() for _ in range(4)] series = [object() for _ in range(4)]
if TYPE_CHECKING: if TYPE_CHECKING:

View File

@@ -54,7 +54,9 @@ def load_hash() -> Tuple[bytes, bytes]:
salted_hash = fp.read() salted_hash = fp.read()
except FileNotFoundError: except FileNotFoundError:
print('ERROR: passphrase hash file not found.', HELP) print('ERROR: passphrase hash file not found.', HELP)
sys.exit(2) # "standard" exit status codes:
# https://stackoverflow.com/questions/1101957/are-there-any-standard-exit-status-codes-in-linux/40484670#40484670
sys.exit(74) # input/output error
salt, stored_hash = salted_hash.split(b':') salt, stored_hash = salted_hash.split(b':')
return b64decode(salt), b64decode(stored_hash) return b64decode(salt), b64decode(stored_hash)
@@ -93,7 +95,7 @@ def main(argv: Sequence[str]) -> None:
save_hash() save_hash()
else: else:
print('ERROR: invalid argument.', HELP) print('ERROR: invalid argument.', HELP)
sys.exit(1) sys.exit(2) # command line usage error
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -9,7 +9,7 @@ if len(sys.argv) == 2:
module = importlib.import_module(module_name) module = importlib.import_module(module_name)
else: else:
print(f'Usage: {sys.argv[0]} <vector-module-to-test>') print(f'Usage: {sys.argv[0]} <vector-module-to-test>')
sys.exit(1) sys.exit(2) # command line usage error
fmt = 'Selected Vector2d type: {.__name__}.{.__name__}' fmt = 'Selected Vector2d type: {.__name__}.{.__name__}'
print(fmt.format(module, module.Vector2d)) print(fmt.format(module, module.Vector2d))

14
15-type-hints/erp.py Normal file
View File

@@ -0,0 +1,14 @@
import random
from typing import TypeVar, Generic, List, Iterable
T = TypeVar('T')
class EnterpriserRandomPopper(Generic[T]):
def __init__(self, items: Iterable[T]) -> None:
self._items: List[T] = list(items)
random.shuffle(self._items)
def pop_random(self) -> T:
return self._items.pop()

38
15-type-hints/erp_test.py Normal file
View File

@@ -0,0 +1,38 @@
import random
from typing import Iterable, TYPE_CHECKING, List
from erp import EnterpriserRandomPopper
import randompop
def test_issubclass() -> None:
assert issubclass(EnterpriserRandomPopper, randompop.RandomPopper)
def test_isinstance_untyped_items_argument() -> None:
items = [1, 2, 3]
popper = EnterpriserRandomPopper(items) # [int] is not required
if TYPE_CHECKING:
reveal_type(popper)
# Revealed type is 'erp.EnterpriserRandomPopper[builtins.int*]'
assert isinstance(popper, randompop.RandomPopper)
def test_isinstance_untyped_items_in_var_type() -> None:
items = [1, 2, 3]
popper: EnterpriserRandomPopper = EnterpriserRandomPopper[int](items)
if TYPE_CHECKING:
reveal_type(popper)
# Revealed type is 'erp.EnterpriserRandomPopper[Any]'
assert isinstance(popper, randompop.RandomPopper)
def test_isinstance_item() -> None:
items = [1, 2, 3]
popper = EnterpriserRandomPopper[int](items) # [int] is not required
popped = popper.pop_random()
if TYPE_CHECKING:
reveal_type(popped)
# Revealed type is 'builtins.int*'
assert isinstance(popped, int)

View File

@@ -0,0 +1,7 @@
from typing import Protocol, runtime_checkable, TypeVar
T_co = TypeVar('T_co', covariant=True)
@runtime_checkable
class GenericRandomPicker(Protocol[T_co]):
def pick(self) -> T_co: ...

View File

@@ -0,0 +1,35 @@
import random
from typing import Iterable, TYPE_CHECKING
from randompick_generic import GenericRandomPicker
class LottoPicker():
def __init__(self, items: Iterable[int]) -> None:
self._items = list(items)
random.shuffle(self._items)
def pick(self) -> int:
return self._items.pop()
def test_issubclass() -> None:
assert issubclass(LottoPicker, GenericRandomPicker)
def test_isinstance() -> None:
popper: GenericRandomPicker = LottoPicker([1])
if TYPE_CHECKING:
reveal_type(popper)
# Revealed type is '???'
assert isinstance(popper, LottoPicker)
def test_pick_type() -> None:
balls = [1, 2, 3]
popper = LottoPicker(balls)
pick = popper.pick()
assert pick in balls
if TYPE_CHECKING:
reveal_type(pick)
# Revealed type is '???'

View File

@@ -0,0 +1,6 @@
from typing import Protocol, TypeVar, runtime_checkable, Any
@runtime_checkable
class RandomPopper(Protocol):
def pop_random(self) -> Any: ...

View File

@@ -0,0 +1,24 @@
from randompop import RandomPopper
import random
from typing import Any, Iterable, TYPE_CHECKING
class SimplePopper():
def __init__(self, items: Iterable) -> None:
self._items = list(items)
random.shuffle(self._items)
def pop_random(self) -> Any:
return self._items.pop()
def test_issubclass() -> None:
assert issubclass(SimplePopper, RandomPopper)
def test_isinstance() -> None:
popper: RandomPopper = SimplePopper([1])
if TYPE_CHECKING:
reveal_type(popper)
# Revealed type is 'randompop.RandomPopper'
assert isinstance(popper, RandomPopper)

View File

@@ -30,7 +30,7 @@ def main():
word_number = int(sys.argv[2]) word_number = int(sys.argv[2])
except (IndexError, ValueError): except (IndexError, ValueError):
print('Usage: %s <file-name> <word-number>' % sys.argv[0]) print('Usage: %s <file-name> <word-number>' % sys.argv[0])
sys.exit(1) sys.exit(2) # command line usage error
with open(filename, 'rt', encoding='utf-8') as text_file: with open(filename, 'rt', encoding='utf-8') as text_file:
s = Sentence(text_file.read()) s = Sentence(text_file.read())
for n, word in enumerate(s, 1): for n, word in enumerate(s, 1):

View File

@@ -51,7 +51,7 @@ def main():
word_number = int(sys.argv[2]) word_number = int(sys.argv[2])
except (IndexError, ValueError): except (IndexError, ValueError):
print('Usage: %s <file-name> <word-number>' % sys.argv[0]) print('Usage: %s <file-name> <word-number>' % sys.argv[0])
sys.exit(1) sys.exit(2) # command line usage error
with open(filename, 'rt', encoding='utf-8') as text_file: with open(filename, 'rt', encoding='utf-8') as text_file:
s = Sentence(text_file.read()) s = Sentence(text_file.read())
for n, word in enumerate(s, 1): for n, word in enumerate(s, 1):

View File

@@ -1,43 +0,0 @@
# tag::PRIMES_PROC_TOP[]
from time import perf_counter
from typing import List, NamedTuple
from multiprocessing import Process, SimpleQueue # <1>
from primes import is_prime, NUMBERS
class Result(NamedTuple): # <3>
flag: bool
elapsed: float
def check(n: int) -> Result: # <5>
t0 = perf_counter()
res = is_prime(n)
return Result(res, perf_counter() - t0)
def job(n: int, results: SimpleQueue) -> None: # <6>
results.put((n, check(n))) # <7>
# end::PRIMES_PROC_TOP[]
# tag::PRIMES_PROC_MAIN[]
def main() -> None:
t0 = perf_counter()
results = SimpleQueue() # type: ignore
workers: List[Process] = [] # <2>
for n in NUMBERS:
worker = Process(target=job, args=(n, results)) # <3>
worker.start() # <4>
workers.append(worker) # <5>
for _ in workers: # <6>
n, (prime, elapsed) = results.get() # <7>
label = 'P' if prime else ' '
print(f'{n:16} {label} {elapsed:9.6f}s')
elapsed = perf_counter() - t0
print(f'Total time: {elapsed:.2f}s')
if __name__ == '__main__':
main()
# end::PRIMES_PROC_MAIN[]

View File

@@ -0,0 +1,51 @@
#!/usr/bin/env python3
import math
PRIME_FIXTURE = [
(2, True),
(142702110479723, True),
(299593572317531, True),
(3333333333333301, True),
(3333333333333333, False),
(3333335652092209, False),
(4444444444444423, True),
(4444444444444444, False),
(4444444488888889, False),
(5555553133149889, False),
(5555555555555503, True),
(5555555555555555, False),
(6666666666666666, False),
(6666666666666719, True),
(6666667141414921, False),
(7777777536340681, False),
(7777777777777753, True),
(7777777777777777, False),
(9999999999999917, True),
(9999999999999999, False),
]
NUMBERS = [n for n, _ in PRIME_FIXTURE]
# tag::IS_PRIME[]
def is_prime(n: int) -> bool:
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
root = math.floor(math.sqrt(n))
for i in range(3, root + 1, 2):
if n % i == 0:
return False
return True
# end::IS_PRIME[]
if __name__ == '__main__':
for n, prime in PRIME_FIXTURE:
prime_res = is_prime(n)
assert prime_res == prime
print(n, prime)

View File

@@ -0,0 +1,71 @@
#!/usr/bin/env python3
"""
procs.py: shows that multiprocessing on a multicore machine
can be faster than sequential code for CPU-intensive work.
"""
# tag::PRIMES_PROC_TOP[]
import sys
from time import perf_counter
from typing import NamedTuple
from multiprocessing import Process, SimpleQueue, cpu_count # <1>
from multiprocessing import queues # <2>
from primes import is_prime, NUMBERS
class PrimeResult(NamedTuple): # <3>
n: int
prime: bool
elapsed: float
JobQueue = queues.SimpleQueue # <4>
ResultQueue = queues.SimpleQueue # <5>
def check(n: int) -> PrimeResult: # <6>
t0 = perf_counter()
res = is_prime(n)
return PrimeResult(n, res, perf_counter() - t0)
def worker(jobs: JobQueue, results: ResultQueue) -> None: # <7>
while True:
n = jobs.get() # <8>
if n == 0:
break
results.put(check(n)) # <9>
# end::PRIMES_PROC_TOP[]
# tag::PRIMES_PROC_MAIN[]
def main() -> None:
if len(sys.argv) < 2: # <1>
workers = cpu_count()
else:
workers = int(sys.argv[1])
print(f'Checking {len(NUMBERS)} numbers with {workers} processes:')
jobs: JobQueue = SimpleQueue() # <2>
results: ResultQueue = SimpleQueue()
t0 = perf_counter()
for n in NUMBERS: # <3>
jobs.put(n)
for _ in range(workers):
proc = Process(target=worker, args=(jobs, results)) # <4>
proc.start() # <5>
jobs.put(0) # <6>
while True:
n, prime, elapsed = results.get() # <7>
label = 'P' if prime else ' '
print(f'{n:16} {label} {elapsed:9.6f}s') # <8>
if jobs.empty(): # <9>
break
elapsed = perf_counter() - t0
print(f'Total time: {elapsed:.2f}s')
if __name__ == '__main__':
main()
# end::PRIMES_PROC_MAIN[]

View File

@@ -0,0 +1,109 @@
#!/usr/bin/env python3
"""Download flags of countries (with error handling).
asyncio async/await version using run_in_executor for save_flag.
"""
import asyncio
from collections import Counter
import aiohttp
import tqdm # type: ignore
from flags2_common import main, HTTPStatus, Result, save_flag
# default set low to avoid errors from remote site, such as
# 503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000
class FetchError(Exception):
def __init__(self, country_code: str):
self.country_code = country_code
async def get_flag(session: aiohttp.ClientSession,
base_url: str,
cc: str) -> bytes:
url = f'{base_url}/{cc}/{cc}.gif'.lower()
async with session.get(url) as resp:
if resp.status == 200:
return await resp.read()
else:
resp.raise_for_status()
return bytes()
# tag::FLAGS2_ASYNCIO_EXECUTOR[]
async def download_one(session: aiohttp.ClientSession,
cc: str,
base_url: str,
semaphore: asyncio.Semaphore,
verbose: bool) -> Result:
try:
async with semaphore:
image = await get_flag(session, base_url, cc)
except aiohttp.ClientResponseError as exc:
if exc.status == 404:
status = HTTPStatus.not_found
msg = 'not found'
else:
raise FetchError(cc) from exc
else:
loop = asyncio.get_running_loop() # <1>
loop.run_in_executor(None, # <2>
save_flag, image, f'{cc}.gif') # <3>
status = HTTPStatus.ok
msg = 'OK'
if verbose and msg:
print(cc, msg)
return Result(status, cc)
# end::FLAGS2_ASYNCIO_EXECUTOR[]
async def supervisor(cc_list: list[str],
base_url: str,
verbose: bool,
concur_req: int) -> Counter[HTTPStatus]:
counter: Counter[HTTPStatus] = Counter()
semaphore = asyncio.Semaphore(concur_req)
async with aiohttp.ClientSession() as session:
to_do = [download_one(session, cc, base_url, semaphore, verbose)
for cc in sorted(cc_list)]
to_do_iter = asyncio.as_completed(to_do)
if not verbose:
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))
for coro in to_do_iter:
try:
res = await coro
except FetchError as exc:
country_code = exc.country_code
try:
error_msg = exc.__cause__.message # type: ignore
except AttributeError:
error_msg = 'Unknown cause'
if verbose and error_msg:
print(f'*** Error for {country_code}: {error_msg}')
status = HTTPStatus.error
else:
status = res.status
counter[status] += 1
return counter
def download_many(cc_list: list[str],
base_url: str,
verbose: bool,
concur_req: int) -> Counter[HTTPStatus]:
coro = supervisor(cc_list, base_url, verbose, concur_req)
counts = asyncio.run(coro) # <14>
return counts
if __name__ == '__main__':
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)

View File

@@ -121,23 +121,25 @@ def process_args(default_concur_req):
if args.max_req < 1: if args.max_req < 1:
print('*** Usage error: --max_req CONCURRENT must be >= 1') print('*** Usage error: --max_req CONCURRENT must be >= 1')
parser.print_usage() parser.print_usage()
sys.exit(1) # "standard" exit status codes:
# https://stackoverflow.com/questions/1101957/are-there-any-standard-exit-status-codes-in-linux/40484670#40484670
sys.exit(2) # command line usage error
if args.limit < 1: if args.limit < 1:
print('*** Usage error: --limit N must be >= 1') print('*** Usage error: --limit N must be >= 1')
parser.print_usage() parser.print_usage()
sys.exit(1) sys.exit(2) # command line usage error
args.server = args.server.upper() args.server = args.server.upper()
if args.server not in SERVERS: if args.server not in SERVERS:
print(f'*** Usage error: --server LABEL ' print(f'*** Usage error: --server LABEL '
f'must be one of {server_options}') f'must be one of {server_options}')
parser.print_usage() parser.print_usage()
sys.exit(1) sys.exit(2) # command line usage error
try: try:
cc_list = expand_cc_args(args.every, args.all, args.cc, args.limit) cc_list = expand_cc_args(args.every, args.all, args.cc, args.limit)
except ValueError as exc: except ValueError as exc:
print(exc.args[0]) print(exc.args[0])
parser.print_usage() parser.print_usage()
sys.exit(1) sys.exit(2) # command line usage error
if not cc_list: if not cc_list:
cc_list = sorted(POP20_CC) cc_list = sorted(POP20_CC)

4
22-async/README.rst Normal file
View File

@@ -0,0 +1,4 @@
Sample code for Chapter 22 - "Asynchronous programming"
From the book "Fluent Python, Second Edition" by Luciano Ramalho (O'Reilly, 2021)
https://learning.oreilly.com/library/view/fluent-python-2nd/9781492056348/

View File

@@ -0,0 +1,28 @@
domainlib demonstration
=======================
Run Python's async console (requires Python ≥ 3.8)::
$ python3 -m asyncio
I'll see ``asyncio`` imported automatically::
>>> import asyncio
Now you can experiment with ``domainlib``.
At the `>>>` prompt, type these commands::
>>> from domainlib import *
>>> await probe('python.org')
Note the result.
Next::
>>> names = 'python.org rust-lang.org golang.org n05uch1an9.org'.split()
>>> async for result in multi_probe(names):
... print(*result, sep='\t')
Note that if you run the last two lines again,
the results are likely to appear in a different order.

View File

@@ -1,30 +1,28 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from curio import run, TaskGroup from curio import run, TaskGroup
from curio.socket import getaddrinfo, gaierror import curio.socket as socket
from keyword import kwlist from keyword import kwlist
MAX_KEYWORD_LEN = 4 # <1> MAX_KEYWORD_LEN = 4
async def probe(domain: str) -> tuple[str, bool]: # <2> async def probe(domain: str) -> tuple[str, bool]: # <1>
try: try:
await getaddrinfo(domain, None) # <4> await socket.getaddrinfo(domain, None) # <2>
except gaierror: except socket.gaierror:
return (domain, False) return (domain, False)
return (domain, True) return (domain, True)
async def main() -> None:
async def main() -> None: # <5> names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN)
names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN) # <6> domains = (f'{name}.dev'.lower() for name in names)
domains = (f'{name}.dev'.lower() for name in names) # <7> async with TaskGroup() as group: # <3>
async with TaskGroup() as group:
for domain in domains: for domain in domains:
await group.spawn(probe, domain) await group.spawn(probe, domain) # <4>
async for task in group: # <9> async for task in group: # <5>
domain, found = task.result # <10> domain, found = task.result
mark = '+' if found else ' ' mark = '+' if found else ' '
print(f'{mark} {domain}') print(f'{mark} {domain}')
if __name__ == '__main__': if __name__ == '__main__':
run(main()) # <11> run(main()) # <6>

View File

@@ -1,5 +1,5 @@
from curio import TaskGroup from curio import TaskGroup
from curio.socket import getaddrinfo, gaierror import curio.socket as socket
from collections.abc import Iterable, AsyncIterator from collections.abc import Iterable, AsyncIterator
from typing import NamedTuple from typing import NamedTuple
@@ -9,10 +9,10 @@ class Result(NamedTuple):
found: bool found: bool
async def probe(domain: str) -> Result: async def probe(domain: str) -> Result:
try: try:
await getaddrinfo(domain, None) await socket.getaddrinfo(domain, None)
except gaierror: except socket.gaierror:
return Result(domain, False) return Result(domain, False)
return Result(domain, True) return Result(domain, True)

View File

@@ -4,28 +4,28 @@
Class ``InvertedIndex`` builds an inverted index mapping each word to Class ``InvertedIndex`` builds an inverted index mapping each word to
the set of Unicode characters which contain that word in their names. the set of Unicode characters which contain that word in their names.
Optional arguments to the constructor are ``first`` and ``last+1`` character Optional arguments to the constructor are ``first`` and ``last+1``
codes to index, to make testing easier. character codes to index, to make testing easier. In the examples
below, only the ASCII range was indexed.
In the example below, only the ASCII range was indexed:: The `entries` attribute is a `defaultdict` with uppercased single
words as keys::
>>> idx = InvertedIndex(32, 128) >>> idx = InvertedIndex(32, 128)
>>> idx.entries['DOLLAR']
{'$'}
>>> sorted(idx.entries['SIGN']) >>> sorted(idx.entries['SIGN'])
['#', '$', '%', '+', '<', '=', '>'] ['#', '$', '%', '+', '<', '=', '>']
>>> sorted(idx.entries['DIGIT']) >>> idx.entries['A'] & idx.entries['SMALL']
['0', '1', '2', '3', '4', '5', '6', '7', '8', '9'] {'a'}
>>> idx.entries['DIGIT'] & idx.entries['EIGHT'] >>> idx.entries['BRILLIG']
{'8'} set()
>>> idx.search('digit')
['0', '1', '2', '3', '4', '5', '6', '7', '8', '9'] The `.search()` method takes a string, uppercases it, splits it into
>>> idx.search('eight digit') words, and returns the intersection of the entries for each word::
['8']
>>> idx.search('a letter') >>> idx.search('capital a')
['A', 'a'] {'A'}
>>> idx.search('a letter capital')
['A']
>>> idx.search('borogove')
[]
""" """
@@ -58,17 +58,16 @@ class InvertedIndex:
entries[word].add(char) entries[word].add(char)
self.entries = entries self.entries = entries
def search(self, query: str) -> list[Char]: def search(self, query: str) -> set[Char]:
if words := list(tokenize(query)): if words := list(tokenize(query)):
first = self.entries[words[0]] found = self.entries[words[0]]
result = first.intersection(*(self.entries[w] for w in words[1:])) return found.intersection(*(self.entries[w] for w in words[1:]))
return sorted(result)
else: else:
return [] return set()
def format_results(chars: list[Char]) -> Iterator[str]: def format_results(chars: set[Char]) -> Iterator[str]:
for char in chars: for char in sorted(chars):
name = unicodedata.name(char) name = unicodedata.name(char)
code = ord(char) code = ord(char)
yield f'U+{code:04X}\t{char}\t{name}' yield f'U+{code:04X}\t{char}\t{name}'
@@ -77,7 +76,7 @@ def format_results(chars: list[Char]) -> Iterator[str]:
def main(words: list[str]) -> None: def main(words: list[str]) -> None:
if not words: if not words:
print('Please give one or more words to search.') print('Please give one or more words to search.')
sys.exit() sys.exit(2) # command line usage error
index = InvertedIndex() index = InvertedIndex()
chars = index.search(' '.join(words)) chars = index.search(' '.join(words))
for line in format_results(chars): for line in format_results(chars):

View File

@@ -66,8 +66,6 @@
const input = document.getElementById('query'); const input = document.getElementById('query');
input.addEventListener('change', updateTable); input.addEventListener('change', updateTable);
}); });
</script> </script>
</head> </head>

View File

@@ -18,19 +18,19 @@ class CharName(BaseModel): # <2>
def init(app): # <3> def init(app): # <3>
app.state.index = InvertedIndex() app.state.index = InvertedIndex()
static = pathlib.Path(__file__).parent.absolute() / 'static' static = pathlib.Path(__file__).parent.absolute() / 'static' # <4>
with open(static / 'form.html') as fp: with open(static / 'form.html') as fp:
app.state.form = fp.read() app.state.form = fp.read()
init(app) # <4> init(app) # <5>
@app.get('/search', response_model=list[CharName]) # <5> @app.get('/search', response_model=list[CharName]) # <6>
async def search(q: str): # <6> async def search(q: str): # <7>
chars = app.state.index.search(q) chars = app.state.index.search(q)
return ({'char': c, 'name': name(c)} for c in chars) # <7> return ({'char': c, 'name': name(c)} for c in chars) # <8>
@app.get('/', # <8> @app.get('/', response_class=HTMLResponse, include_in_schema=False)
response_class=HTMLResponse, def form(): # <9>
include_in_schema=False)
def form():
return app.state.form return app.state.form
# no main funcion # <10>

View File

@@ -36,7 +36,7 @@ Part / Chapter #|Title|Directory|Notebook|1<sup>st</sup> ed. Chapter&nbsp;#
**IV Object-Oriented Idioms**| **IV Object-Oriented Idioms**|
11|A Pythonic Object|[11-pythonic-obj](11-pythonic-obj)||9 11|A Pythonic Object|[11-pythonic-obj](11-pythonic-obj)||9
12|Sequence Hacking, Hashing, and Slicing|[12-seq-hacking](12-seq-hacking)||10 12|Sequence Hacking, Hashing, and Slicing|[12-seq-hacking](12-seq-hacking)||10
13|Interfaces: Interfaces, Protocols, and ABCs|[13-protocl-abc](13-protocol-abc)||11 13|Interfaces, Protocols, and ABCs|[13-protocl-abc](13-protocol-abc)||11
14|Inheritance: For Good or For Worse|[14-inheritance](14-inheritance)||12 14|Inheritance: For Good or For Worse|[14-inheritance](14-inheritance)||12
🆕 15|More About Type Hints|[15-type-hints](15-type-hints)|| 🆕 15|More About Type Hints|[15-type-hints](15-type-hints)||
16|Operator Overloading: Doing It Right|[16-op-overloading](16-op-overloading)||13 16|Operator Overloading: Doing It Right|[16-op-overloading](16-op-overloading)||13