Merge branch 'master' of github.com:fluentpython/example-code-2e

This commit is contained in:
Luciano Ramalho
2021-10-18 14:44:13 -03:00
29 changed files with 149 additions and 71 deletions

View File

@@ -147,7 +147,7 @@ def test_factorial():
gcd_src = """ gcd_src = """
(define (mod m n) (define (mod m n)
(- m (* n (// m n)))) (- m (* n (quotient m n))))
(define (gcd m n) (define (gcd m n)
(if (= n 0) (if (= n 0)
m m

View File

@@ -80,7 +80,7 @@ def standard_env() -> Environment:
'-': op.sub, '-': op.sub,
'*': op.mul, '*': op.mul,
'/': op.truediv, '/': op.truediv,
'//': op.floordiv, 'quotient': op.floordiv,
'>': op.gt, '>': op.gt,
'<': op.lt, '<': op.lt,
'>=': op.ge, '>=': op.ge,

View File

@@ -146,7 +146,7 @@ def test_factorial():
gcd_src = """ gcd_src = """
(define (mod m n) (define (mod m n)
(- m (* n (// m n)))) (- m (* n (quotient m n))))
(define (gcd m n) (define (gcd m n)
(if (= n 0) (if (= n 0)
m m

View File

@@ -83,7 +83,7 @@ def standard_env() -> Environment:
'-': op.sub, '-': op.sub,
'*': op.mul, '*': op.mul,
'/': op.truediv, '/': op.truediv,
'//': op.floordiv, 'quotient': op.floordiv,
'>': op.gt, '>': op.gt,
'<': op.lt, '<': op.lt,
'>=': op.ge, '>=': op.ge,

View File

@@ -147,7 +147,7 @@ def test_factorial():
gcd_src = """ gcd_src = """
(define (mod m n) (define (mod m n)
(- m (* n (// m n)))) (- m (* n (quotient m n))))
(define (gcd m n) (define (gcd m n)
(if (= n 0) (if (= n 0)
m m
@@ -255,4 +255,4 @@ closure_averager_src = """
def test_closure_averager(): def test_closure_averager():
got = run(closure_averager_src) got = run(closure_averager_src)
assert got == 12.0 assert got == 12.0
# end::RUN_AVERAGER[] # end::RUN_AVERAGER[]

View File

@@ -80,7 +80,7 @@ def standard_env() -> Environment:
'-': op.sub, '-': op.sub,
'*': op.mul, '*': op.mul,
'/': op.truediv, '/': op.truediv,
'//': op.floordiv, 'quotient': op.floordiv,
'>': op.gt, '>': op.gt,
'<': op.lt, '<': op.lt,
'>=': op.ge, '>=': op.ge,

View File

@@ -31,46 +31,46 @@ def worker(jobs: JobQueue, results: ResultQueue) -> None: # <7>
while n := jobs.get(): # <8> while n := jobs.get(): # <8>
results.put(check(n)) # <9> results.put(check(n)) # <9>
results.put(PrimeResult(0, False, 0.0)) # <10> results.put(PrimeResult(0, False, 0.0)) # <10>
# end::PRIMES_PROC_TOP[]
# tag::PRIMES_PROC_MIDDLE[] def start_jobs(
def start_jobs(workers: int, jobs: JobQueue, results: ResultQueue) -> None: procs: int, jobs: JobQueue, results: ResultQueue # <11>
) -> None:
for n in NUMBERS: for n in NUMBERS:
jobs.put(n) # <1> jobs.put(n) # <12>
for _ in range(workers): for _ in range(procs):
proc = Process(target=worker, args=(jobs, results)) # <2> proc = Process(target=worker, args=(jobs, results)) # <13>
proc.start() # <3> proc.start() # <14>
jobs.put(0) # <4> jobs.put(0) # <15>
# end::PRIMES_PROC_TOP[]
def report(workers: int, results: ResultQueue) -> int:
checked = 0
workers_done = 0
while workers_done < workers:
n, prime, elapsed = results.get()
if n == 0:
workers_done += 1
else:
checked += 1
label = 'P' if prime else ' '
print(f'{n:16} {label} {elapsed:9.6f}s')
return checked
# end::PRIMES_PROC_MIDDLE[]
# tag::PRIMES_PROC_MAIN[] # tag::PRIMES_PROC_MAIN[]
def main() -> None: def main() -> None:
if len(sys.argv) < 2: if len(sys.argv) < 2: # <1>
workers = cpu_count() procs = cpu_count()
else: else:
workers = int(sys.argv[1]) procs = int(sys.argv[1])
print(f'Checking {len(NUMBERS)} numbers with {workers} processes:') print(f'Checking {len(NUMBERS)} numbers with {procs} processes:')
t0 = perf_counter() t0 = perf_counter()
jobs: JobQueue = SimpleQueue() jobs: JobQueue = SimpleQueue() # <2>
results: ResultQueue = SimpleQueue() results: ResultQueue = SimpleQueue()
start_jobs(workers, jobs, results) start_jobs(procs, jobs, results) # <3>
checked = report(workers, results) checked = report(procs, results) # <4>
elapsed = perf_counter() - t0 elapsed = perf_counter() - t0
print(f'{checked} checks in {elapsed:.2f}s') print(f'{checked} checks in {elapsed:.2f}s') # <5>
def report(procs: int, results: ResultQueue) -> int: # <6>
checked = 0
procs_done = 0
while procs_done < procs: # <7>
n, prime, elapsed = results.get() # <8>
if n == 0: # <9>
procs_done += 1
else:
checked += 1 # <10>
label = 'P' if prime else ' '
print(f'{n:16} {label} {elapsed:9.6f}s')
return checked
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@@ -0,0 +1,80 @@
#!/usr/bin/env python3
"""
procs.py: shows that multiprocessing on a multicore machine
can be faster than sequential code for CPU-intensive work.
"""
# tag::PRIMES_PROC_TOP[]
import sys
from time import perf_counter
from typing import NamedTuple
from multiprocessing import Process, SimpleQueue, cpu_count # <1>
from multiprocessing import queues # <2>
from primes import is_prime, NUMBERS
class PrimeResult(NamedTuple): # <3>
n: int
prime: bool
elapsed: float
JobQueue = queues.SimpleQueue[int] # <4>
ResultQueue = queues.SimpleQueue[PrimeResult] # <5>
def check(n: int) -> PrimeResult: # <6>
t0 = perf_counter()
res = is_prime(n)
return PrimeResult(n, res, perf_counter() - t0)
def worker(jobs: JobQueue, results: ResultQueue) -> None: # <7>
while n := jobs.get(): # <8>
results.put(check(n)) # <9>
results.put(PrimeResult(0, False, 0.0))
# end::PRIMES_PROC_TOP[]
def start_jobs(workers: int) -> ResultQueue:
jobs: JobQueue = SimpleQueue() # <2>
results: ResultQueue = SimpleQueue()
for n in NUMBERS: # <3>
jobs.put(n)
for _ in range(workers):
proc = Process(target=worker, args=(jobs, results)) # <4>
proc.start() # <5>
jobs.put(0) # <6>
return results
def report(workers: int, results: ResultQueue) -> int:
workers_done = 0
checked = 0
while workers_done < workers:
n, prime, elapsed = results.get() # <7>
if n == 0:
workers_done += 1
else:
checked += 1
label = 'P' if prime else ' '
print(f'{n:16} {label} {elapsed:9.6f}s') # <8>
return checked
# tag::PRIMES_PROC_MAIN[]
def main() -> None:
if len(sys.argv) < 2: # <1>
workers = cpu_count()
else:
workers = int(sys.argv[1])
print(f'Checking {len(NUMBERS)} numbers with {workers} processes:')
t0 = perf_counter()
results = start_jobs(workers)
checked = report(workers, results)
elapsed = perf_counter() - t0
print(f'{checked} checks in {elapsed:.2f}s')
if __name__ == '__main__':
main()
# end::PRIMES_PROC_MAIN[]

2
20-executors/getflags/.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
flags/
downloaded/

View File

@@ -16,37 +16,36 @@ import tqdm # type: ignore
from flags2_common import main, DownloadStatus, save_flag from flags2_common import main, DownloadStatus, save_flag
# default set low to avoid errors from remote site, such as # low concurrency default to avoid errors from remote site,
# 503 - Service Temporarily Unavailable # such as 503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5 DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000 MAX_CONCUR_REQ = 1000
async def get_flag(session: httpx.AsyncClient, # <2> async def get_flag(client: httpx.AsyncClient, # <1>
base_url: str, base_url: str,
cc: str) -> bytes: cc: str) -> bytes:
url = f'{base_url}/{cc}/{cc}.gif'.lower() url = f'{base_url}/{cc}/{cc}.gif'.lower()
resp = await session.get(url, timeout=3.1, follow_redirects=True) # <3> resp = await client.get(url, timeout=3.1, follow_redirects=True) # <2>
resp.raise_for_status() resp.raise_for_status()
return resp.content return resp.content
async def download_one(session: httpx.AsyncClient, async def download_one(client: httpx.AsyncClient,
cc: str, cc: str,
base_url: str, base_url: str,
semaphore: asyncio.Semaphore, # <4> semaphore: asyncio.Semaphore,
verbose: bool) -> DownloadStatus: verbose: bool) -> DownloadStatus:
try: try:
async with semaphore: # <5> async with semaphore: # <3>
image = await get_flag(session, base_url, cc) image = await get_flag(client, base_url, cc)
except httpx.HTTPStatusError as exc: # <4> except httpx.HTTPStatusError as exc: # <5>
res = exc.response res = exc.response
if res.status_code == HTTPStatus.NOT_FOUND: if res.status_code == HTTPStatus.NOT_FOUND:
status = DownloadStatus.NOT_FOUND # <5> status = DownloadStatus.NOT_FOUND
msg = f'not found: {res.url}' msg = f'not found: {res.url}'
else: else:
raise raise
else: else:
await asyncio.to_thread(save_flag, image, f'{cc}.gif') await asyncio.to_thread(save_flag, image, f'{cc}.gif') # <6>
status = DownloadStatus.OK status = DownloadStatus.OK
msg = 'OK' msg = 'OK'
if verbose and msg: if verbose and msg:
@@ -61,33 +60,31 @@ async def supervisor(cc_list: list[str],
concur_req: int) -> Counter[DownloadStatus]: # <1> concur_req: int) -> Counter[DownloadStatus]: # <1>
counter: Counter[DownloadStatus] = Counter() counter: Counter[DownloadStatus] = Counter()
semaphore = asyncio.Semaphore(concur_req) # <2> semaphore = asyncio.Semaphore(concur_req) # <2>
async with httpx.AsyncClient() as session: async with httpx.AsyncClient() as client:
to_do = [download_one(session, cc, base_url, semaphore, verbose) to_do = [download_one(client, cc, base_url, semaphore, verbose)
for cc in sorted(cc_list)] # <3> for cc in sorted(cc_list)] # <3>
to_do_iter = asyncio.as_completed(to_do) # <4> to_do_iter = asyncio.as_completed(to_do) # <4>
if not verbose: if not verbose:
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) # <5> to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) # <5>
error: httpx.HTTPError | None = None error: httpx.HTTPError | None = None # <6>
for coro in to_do_iter: # <6> for coro in to_do_iter: # <7>
try: try:
status = await coro # <7> status = await coro # <8>
except httpx.HTTPStatusError as exc: # <8> except httpx.HTTPStatusError as exc:
error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}' error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
error_msg = error_msg.format(resp=exc.response) error_msg = error_msg.format(resp=exc.response)
error = exc error = exc # <9>
except httpx.RequestError as exc: # <9> except httpx.RequestError as exc:
error_msg = f'{exc} {type(exc)}'.strip() error_msg = f'{exc} {type(exc)}'.strip()
error = exc error = exc # <10>
except KeyboardInterrupt: # <10> except KeyboardInterrupt:
break break
else: # <11>
error = None
if error: if error:
status = DownloadStatus.ERROR # <12> status = DownloadStatus.ERROR # <11>
if verbose: if verbose:
url = str(error.request.url) # <13> url = str(error.request.url) # <12>
cc = Path(url).stem.upper() # <14> cc = Path(url).stem.upper() # <13>
print(f'{cc} error: {error_msg}') print(f'{cc} error: {error_msg}')
counter[status] += 1 counter[status] += 1

View File

@@ -20,7 +20,7 @@ Sample run::
# tag::FLAGS2_THREADPOOL[] # tag::FLAGS2_THREADPOOL[]
from collections import Counter from collections import Counter
from concurrent import futures from concurrent.futures import ThreadPoolExecutor, as_completed
import httpx import httpx
import tqdm # type: ignore import tqdm # type: ignore
@@ -37,13 +37,13 @@ def download_many(cc_list: list[str],
verbose: bool, verbose: bool,
concur_req: int) -> Counter[DownloadStatus]: concur_req: int) -> Counter[DownloadStatus]:
counter: Counter[DownloadStatus] = Counter() counter: Counter[DownloadStatus] = Counter()
with futures.ThreadPoolExecutor(max_workers=concur_req) as executor: # <4> with ThreadPoolExecutor(max_workers=concur_req) as executor: # <4>
to_do_map = {} # <5> to_do_map = {} # <5>
for cc in sorted(cc_list): # <6> for cc in sorted(cc_list): # <6>
future = executor.submit(download_one, cc, future = executor.submit(download_one, cc,
base_url, verbose) # <7> base_url, verbose) # <7>
to_do_map[future] = cc # <8> to_do_map[future] = cc # <8>
done_iter = futures.as_completed(to_do_map) # <9> done_iter = as_completed(to_do_map) # <9>
if not verbose: if not verbose:
done_iter = tqdm.tqdm(done_iter, total=len(cc_list)) # <10> done_iter = tqdm.tqdm(done_iter, total=len(cc_list)) # <10>
for future in done_iter: # <11> for future in done_iter: # <11>
@@ -52,7 +52,7 @@ def download_many(cc_list: list[str],
except httpx.HTTPStatusError as exc: # <13> except httpx.HTTPStatusError as exc: # <13>
error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}' error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
error_msg = error_msg.format(resp=exc.response) error_msg = error_msg.format(resp=exc.response)
except httpx.RequestError as exc: # <15> except httpx.RequestError as exc:
error_msg = f'{exc} {type(exc)}'.strip() error_msg = f'{exc} {type(exc)}'.strip()
except KeyboardInterrupt: except KeyboardInterrupt:
break break
@@ -63,7 +63,7 @@ def download_many(cc_list: list[str],
status = DownloadStatus.ERROR status = DownloadStatus.ERROR
counter[status] += 1 counter[status] += 1
if verbose and error_msg: if verbose and error_msg:
cc = to_do_map[future] # <16> cc = to_do_map[future] # <14>
print(f'{cc} error: {error_msg}') print(f'{cc} error: {error_msg}')
return counter return counter

View File

@@ -1 +0,0 @@
flags/

View File

@@ -44,7 +44,7 @@ Part / Chapter #|Title|Directory|1<sup>st</sup> ed. Chapter&nbsp;#
17|Iterators, Generators, and Classic Coroutines|[17-it-generator](17-it-generator)|14 17|Iterators, Generators, and Classic Coroutines|[17-it-generator](17-it-generator)|14
18|Context Managers and else Blocks|[18-with-match](18-with-match)|15 18|Context Managers and else Blocks|[18-with-match](18-with-match)|15
19|Concurrency Models in Python|[19-concurrency](19-concurrency)|🆕 19|Concurrency Models in Python|[19-concurrency](19-concurrency)|🆕
20|Concurrency with Futures|[20-futures](20-futures)|17 20|Concurrent Executors|[20-executors](20-executors)|17
21|Asynchronous Programming|[21-async](21-async)|18 21|Asynchronous Programming|[21-async](21-async)|18
**VI Metaprogramming**| **VI Metaprogramming**|
22|Dynamic Attributes and Properties|[22-dyn-attr-prop](22-dyn-attr-prop)|19 22|Dynamic Attributes and Properties|[22-dyn-attr-prop](22-dyn-attr-prop)|19