2e reviewed manuscript

This commit is contained in:
Luciano Ramalho 2021-11-12 11:33:12 -03:00
parent f5e3cb8ad3
commit 80f7f84274
32 changed files with 323 additions and 156 deletions

View File

@ -84,7 +84,7 @@ def standard_env() -> Environment:
'-': op.sub,
'*': op.mul,
'/': op.truediv,
'//': op.floordiv,
'quotient': op.floordiv,
'>': op.gt,
'<': op.lt,
'>=': op.ge,
@ -149,7 +149,7 @@ def evaluate(exp: Expression, env: Environment) -> Any:
# end::EVAL_MATCH_TOP[]
case int(x) | float(x):
return x
case Symbol(name):
case Symbol() as name:
return env[name]
# tag::EVAL_MATCH_MIDDLE[]
case ['quote', x]: # <1>
@ -161,12 +161,12 @@ def evaluate(exp: Expression, env: Environment) -> Any:
return evaluate(alternative, env)
case ['lambda', [*parms], *body] if body: # <3>
return Procedure(parms, body, env)
case ['define', Symbol(name), value_exp]: # <4>
case ['define', Symbol() as name, value_exp]: # <4>
env[name] = evaluate(value_exp, env)
# end::EVAL_MATCH_MIDDLE[]
case ['define', [Symbol(name), *parms], *body] if body:
case ['define', [Symbol() as name, *parms], *body] if body:
env[name] = Procedure(parms, body, env)
case ['set!', Symbol(name), value_exp]:
case ['set!', Symbol() as name, value_exp]:
env.change(name, evaluate(value_exp, env))
case [func_exp, *args] if func_exp not in KEYWORDS:
proc = evaluate(func_exp, env)

View File

@ -79,8 +79,6 @@ Tests of hashing:
>>> v1 = Vector2d(3, 4)
>>> v2 = Vector2d(3.1, 4.2)
>>> hash(v1), hash(v2)
(7, 384307168202284039)
>>> len({v1, v2})
2
@ -124,7 +122,7 @@ class Vector2d:
return tuple(self) == tuple(other)
def __hash__(self):
return hash(self.x) ^ hash(self.y)
return hash((self.x, self.y))
def __abs__(self):
return math.hypot(self.x, self.y)

View File

@ -81,8 +81,6 @@ Tests of hashing:
>>> v1 = Vector2d(3, 4)
>>> v2 = Vector2d(3.1, 4.2)
>>> hash(v1), hash(v2)
(7, 384307168202284039)
>>> len({v1, v2})
2
@ -131,7 +129,7 @@ class Vector2d:
# tag::VECTOR_V3_HASH[]
def __hash__(self):
return hash(self.x) ^ hash(self.y)
return hash((self.x, self.y))
# end::VECTOR_V3_HASH[]
def __abs__(self):

View File

@ -78,8 +78,6 @@ Tests of hashing:
>>> v1 = Vector2d(3, 4)
>>> v2 = Vector2d(3.1, 4.2)
>>> hash(v1), hash(v2)
(7, 384307168202284039)
>>> len({v1, v2})
2
@ -126,7 +124,7 @@ class Vector2d:
return tuple(self) == tuple(other)
def __hash__(self):
return hash(self.x) ^ hash(self.y)
return hash((self.x, self.y))
def __abs__(self):
return math.hypot(self.x, self.y)

View File

@ -27,11 +27,13 @@ Doctest for `Environment`
>>> inner_env = {'a': 2}
>>> outer_env = {'a': 0, 'b': 1}
>>> env = Environment(inner_env, outer_env)
>>> env['a'] = 111 # <1>
>>> env['a'] # <1>
2
>>> env['a'] = 111 # <2>
>>> env['c'] = 222
>>> env
Environment({'a': 111, 'c': 222}, {'a': 0, 'b': 1})
>>> env.change('b', 333) # <2>
>>> env.change('b', 333) # <3>
>>> env
Environment({'a': 111, 'c': 222}, {'a': 0, 'b': 333})

View File

@ -65,7 +65,7 @@ def parse_atom(token: str) -> Atom:
class Environment(ChainMap[Symbol, Any]):
"A ChainMap that allows changing an item in-place."
def change(self, key: Symbol, value: object) -> None:
def change(self, key: Symbol, value: Any) -> None:
"Find where key is defined and change the value there."
for map in self.maps:
if key in map:

View File

@ -62,7 +62,7 @@ def parse_atom(token: str) -> Atom:
class Environment(ChainMap[Symbol, Any]):
"A ChainMap that allows changing an item in-place."
def change(self, key: Symbol, value: object) -> None:
def change(self, key: Symbol, value: Any) -> None:
"Find where key is defined and change the value there."
for map in self.maps:
if key in map:

View File

@ -37,7 +37,7 @@ async def download_one(client: httpx.AsyncClient,
try:
async with semaphore: # <3>
image = await get_flag(client, base_url, cc)
except httpx.HTTPStatusError as exc: # <5>
except httpx.HTTPStatusError as exc: # <4>
res = exc.response
if res.status_code == HTTPStatus.NOT_FOUND:
status = DownloadStatus.NOT_FOUND
@ -45,7 +45,7 @@ async def download_one(client: httpx.AsyncClient,
else:
raise
else:
await asyncio.to_thread(save_flag, image, f'{cc}.gif') # <6>
await asyncio.to_thread(save_flag, image, f'{cc}.gif') # <5>
status = DownloadStatus.OK
msg = 'OK'
if verbose and msg:

View File

@ -22,23 +22,23 @@ DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000
async def get_flag(session: httpx.AsyncClient, # <2>
async def get_flag(client: httpx.AsyncClient, # <2>
base_url: str,
cc: str) -> bytes:
url = f'{base_url}/{cc}/{cc}.gif'.lower()
resp = await session.get(url, timeout=3.1, follow_redirects=True) # <3>
resp = await client.get(url, timeout=3.1, follow_redirects=True) # <3>
resp.raise_for_status()
return resp.content
async def download_one(session: httpx.AsyncClient,
async def download_one(client: httpx.AsyncClient,
cc: str,
base_url: str,
semaphore: asyncio.Semaphore,
verbose: bool) -> DownloadStatus:
try:
async with semaphore:
image = await get_flag(session, base_url, cc)
image = await get_flag(client, base_url, cc)
except httpx.HTTPStatusError as exc:
res = exc.response
if res.status_code == HTTPStatus.NOT_FOUND:
@ -64,8 +64,8 @@ async def supervisor(cc_list: list[str],
concur_req: int) -> Counter[DownloadStatus]: # <1>
counter: Counter[DownloadStatus] = Counter()
semaphore = asyncio.Semaphore(concur_req) # <2>
async with httpx.AsyncClient() as session:
to_do = [download_one(session, cc, base_url, semaphore, verbose)
async with httpx.AsyncClient() as client:
to_do = [download_one(client, cc, base_url, semaphore, verbose)
for cc in sorted(cc_list)] # <3>
to_do_iter = asyncio.as_completed(to_do) # <4>
if not verbose:

View File

@ -0,0 +1,119 @@
#!/usr/bin/env python3
"""Download flags of countries (with error handling).
asyncio async/await version
"""
# tag::FLAGS2_ASYNCIO_TOP[]
import asyncio
from collections import Counter
from http import HTTPStatus
from pathlib import Path
import httpx
import tqdm # type: ignore
from flags2_common import main, DownloadStatus, save_flag
# low concurrency default to avoid errors from remote site,
# such as 503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000
async def get_flag(client: httpx.AsyncClient, # <1>
base_url: str,
cc: str) -> bytes:
url = f'{base_url}/{cc}/{cc}.gif'.lower()
resp = await client.get(url, timeout=3.1, follow_redirects=True) # <2>
resp.raise_for_status()
return resp.content
# tag::FLAGS3_ASYNCIO_GET_COUNTRY[]
async def get_country(client: httpx.AsyncClient,
base_url: str,
cc: str) -> str: # <1>
url = f'{base_url}/{cc}/metadata.json'.lower()
resp = await client.get(url, timeout=3.1, follow_redirects=True)
resp.raise_for_status()
metadata = resp.json() # <2>
return metadata['country'] # <3>
# end::FLAGS3_ASYNCIO_GET_COUNTRY[]
# tag::FLAGS3_ASYNCIO_DOWNLOAD_ONE[]
async def download_one(client: httpx.AsyncClient,
cc: str,
base_url: str,
semaphore: asyncio.Semaphore,
verbose: bool) -> DownloadStatus:
try:
async with semaphore: # <1>
image = await get_flag(client, base_url, cc)
async with semaphore: # <2>
country = await get_country(client, base_url, cc)
except httpx.HTTPStatusError as exc:
res = exc.response
if res.status_code == HTTPStatus.NOT_FOUND:
status = DownloadStatus.NOT_FOUND
msg = f'not found: {res.url}'
else:
raise
else:
filename = country.replace(' ', '_') # <3>
await asyncio.to_thread(save_flag, image, f'{filename}.gif')
status = DownloadStatus.OK
msg = 'OK'
if verbose and msg:
print(cc, msg)
return status
# end::FLAGS3_ASYNCIO_DOWNLOAD_ONE[]
# tag::FLAGS2_ASYNCIO_START[]
async def supervisor(cc_list: list[str],
base_url: str,
verbose: bool,
concur_req: int) -> Counter[DownloadStatus]: # <1>
counter: Counter[DownloadStatus] = Counter()
semaphore = asyncio.Semaphore(concur_req) # <2>
async with httpx.AsyncClient() as client:
to_do = [download_one(client, cc, base_url, semaphore, verbose)
for cc in sorted(cc_list)] # <3>
to_do_iter = asyncio.as_completed(to_do) # <4>
if not verbose:
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) # <5>
error: httpx.HTTPError | None = None # <6>
for coro in to_do_iter: # <7>
try:
status = await coro # <8>
except httpx.HTTPStatusError as exc:
error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
error_msg = error_msg.format(resp=exc.response)
error = exc # <9>
except httpx.RequestError as exc:
error_msg = f'{exc} {type(exc)}'.strip()
error = exc # <10>
except KeyboardInterrupt:
break
if error:
status = DownloadStatus.ERROR # <11>
if verbose:
url = str(error.request.url) # <12>
cc = Path(url).stem.upper() # <13>
print(f'{cc} error: {error_msg}')
counter[status] += 1
return counter
def download_many(cc_list: list[str],
base_url: str,
verbose: bool,
concur_req: int) -> Counter[DownloadStatus]:
coro = supervisor(cc_list, base_url, verbose, concur_req)
counts = asyncio.run(coro) # <14>
return counts
if __name__ == '__main__':
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
# end::FLAGS2_ASYNCIO_START[]

View File

@ -17,15 +17,15 @@ from httpx import AsyncClient # <1>
from flags import BASE_URL, save_flag, main # <2>
async def download_one(session: AsyncClient, cc: str): # <3>
image = await get_flag(session, cc)
async def download_one(client: AsyncClient, cc: str): # <3>
image = await get_flag(client, cc)
save_flag(image, f'{cc}.gif')
print(cc, end=' ', flush=True)
return cc
async def get_flag(session: AsyncClient, cc: str) -> bytes: # <4>
async def get_flag(client: AsyncClient, cc: str) -> bytes: # <4>
url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
resp = await session.get(url, timeout=6.1,
resp = await client.get(url, timeout=6.1,
follow_redirects=True) # <5>
return resp.read() # <6>
# end::FLAGS_ASYNCIO_TOP[]
@ -35,8 +35,8 @@ def download_many(cc_list: list[str]) -> int: # <1>
return asyncio.run(supervisor(cc_list)) # <2>
async def supervisor(cc_list: list[str]) -> int:
async with AsyncClient() as session: # <3>
to_do = [download_one(session, cc)
async with AsyncClient() as client: # <3>
to_do = [download_one(client, cc)
for cc in sorted(cc_list)] # <4>
res = await asyncio.gather(*to_do) # <5>

View File

@ -5,6 +5,7 @@ import httpx # make httpx classes available to .__subclasses__()
def tree(cls, level=0, last_sibling=True):
yield cls, level, last_sibling
# get RuntimeError and exceptions defined in httpx
subclasses = [sub for sub in cls.__subclasses__()
if sub is RuntimeError or sub.__module__ == 'httpx']

View File

@ -1,38 +0,0 @@
import httpx
def tree(cls, level=0):
yield cls.__name__, level
for sub_cls in cls.__subclasses__():
yield from tree(sub_cls, level+1)
def display(cls):
for cls_name, level in tree(cls):
indent = ' ' * 4 * level
print(f'{indent}{cls_name}')
def find_roots(module):
exceptions = []
for name in dir(module):
obj = getattr(module, name)
if isinstance(obj, type) and issubclass(obj, BaseException):
exceptions.append(obj)
roots = []
for exc in exceptions:
root = True
for other in exceptions:
if exc is not other and issubclass(exc, other):
root = False
break
if root:
roots.append(exc)
return roots
def main():
for exc in find_roots(httpx):
display(exc)
if __name__ == '__main__':
main()

View File

@ -14,26 +14,28 @@ PROMPT = b'?> '
async def finder(index: InvertedIndex, # <2>
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
writer: asyncio.StreamWriter) -> None:
client = writer.get_extra_info('peername') # <3>
while True: # <4>
writer.write(PROMPT) # can't await! # <5>
await writer.drain() # must await! # <6>
data = await reader.readline() # <7>
if not data: # <8>
break
try:
query = data.decode().strip() # <8>
except UnicodeDecodeError: # <9>
query = data.decode().strip() # <9>
except UnicodeDecodeError: # <10>
query = '\x00'
print(f' From {client}: {query!r}') # <10>
print(f' From {client}: {query!r}') # <11>
if query:
if ord(query[:1]) < 32: # <11>
if ord(query[:1]) < 32: # <12>
break
results = await search(query, index, writer) # <12>
print(f' To {client}: {results} results.') # <13>
results = await search(query, index, writer) # <13>
print(f' To {client}: {results} results.') # <14>
writer.close() # <14>
await writer.wait_closed() # <15>
print(f'Close {client}.') # <16>
writer.close() # <15>
await writer.wait_closed() # <16>
print(f'Close {client}.') # <17>
# end::TCP_MOJIFINDER_TOP[]
# tag::TCP_MOJIFINDER_SEARCH[]
@ -52,7 +54,7 @@ async def search(query: str, # <1>
# end::TCP_MOJIFINDER_SEARCH[]
# tag::TCP_MOJIFINDER_MAIN[]
async def supervisor(index: InvertedIndex, host: str, port: int):
async def supervisor(index: InvertedIndex, host: str, port: int) -> None:
server = await asyncio.start_server( # <1>
functools.partial(finder, index), # <2>
host, port) # <3>

View File

@ -7,25 +7,26 @@ from pydantic import BaseModel
from charindex import InvertedIndex
app = FastAPI( # <1>
STATIC_PATH = Path(__file__).parent.absolute() / 'static' # <1>
app = FastAPI( # <2>
title='Mojifinder Web',
description='Search for Unicode characters by name.',
)
class CharName(BaseModel): # <2>
class CharName(BaseModel): # <3>
char: str
name: str
def init(app): # <3>
def init(app): # <4>
app.state.index = InvertedIndex()
static = Path(__file__).parent.absolute() / 'static' # <4>
app.state.form = (static / 'form.html').read_text()
app.state.form = (STATIC_PATH / 'form.html').read_text()
init(app) # <5>
@app.get('/search', response_model=list[CharName]) # <6>
async def search(q: str): # <7>
chars = app.state.index.search(q)
chars = sorted(app.state.index.search(q))
return ({'char': c, 'name': name(c)} for c in chars) # <8>
@app.get('/', response_class=HTMLResponse, include_in_schema=False)

View File

@ -30,8 +30,8 @@ instance attributes, created in each ``LineItem`` instance::
>>> nutmeg = LineItem('Moluccan nutmeg', 8, 13.95)
>>> nutmeg.weight, nutmeg.price # <1>
(8, 13.95)
>>> sorted(vars(nutmeg).items()) # <2>
[('description', 'Moluccan nutmeg'), ('price', 13.95), ('weight', 8)]
>>> nutmeg.__dict__ # <2>
{'description': 'Moluccan nutmeg', 'weight': 8, 'price': 13.95}
# end::LINEITEM_V2_PROP_DEMO[]

View File

@ -54,12 +54,15 @@ class FrozenJSON:
except AttributeError:
return FrozenJSON.build(self.__data[name]) # <4>
def __dir__(self): # <5>
return self.__data.keys()
@classmethod
def build(cls, obj): # <5>
if isinstance(obj, abc.Mapping): # <6>
def build(cls, obj): # <6>
if isinstance(obj, abc.Mapping): # <7>
return cls(obj)
elif isinstance(obj, abc.MutableSequence): # <7>
elif isinstance(obj, abc.MutableSequence): # <8>
return [cls.build(item) for item in obj]
else: # <8>
else: # <9>
return obj
# end::EXPLORE0[]

View File

@ -62,11 +62,14 @@ class FrozenJSON:
# end::EXPLORE1[]
def __getattr__(self, name):
if hasattr(self.__data, name):
try:
return getattr(self.__data, name)
else:
except AttributeError:
return FrozenJSON.build(self.__data[name])
def __dir__(self): # <5>
return self.__data.keys()
@classmethod
def build(cls, obj):
if isinstance(obj, abc.Mapping):

View File

@ -47,8 +47,11 @@ class FrozenJSON:
self.__data[key] = value
def __getattr__(self, name):
if hasattr(self.__data, name):
try:
return getattr(self.__data, name)
else:
except AttributeError:
return FrozenJSON(self.__data[name]) # <4>
def __dir__(self):
return self.__data.keys()
# end::EXPLORE2[]

View File

@ -23,8 +23,7 @@ class Record:
self.__dict__.update(kwargs) # <1>
def __repr__(self):
cls_name = self.__class__.__name__
return f'<{cls_name} serial={self.serial!r}>' # <2>
return f'<{self.__class__.__name__} serial={self.serial!r}>' # <2>
def load(path=JSON_PATH):
records = {} # <3>

View File

@ -29,8 +29,7 @@ class Record:
self.__dict__.update(kwargs)
def __repr__(self):
cls_name = self.__class__.__name__
return f'<{cls_name} serial={self.serial!r}>'
return f'<{self.__class__.__name__} serial={self.serial!r}>'
@staticmethod # <3>
def fetch(key):
@ -44,10 +43,9 @@ class Record:
class Event(Record): # <1>
def __repr__(self):
if hasattr(self, 'name'): # <2>
cls_name = self.__class__.__name__
return f'<{cls_name} {self.name!r}>'
else:
try:
return f'<{self.__class__.__name__} {self.name!r}>' # <2>
except AttributeError:
return super().__repr__()
@property

View File

@ -33,8 +33,7 @@ class Record:
self.__dict__.update(kwargs)
def __repr__(self):
cls_name = self.__class__.__name__
return f'<{cls_name} serial={self.serial!r}>'
return f'<{self.__class__.__name__} serial={self.serial!r}>'
@staticmethod
def fetch(key):
@ -46,11 +45,10 @@ class Record:
class Event(Record):
def __repr__(self):
if hasattr(self, 'name'): # <3>
cls_name = self.__class__.__name__
return f'<{cls_name} {self.name!r}>'
else:
return super().__repr__() # <4>
try:
return f'<{self.__class__.__name__} {self.name!r}>'
except AttributeError:
return super().__repr__()
@property
def venue(self):

View File

@ -32,8 +32,7 @@ class Record:
self.__dict__.update(kwargs)
def __repr__(self):
cls_name = self.__class__.__name__
return f'<{cls_name} serial={self.serial!r}>'
return f'<{self.__class__.__name__} serial={self.serial!r}>'
@staticmethod
def fetch(key):
@ -52,11 +51,10 @@ class Event(Record):
# end::SCHEDULE4_INIT[]
def __repr__(self):
if hasattr(self, 'name'):
cls_name = self.__class__.__name__
return f'<{cls_name} {self.name!r}>'
else:
return super().__repr__() # <4>
try:
return f'<{self.__class__.__name__} {self.name!r}>'
except AttributeError:
return super().__repr__()
@property
def venue(self):

View File

@ -31,8 +31,7 @@ class Record:
self.__dict__.update(kwargs)
def __repr__(self):
cls_name = self.__class__.__name__
return f'<{cls_name} serial={self.serial!r}>'
return f'<{self.__class__.__name__} serial={self.serial!r}>'
@staticmethod
def fetch(key):
@ -44,11 +43,10 @@ class Record:
class Event(Record):
def __repr__(self):
if hasattr(self, 'name'):
cls_name = self.__class__.__name__
return f'<{cls_name} {self.name!r}>'
else:
return super().__repr__() # <4>
try:
return f'<{self.__class__.__name__} {self.name!r}>'
except AttributeError:
return super().__repr__()
@property
def venue(self):

View File

@ -36,8 +36,7 @@ class Record:
self.__dict__.update(kwargs)
def __repr__(self):
cls_name = self.__class__.__name__
return f'<{cls_name} serial={self.serial!r}>'
return f'<{self.__class__.__name__} serial={self.serial!r}>'
@staticmethod
def fetch(key):
@ -49,10 +48,9 @@ class Record:
class Event(Record):
def __repr__(self):
if hasattr(self, 'name'):
cls_name = self.__class__.__name__
return f'<{cls_name} {self.name!r}>'
else:
try:
return f'<{self.__class__.__name__} {self.name!r}>'
except AttributeError:
return super().__repr__()
# tag::SCHEDULE5_CACHED_PROPERTY[]

View File

@ -30,7 +30,7 @@ class NonBlank(Validated):
def validate(self, name, value):
value = value.strip()
if len(value) == 0:
if not value: # <2>
raise ValueError(f'{name} cannot be blank')
return value # <2>
return value # <3>
# end::MODEL_V5_VALIDATED_SUB[]

View File

@ -5,21 +5,18 @@ Overriding descriptor (a.k.a. data descriptor or enforced descriptor):
>>> obj = Managed() # <1>
>>> obj.over # <2>
-> Overriding.__get__(<Overriding object>, <Managed object>,
<class Managed>)
-> Overriding.__get__(<Overriding object>, <Managed object>, <class Managed>)
>>> Managed.over # <3>
-> Overriding.__get__(<Overriding object>, None, <class Managed>)
>>> obj.over = 7 # <4>
-> Overriding.__set__(<Overriding object>, <Managed object>, 7)
>>> obj.over # <5>
-> Overriding.__get__(<Overriding object>, <Managed object>,
<class Managed>)
-> Overriding.__get__(<Overriding object>, <Managed object>, <class Managed>)
>>> obj.__dict__['over'] = 8 # <6>
>>> vars(obj) # <7>
{'over': 8}
>>> obj.over # <8>
-> Overriding.__get__(<Overriding object>, <Managed object>,
<class Managed>)
-> Overriding.__get__(<Overriding object>, <Managed object>, <class Managed>)
# end::DESCR_KINDS_DEMO1[]
@ -50,8 +47,7 @@ Non-overriding descriptor (a.k.a. non-data descriptor or shadowable descriptor):
>>> obj = Managed()
>>> obj.non_over # <1>
-> NonOverriding.__get__(<NonOverriding object>, <Managed object>,
<class Managed>)
-> NonOverriding.__get__(<NonOverriding object>, <Managed object>, <class Managed>)
>>> obj.non_over = 7 # <2>
>>> obj.non_over # <3>
7
@ -59,8 +55,7 @@ Non-overriding descriptor (a.k.a. non-data descriptor or shadowable descriptor):
-> NonOverriding.__get__(<NonOverriding object>, None, <class Managed>)
>>> del obj.non_over # <5>
>>> obj.non_over # <6>
-> NonOverriding.__get__(<NonOverriding object>, <Managed object>,
<class Managed>)
-> NonOverriding.__get__(<NonOverriding object>, <Managed object>, <class Managed>)
# end::DESCR_KINDS_DEMO3[]
@ -88,7 +83,7 @@ Methods are non-overriding descriptors:
>>> Managed.spam()
Traceback (most recent call last):
...
TypeError: spam() missing 1 required positional argument: 'self'
TypeError: Managed.spam() missing 1 required positional argument: 'self'
>>> Managed.spam(obj)
-> Managed.spam(<Managed object>)
>>> Managed.spam.__get__(obj) # doctest: +ELLIPSIS
@ -156,15 +151,15 @@ def cls_name(obj_or_cls):
def display(obj):
cls = type(obj)
if cls is type:
return '<class {}>'.format(obj.__name__)
return f'<class {obj.__name__}>'
elif cls in [type(None), int]:
return repr(obj)
else:
return '<{} object>'.format(cls_name(obj))
return f'<{cls_name(obj)} object>'
def print_args(name, *args):
pseudo_args = ', '.join(display(x) for x in args)
print('-> {}.__{}__({})'.format(cls_name(args[0]), name, pseudo_args))
print(f'-> {cls_name(args[0])}.__{name}__({pseudo_args})')
### essential classes for this example ###
@ -199,6 +194,6 @@ class Managed: # <5>
non_over = NonOverriding()
def spam(self): # <6>
print('-> Managed.spam({})'.format(display(self)))
print(f'-> Managed.spam({display(self)})')
# end::DESCR_KINDS[]

View File

@ -78,7 +78,9 @@ class Field:
self.storage_name = '_' + name # <1>
self.constructor = constructor
def __get__(self, instance, owner=None): # <2>
def __get__(self, instance, owner=None):
if instance is None: # <2>
return self
return getattr(instance, self.storage_name) # <3>
def __set__(self, instance: Any, value: Any) -> None:

View File

@ -49,9 +49,8 @@ def record_factory(cls_name: str, field_names: FieldNames) -> type[tuple]: # <2
yield getattr(self, name)
def __repr__(self): # <6>
values = ', '.join(
'{}={!r}'.format(*i) for i in zip(self.__slots__, self)
)
values = ', '.join(f'{name}={value!r}'
for name, value in zip(self.__slots__, self))
cls_name = self.__class__.__name__
return f'{cls_name}({values})'

View File

@ -28,7 +28,7 @@ Here are a few tests. ``bunch_test.py`` has a few more.
>>> Point(x=1, y=2, z=3)
Traceback (most recent call last):
...
AttributeError: 'Point' object has no attribute 'z'
AttributeError: No slots left for: 'z'
>>> p = Point(x=21)
>>> p.y = 42
>>> p
@ -51,7 +51,8 @@ class MetaBunch(type): # <1>
for name, default in defaults.items(): # <5>
setattr(self, name, kwargs.pop(name, default))
if kwargs: # <6>
setattr(self, *kwargs.popitem())
extra = ', '.join(kwargs)
raise AttributeError(f'No slots left for: {extra!r}')
def __repr__(self): # <7>
rep = ', '.join(f'{name}={value!r}'

View File

@ -26,7 +26,7 @@ def test_init():
def test_init_wrong_argument():
with pytest.raises(AttributeError) as exc:
p = Point(x=1.2, y=3.4, flavor='coffee')
assert "no attribute 'flavor'" in str(exc.value)
assert 'flavor' in str(exc.value)
def test_slots():

View File

@ -0,0 +1,91 @@
"""
Could this be valid Python?
if now >= T[4:20:PM]: chill()
>>> t = T[4:20]
>>> t
T[4:20]
>>> h, m, s = t
>>> h, m, s
(4, 20, 0)
>>> t[11:59:AM]
T[11:59:AM]
>>> start = t[9:O1:PM]
>>> start
T[9:O1:PM]
>>> start.h, start.m, start.s, start.pm
(9, 1, 0, True)
>>> now = T[7:O1:PM]
>>> T[4:OO:PM]
T[4:OO:PM]
>>> now > T[4:20:PM]
True
"""
import functools
AM = -2
PM = -1
for n in range(10):
globals()[f'O{n}'] = n
OO = 0
@functools.total_ordering
class T():
def __init__(self, arg):
if isinstance(arg, slice):
h = arg.start or 0
m = arg.stop or 0
s = arg.step or 0
else:
h, m, s = 0, 0, arg
if m in (AM, PM):
self.pm = m == PM
m = 0
elif s in (AM, PM):
self.pm = s == PM
s = 0
else:
self.pm = None
self.h, self.m, self.s = h, m, s
def __class_getitem__(cls, arg):
return cls(arg)
def __getitem__(self, arg):
return(type(self)(arg))
def __repr__(self):
h, m, s = self.h, self.m, self.s or None
if m == 0:
m = f'OO'
elif m < 10:
m = f'O{m}'
s = '' if s is None else s
if self.pm is None:
pm = ''
else:
pm = ':' + ('AM', 'PM')[self.pm]
return f'T[{h}:{m}{s}{pm}]'
def __iter__(self):
yield from (self.h, self.m, self.s)
def __eq__(self, other):
return tuple(self) == tuple(other)
def __lt__(self, other):
return tuple(self) < tuple(other)
def __add__(self, other):
"""
>>> T[11:O5:AM] + 15 # TODO: preserve pm field
T[11:20]
"""
if isinstance(other, int):
return self[self.h:self.m + other:self.pm]