ch20: renamed directory

This commit is contained in:
Luciano Ramalho
2021-10-05 09:44:12 -03:00
parent 980f750326
commit 5d6b156047
19 changed files with 0 additions and 0 deletions

View File

@@ -0,0 +1,31 @@
"""
Experiment with ``ThreadPoolExecutor.map``
"""
# tag::EXECUTOR_MAP[]
from time import sleep, strftime
from concurrent import futures
def display(*args): # <1>
print(strftime('[%H:%M:%S]'), end=' ')
print(*args)
def loiter(n): # <2>
msg = '{}loiter({}): doing nothing for {}s...'
display(msg.format('\t'*n, n, n))
sleep(n)
msg = '{}loiter({}): done.'
display(msg.format('\t'*n, n))
return n * 10 # <3>
def main():
display('Script starting.')
executor = futures.ThreadPoolExecutor(max_workers=3) # <4>
results = executor.map(loiter, range(5)) # <5>
display('results:', results) # <6>
display('Waiting for individual results:')
for i, result in enumerate(results): # <7>
display(f'result {i}: {result}')
if __name__ == '__main__':
main()
# end::EXECUTOR_MAP[]

1
20-executors/getflags/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
flags/

View File

@@ -0,0 +1,122 @@
= Experimenting with the `flags2*` examples
The `flags2*` examples enhance the `flags*` examples with error handling and reporting.
Therefore, we need a server that generates errors and delays to experiment with them.
The main reason for these instructions is to document how to configure one such server
in your machine, and how to tell the `flags2*` clients to access it.
The other reason is to alert of an installation step that MacOS users sometimes overlook.
Contents:
* <<server_setup>>
* <<client_setup>>
* <<macos_certificates>>
[[server_setup]]
== Setting up test servers
If you don't already have a local HTTP server for testing,
here are the steps to experiment with the `flags2*` examples
using just the Python ≥ 3.9 distribution:
. Clone or download the https://github.com/fluentpython/example-code-2e[_Fluent Python 2e_ code repository] (this repo!).
. Open your shell and go to the _20-futures/getflags/_ directory of your local copy of the repository (this directory!)
. Unzip the _flags.zip_ file, creating a _flags_ directory at _20-futures/getflags/flags/_.
. Open a second shell, go to the _20-futures/getflags/_ directory and run `python3 -m http.server`. This will start a `ThreadingHTTPServer` listening to port 8000, serving the local files. If you open the URL http://localhost:8000/flags/[http://localhost:8000/flags/] with your browser, you'll see a long list of directories named with two-letter country codes from `ad/` to `zw/`.
. Now you can go back to the first shell and run the _flags2*.py_ examples with the default `--server LOCAL` option.
. To test with the `--server DELAY` option, go to _20-futures/getflags/_ and run `python3 slow_server.py`. This binds to port 8001 by default. It will add a random delay of .5s to 5s before each response.
. To test with the `--server ERROR` option, go to _20-futures/getflags/_ and run `python3 slow_server.py 8002 --error-rate .25`.
Each request will have a 25% probability of getting a
https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/418[418 I'm a teapot] response,
and all responses will be delayed .5s.
I wrote _slow_server.py_ reusing code from Python's
https://github.com/python/cpython/blob/917eca700aa341f8544ace43b75d41b477e98b72/Lib/http/server.py[`http.server`] standard library module,
which "is not recommended for production"—according to the
https://docs.python.org/3/library/http.server.html[documentation].
[NOTE]
====
This is a simple testing environment that does not require any external libraries or
tools—apart from the libraries used in the `flags2*` scripts themselves, as discussed in the book.
For a more robust testing environment, I recommend configuring
https://www.nginx.com/[NGINX] and
https://github.com/shopify/toxiproxy[Toxiproxy] with equivalent parameters.
====
[[client_setup]]
== Running a `flags2*` script
The `flags2*` examples provide a command-line interface.
All three scripts accept the same options,
and you can see them by running any of the scripts with the `-h` option:
[[flags2_help_demo]]
.Help screen for the scripts in the flags2 series
====
[source, text]
----
$ python3 flags2_threadpool.py -h
usage: flags2_threadpool.py [-h] [-a] [-e] [-l N] [-m CONCURRENT] [-s LABEL]
[-v]
[CC [CC ...]]
Download flags for country codes. Default: top 20 countries by population.
positional arguments:
CC country code or 1st letter (eg. B for BA...BZ)
optional arguments:
-h, --help show this help message and exit
-a, --all get all available flags (AD to ZW)
-e, --every get flags for every possible code (AA...ZZ)
-l N, --limit N limit to N first codes
-m CONCURRENT, --max_req CONCURRENT
maximum concurrent requests (default=30)
-s LABEL, --server LABEL
Server to hit; one of DELAY, ERROR, LOCAL, REMOTE
(default=LOCAL)
-v, --verbose output detailed progress info
----
====
All arguments are optional. The most important arguments are discussed next.
One option you can't ignore is `-s/--server`: it lets you choose which HTTP server and base URL will be used in the test.
You can pass one of four labels to determine where the script will look for the flags (the labels are case-insensitive):
`LOCAL`:: Use `http://localhost:8000/flags`; this is the default.
You should configure a local HTTP server to answer at port 8000. See <<server_setup>> for instructions.
Feel free to hit this as hard as you can. It's your machine!
`REMOTE`:: Use `http://fluentpython.com/data/flags`; that is a public website owned by me, hosted on a shared server.
Please do not hit it with too many concurrent requests.
The `fluentpython.com` domain is handled by the http://www.cloudflare.com/[Cloudflare] CDN (Content Delivery Network)
so you may notice that the first downloads are slower, but they get faster when the CDN cache warms
up.footnote:[Before configuring Cloudflare, I got HTTP 503 errors--Service Temporarily Unavailable--when
testing the scripts with a few dozen concurrent requests on my inexpensive shared host account. Now those errors are gone.]
`DELAY`:: Use `http://localhost:8001/flags`; a server delaying HTTP responses should be listening to port 8001.
I wrote _slow_server.py_ to make it easier to experiment. See <<server_setup>> for instructions.
`ERROR`:: Use `http://localhost:8002/flags`; a server introducing HTTP errors and delaying responses should be installed at port 8002.
Running _slow_server.py_ is an easy way to do it. See <<server_setup>>.
[[macos_certificates]]
== Install SSL Certificates (for MacOS)
On Macos, depending on how you installed Python you may need to manually run a command
after Python's installer finishes, to install the SSL certificates Python uses to make HTTPS connections.
Using the Finder, open the `Python 3.X` folder inside `/Applications` folder
and double-click "Install Certificates" or "Install Certificates.command".
Using the terminal, you can type for example:
[source, text]
----
$ open /Applications/Python 3.10/"Install Certificates.command"
----

View File

@@ -0,0 +1,8 @@
AD AE AF AG AL AM AO AR AT AU AZ BA BB BD BE BF BG BH BI BJ BN BO BR BS BT
BW BY BZ CA CD CF CG CH CI CL CM CN CO CR CU CV CY CZ DE DJ DK DM DZ EC EE
EG ER ES ET FI FJ FM FR GA GB GD GE GH GM GN GQ GR GT GW GY HN HR HT HU ID
IE IL IN IQ IR IS IT JM JO JP KE KG KH KI KM KN KP KR KW KZ LA LB LC LI LK
LR LS LT LU LV LY MA MC MD ME MG MH MK ML MM MN MR MT MU MV MW MX MY MZ NA
NE NG NI NL NO NP NR NZ OM PA PE PG PH PK PL PT PW PY QA RO RS RU RW SA SB
SC SD SE SG SI SK SL SM SN SO SR SS ST SV SY SZ TD TG TH TJ TL TM TN TO TR
TT TV TW TZ UA UG US UY UZ VA VC VE VN VU WS YE ZA ZM ZW

58
20-executors/getflags/flags.py Executable file
View File

@@ -0,0 +1,58 @@
#!/usr/bin/env python3
"""Download flags of top 20 countries by population
Sequential version
Sample runs (first with new domain, so no caching ever)::
$ ./flags.py
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN
20 downloads in 26.21s
$ ./flags.py
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN
20 downloads in 14.57s
"""
# tag::FLAGS_PY[]
import time
from pathlib import Path
from typing import Callable
import httpx # <1>
POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
'MX PH VN ET EG DE IR TR CD FR').split() # <2>
BASE_URL = 'https://www.fluentpython.com/data/flags' # <3>
DEST_DIR = Path('downloaded') # <4>
def save_flag(img: bytes, filename: str) -> None: # <5>
(DEST_DIR / filename).write_bytes(img)
def get_flag(cc: str) -> bytes: # <6>
url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
resp = httpx.get(url, timeout=6.1, # <7>
follow_redirects=True) # <8>
resp.raise_for_status() # <9>
return resp.content
def download_many(cc_list: list[str]) -> int: # <10>
for cc in sorted(cc_list): # <11>
image = get_flag(cc)
save_flag(image, f'{cc}.gif')
print(cc, end=' ', flush=True) # <12>
return len(cc_list)
def main(downloader: Callable[[list[str]], int]) -> None: # <13>
DEST_DIR.mkdir(exist_ok=True) # <14>
t0 = time.perf_counter() # <15>
count = downloader(POP20_CC)
elapsed = time.perf_counter() - t0
print(f'\n{count} downloads in {elapsed:.2f}s')
if __name__ == '__main__':
main(download_many) # <16>
# end::FLAGS_PY[]

Binary file not shown.

View File

@@ -0,0 +1,107 @@
#!/usr/bin/env python3
"""Download flags of countries (with error handling).
asyncio async/await version
"""
# tag::FLAGS2_ASYNCIO_TOP[]
import asyncio
from collections import Counter
from http import HTTPStatus
from pathlib import Path
import httpx
import tqdm # type: ignore
from flags2_common import main, DownloadStatus, save_flag
# default set low to avoid errors from remote site, such as
# 503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000
async def get_flag(session: httpx.AsyncClient, # <2>
base_url: str,
cc: str) -> bytes:
url = f'{base_url}/{cc}/{cc}.gif'.lower()
resp = await session.get(url, timeout=3.1, follow_redirects=True) # <3>
resp.raise_for_status()
return resp.content
async def download_one(session: httpx.AsyncClient,
cc: str,
base_url: str,
semaphore: asyncio.Semaphore, # <4>
verbose: bool) -> DownloadStatus:
try:
async with semaphore: # <5>
image = await get_flag(session, base_url, cc)
except httpx.HTTPStatusError as exc: # <4>
res = exc.response
if res.status_code == HTTPStatus.NOT_FOUND:
status = DownloadStatus.NOT_FOUND # <5>
msg = f'not found: {res.url}'
else:
raise
else:
await asyncio.to_thread(save_flag, image, f'{cc}.gif')
status = DownloadStatus.OK
msg = 'OK'
if verbose and msg:
print(cc, msg)
return status
# end::FLAGS2_ASYNCIO_TOP[]
# tag::FLAGS2_ASYNCIO_START[]
async def supervisor(cc_list: list[str],
base_url: str,
verbose: bool,
concur_req: int) -> Counter[DownloadStatus]: # <1>
counter: Counter[DownloadStatus] = Counter()
semaphore = asyncio.Semaphore(concur_req) # <2>
async with httpx.AsyncClient() as session:
to_do = [download_one(session, cc, base_url, semaphore, verbose)
for cc in sorted(cc_list)] # <3>
to_do_iter = asyncio.as_completed(to_do) # <4>
if not verbose:
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) # <5>
error: httpx.HTTPError | None = None
for coro in to_do_iter: # <6>
try:
status = await coro # <7>
except httpx.HTTPStatusError as exc: # <8>
error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
error_msg = error_msg.format(resp=exc.response)
error = exc
except httpx.RequestError as exc: # <9>
error_msg = f'{exc} {type(exc)}'.strip()
error = exc
except KeyboardInterrupt: # <10>
break
else: # <11>
error = None
if error:
status = DownloadStatus.ERROR # <12>
if verbose:
url = str(error.request.url) # <13>
cc = Path(url).stem.upper() # <14>
print(f'{cc} error: {error_msg}')
counter[status] += 1
return counter
def download_many(cc_list: list[str],
base_url: str,
verbose: bool,
concur_req: int) -> Counter[DownloadStatus]:
coro = supervisor(cc_list, base_url, verbose, concur_req)
counts = asyncio.run(coro) # <14>
return counts
if __name__ == '__main__':
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
# end::FLAGS2_ASYNCIO_START[]

View File

@@ -0,0 +1,109 @@
#!/usr/bin/env python3
"""Download flags of countries (with error handling).
asyncio async/await version
"""
# tag::FLAGS2_ASYNCIO_TOP[]
import asyncio
from collections import Counter
from http import HTTPStatus
from pathlib import Path
import httpx
import tqdm # type: ignore
from flags2_common import main, DownloadStatus, save_flag
# default set low to avoid errors from remote site, such as
# 503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000
async def get_flag(session: httpx.AsyncClient, # <2>
base_url: str,
cc: str) -> bytes:
url = f'{base_url}/{cc}/{cc}.gif'.lower()
resp = await session.get(url, timeout=3.1, follow_redirects=True) # <3>
resp.raise_for_status()
return resp.content
async def download_one(session: httpx.AsyncClient,
cc: str,
base_url: str,
semaphore: asyncio.Semaphore,
verbose: bool) -> DownloadStatus:
try:
async with semaphore:
image = await get_flag(session, base_url, cc)
except httpx.HTTPStatusError as exc:
res = exc.response
if res.status_code == HTTPStatus.NOT_FOUND:
status = DownloadStatus.NOT_FOUND
msg = f'not found: {res.url}'
else:
raise
else:
# tag::FLAGS2_ASYNCIO_EXECUTOR[]
loop = asyncio.get_running_loop() # <1>
loop.run_in_executor(None, save_flag, # <2>
image, f'{cc}.gif') # <3>
# end::FLAGS2_ASYNCIO_EXECUTOR[]
status = DownloadStatus.OK
msg = 'OK'
if verbose and msg:
print(cc, msg)
return status
async def supervisor(cc_list: list[str],
base_url: str,
verbose: bool,
concur_req: int) -> Counter[DownloadStatus]: # <1>
counter: Counter[DownloadStatus] = Counter()
semaphore = asyncio.Semaphore(concur_req) # <2>
async with httpx.AsyncClient() as session:
to_do = [download_one(session, cc, base_url, semaphore, verbose)
for cc in sorted(cc_list)] # <3>
to_do_iter = asyncio.as_completed(to_do) # <4>
if not verbose:
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) # <5>
error: httpx.HTTPError | None = None
for coro in to_do_iter: # <6>
try:
status = await coro # <7>
except httpx.HTTPStatusError as exc: # <13>
error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
error_msg = error_msg.format(resp=exc.response)
error = exc
except httpx.RequestError as exc: # <15>
error_msg = f'{exc} {type(exc)}'.strip()
error = exc
except KeyboardInterrupt: # <7>
break
else: # <8>
error = None
if error:
status = DownloadStatus.ERROR # <9>
if verbose: # <11>
cc = Path(str(error.request.url)).stem.upper()
print(f'{cc} error: {error_msg}')
counter[status] += 1 # <10>
return counter # <12>
def download_many(cc_list: list[str],
base_url: str,
verbose: bool,
concur_req: int) -> Counter[DownloadStatus]:
coro = supervisor(cc_list, base_url, verbose, concur_req)
counts = asyncio.run(coro) # <14>
return counts
if __name__ == '__main__':
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
# end::FLAGS2_ASYNCIO_START[]

View File

@@ -0,0 +1,155 @@
"""Utilities for second set of flag examples.
"""
import argparse
import string
import sys
import time
from collections import Counter
from enum import Enum
from pathlib import Path
DownloadStatus = Enum('DownloadStatus', 'OK NOT_FOUND ERROR')
POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
'MX PH VN ET EG DE IR TR CD FR').split()
DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1
SERVERS = {
'REMOTE': 'https://www.fluentpython.com/data/flags',
'LOCAL': 'http://localhost:8000/flags',
'DELAY': 'http://localhost:8001/flags',
'ERROR': 'http://localhost:8002/flags',
}
DEFAULT_SERVER = 'LOCAL'
DEST_DIR = Path('downloaded')
COUNTRY_CODES_FILE = Path('country_codes.txt')
def save_flag(img: bytes, filename: str) -> None:
(DEST_DIR / filename).write_bytes(img)
def initial_report(cc_list: list[str],
actual_req: int,
server_label: str) -> None:
if len(cc_list) <= 10:
cc_msg = ', '.join(cc_list)
else:
cc_msg = f'from {cc_list[0]} to {cc_list[-1]}'
print(f'{server_label} site: {SERVERS[server_label]}')
plural = 's' if len(cc_list) != 1 else ''
print(f'Searching for {len(cc_list)} flag{plural}: {cc_msg}')
if actual_req == 1:
print('1 connection will be used.')
else:
print(f'{actual_req} concurrent connections will be used.')
def final_report(cc_list: list[str],
counter: Counter[DownloadStatus],
start_time: float) -> None:
elapsed = time.perf_counter() - start_time
print('-' * 20)
plural = 's' if counter[DownloadStatus.OK] != 1 else ''
print(f'{counter[DownloadStatus.OK]:3} flag{plural} downloaded.')
if counter[DownloadStatus.NOT_FOUND]:
print(f'{counter[DownloadStatus.NOT_FOUND]:3} not found.')
if counter[DownloadStatus.ERROR]:
plural = 's' if counter[DownloadStatus.ERROR] != 1 else ''
print(f'{counter[DownloadStatus.ERROR]:3} error{plural}.')
print(f'Elapsed time: {elapsed:.2f}s')
def expand_cc_args(every_cc: bool,
all_cc: bool,
cc_args: list[str],
limit: int) -> list[str]:
codes: set[str] = set()
A_Z = string.ascii_uppercase
if every_cc:
codes.update(a+b for a in A_Z for b in A_Z)
elif all_cc:
text = COUNTRY_CODES_FILE.read_text()
codes.update(text.split())
else:
for cc in (c.upper() for c in cc_args):
if len(cc) == 1 and cc in A_Z:
codes.update(cc + c for c in A_Z)
elif len(cc) == 2 and all(c in A_Z for c in cc):
codes.add(cc)
else:
raise ValueError('*** Usage error: each CC argument '
'must be A to Z or AA to ZZ.')
return sorted(codes)[:limit]
def process_args(default_concur_req):
server_options = ', '.join(sorted(SERVERS))
parser = argparse.ArgumentParser(
description='Download flags for country codes. '
'Default: top 20 countries by population.')
parser.add_argument(
'cc', metavar='CC', nargs='*',
help='country code or 1st letter (eg. B for BA...BZ)')
parser.add_argument(
'-a', '--all', action='store_true',
help='get all available flags (AD to ZW)')
parser.add_argument(
'-e', '--every', action='store_true',
help='get flags for every possible code (AA...ZZ)')
parser.add_argument(
'-l', '--limit', metavar='N', type=int, help='limit to N first codes',
default=sys.maxsize)
parser.add_argument(
'-m', '--max_req', metavar='CONCURRENT', type=int,
default=default_concur_req,
help=f'maximum concurrent requests (default={default_concur_req})')
parser.add_argument(
'-s', '--server', metavar='LABEL', default=DEFAULT_SERVER,
help=f'Server to hit; one of {server_options} '
f'(default={DEFAULT_SERVER})')
parser.add_argument(
'-v', '--verbose', action='store_true',
help='output detailed progress info')
args = parser.parse_args()
if args.max_req < 1:
print('*** Usage error: --max_req CONCURRENT must be >= 1')
parser.print_usage()
# "standard" exit status codes:
# https://stackoverflow.com/questions/1101957/are-there-any-standard-exit-status-codes-in-linux/40484670#40484670
sys.exit(2) # command line usage error
if args.limit < 1:
print('*** Usage error: --limit N must be >= 1')
parser.print_usage()
sys.exit(2) # command line usage error
args.server = args.server.upper()
if args.server not in SERVERS:
print(f'*** Usage error: --server LABEL '
f'must be one of {server_options}')
parser.print_usage()
sys.exit(2) # command line usage error
try:
cc_list = expand_cc_args(args.every, args.all, args.cc, args.limit)
except ValueError as exc:
print(exc.args[0])
parser.print_usage()
sys.exit(2) # command line usage error
if not cc_list:
cc_list = sorted(POP20_CC)[:args.limit]
return args, cc_list
def main(download_many, default_concur_req, max_concur_req):
args, cc_list = process_args(default_concur_req)
actual_req = min(args.max_req, max_concur_req, len(cc_list))
initial_report(cc_list, actual_req, args.server)
base_url = SERVERS[args.server]
DEST_DIR.mkdir(exist_ok=True)
t0 = time.perf_counter()
counter = download_many(cc_list, base_url, args.verbose, actual_req)
final_report(cc_list, counter, t0)

View File

@@ -0,0 +1,91 @@
#!/usr/bin/env python3
"""Download flags of countries (with error handling).
Sequential version
Sample run::
$ python3 flags2_sequential.py -s DELAY b
DELAY site: http://localhost:8002/flags
Searching for 26 flags: from BA to BZ
1 concurrent connection will be used.
--------------------
17 flags downloaded.
9 not found.
Elapsed time: 13.36s
"""
# tag::FLAGS2_BASIC_HTTP_FUNCTIONS[]
from collections import Counter
from http import HTTPStatus
import httpx
import tqdm # type: ignore # <1>
from flags2_common import main, save_flag, DownloadStatus # <2>
DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1
def get_flag(base_url: str, cc: str) -> bytes:
url = f'{base_url}/{cc}/{cc}.gif'.lower()
resp = httpx.get(url, timeout=3.1, follow_redirects=True)
resp.raise_for_status() # <3>
return resp.content
def download_one(cc: str, base_url: str, verbose: bool = False) -> DownloadStatus:
try:
image = get_flag(base_url, cc)
except httpx.HTTPStatusError as exc: # <4>
res = exc.response
if res.status_code == HTTPStatus.NOT_FOUND:
status = DownloadStatus.NOT_FOUND # <5>
msg = f'not found: {res.url}'
else:
raise # <6>
else:
save_flag(image, f'{cc}.gif')
status = DownloadStatus.OK
msg = 'OK'
if verbose: # <7>
print(cc, msg)
return status
# end::FLAGS2_BASIC_HTTP_FUNCTIONS[]
# tag::FLAGS2_DOWNLOAD_MANY_SEQUENTIAL[]
def download_many(cc_list: list[str],
base_url: str,
verbose: bool,
_unused_concur_req: int) -> Counter[DownloadStatus]:
counter: Counter[DownloadStatus] = Counter() # <1>
cc_iter = sorted(cc_list) # <2>
if not verbose:
cc_iter = tqdm.tqdm(cc_iter) # <3>
for cc in cc_iter:
try:
status = download_one(cc, base_url, verbose) # <4>
except httpx.HTTPStatusError as exc: # <5>
error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
error_msg = error_msg.format(resp=exc.response)
except httpx.RequestError as exc: # <6>
error_msg = f'{exc} {type(exc)}'.strip()
except KeyboardInterrupt: # <7>
break
else: # <8>
error_msg = ''
if error_msg:
status = DownloadStatus.ERROR # <9>
counter[status] += 1 # <10>
if verbose and error_msg: # <11>
print(f'{cc} error: {error_msg}')
return counter # <12>
# end::FLAGS2_DOWNLOAD_MANY_SEQUENTIAL[]
if __name__ == '__main__':
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)

View File

@@ -0,0 +1,74 @@
#!/usr/bin/env python3
"""Download flags of countries (with error handling).
ThreadPool version
Sample run::
$ python3 flags2_threadpool.py -s ERROR -e
ERROR site: http://localhost:8003/flags
Searching for 676 flags: from AA to ZZ
30 concurrent connections will be used.
--------------------
150 flags downloaded.
361 not found.
165 errors.
Elapsed time: 7.46s
"""
# tag::FLAGS2_THREADPOOL[]
from collections import Counter
from concurrent import futures
import httpx
import tqdm # type: ignore
from flags2_common import main, DownloadStatus
from flags2_sequential import download_one # <1>
DEFAULT_CONCUR_REQ = 30 # <2>
MAX_CONCUR_REQ = 1000 # <3>
def download_many(cc_list: list[str],
base_url: str,
verbose: bool,
concur_req: int) -> Counter[DownloadStatus]:
counter: Counter[DownloadStatus] = Counter()
with futures.ThreadPoolExecutor(max_workers=concur_req) as executor: # <4>
to_do_map = {} # <5>
for cc in sorted(cc_list): # <6>
future = executor.submit(download_one, cc,
base_url, verbose) # <7>
to_do_map[future] = cc # <8>
done_iter = futures.as_completed(to_do_map) # <9>
if not verbose:
done_iter = tqdm.tqdm(done_iter, total=len(cc_list)) # <10>
for future in done_iter: # <11>
try:
status = future.result() # <12>
except httpx.HTTPStatusError as exc: # <13>
error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
error_msg = error_msg.format(resp=exc.response)
except httpx.RequestError as exc: # <15>
error_msg = f'{exc} {type(exc)}'.strip()
except KeyboardInterrupt:
break
else:
error_msg = ''
if error_msg:
status = DownloadStatus.ERROR
counter[status] += 1
if verbose and error_msg:
cc = to_do_map[future] # <16>
print(f'{cc} error: {error_msg}')
return counter
if __name__ == '__main__':
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
# end::FLAGS2_THREADPOOL[]

View File

@@ -0,0 +1,47 @@
#!/usr/bin/env python3
"""Download flags of top 20 countries by population
asyncio + aiottp version
Sample run::
$ python3 flags_asyncio.py
EG VN IN TR RU ID US DE CN MX JP BD NG ET FR BR PH PK CD IR
20 flags downloaded in 1.07s
"""
# tag::FLAGS_ASYNCIO_TOP[]
import asyncio
from httpx import AsyncClient # <1>
from flags import BASE_URL, save_flag, main # <2>
async def download_one(session: AsyncClient, cc: str): # <3>
image = await get_flag(session, cc)
save_flag(image, f'{cc}.gif')
print(cc, end=' ', flush=True)
return cc
async def get_flag(session: AsyncClient, cc: str) -> bytes: # <4>
url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
resp = await session.get(url, timeout=6.1,
follow_redirects=True) # <5>
return resp.read() # <6>
# end::FLAGS_ASYNCIO_TOP[]
# tag::FLAGS_ASYNCIO_START[]
def download_many(cc_list: list[str]) -> int: # <1>
return asyncio.run(supervisor(cc_list)) # <2>
async def supervisor(cc_list: list[str]) -> int:
async with AsyncClient() as session: # <3>
to_do = [download_one(session, cc)
for cc in sorted(cc_list)] # <4>
res = await asyncio.gather(*to_do) # <5>
return len(res) # <6>
if __name__ == '__main__':
main(download_many)
# end::FLAGS_ASYNCIO_START[]

View File

@@ -0,0 +1,34 @@
#!/usr/bin/env python3
"""Download flags of top 20 countries by population
ThreadPoolExecutor version
Sample run::
$ python3 flags_threadpool.py
DE FR BD CN EG RU IN TR VN ID JP BR NG MX PK ET PH CD US IR
20 downloads in 0.35s
"""
# tag::FLAGS_THREADPOOL[]
from concurrent import futures
from flags import save_flag, get_flag, main # <1>
def download_one(cc: str): # <2>
image = get_flag(cc)
save_flag(image, f'{cc}.gif')
print(cc, end=' ', flush=True)
return cc
def download_many(cc_list: list[str]) -> int:
with futures.ThreadPoolExecutor() as executor: # <3>
res = executor.map(download_one, sorted(cc_list)) # <4>
return len(list(res)) # <5>
if __name__ == '__main__':
main(download_many) # <6>
# end::FLAGS_THREADPOOL[]

View File

@@ -0,0 +1,31 @@
#!/usr/bin/env python3
"""Download flags of top 20 countries by population
ThreadPoolExecutor example with ``as_completed``.
"""
from concurrent import futures
from flags import main
from flags_threadpool import download_one
# tag::FLAGS_THREADPOOL_AS_COMPLETED[]
def download_many(cc_list: list[str]) -> int:
cc_list = cc_list[:5] # <1>
with futures.ThreadPoolExecutor(max_workers=3) as executor: # <2>
to_do: list[futures.Future] = []
for cc in sorted(cc_list): # <3>
future = executor.submit(download_one, cc) # <4>
to_do.append(future) # <5>
print(f'Scheduled for {cc}: {future}') # <6>
for count, future in enumerate(futures.as_completed(to_do), 1): # <7>
res: str = future.result() # <8>
print(f'{future} result: {res!r}') # <9>
return count
# end::FLAGS_THREADPOOL_AS_COMPLETED[]
if __name__ == '__main__':
main(download_many)

View File

@@ -0,0 +1,10 @@
anyio==3.3.2
certifi==2021.5.30
charset-normalizer==2.0.6
h11==0.12.0
httpcore==0.13.7
httpx==1.0.0b0
idna==3.2
rfc3986==1.5.0
sniffio==1.2.0
tqdm==4.62.3

View File

@@ -0,0 +1,103 @@
#!/usr/bin/env python3
"""Slow HTTP server class.
This module implements a ThreadingHTTPServer using a custom
SimpleHTTPRequestHandler subclass that introduces delays to all
GET responses, and optionally returns errors to a fraction of
the requests if given the --error_rate command-line argument.
"""
import contextlib
import os
import socket
import time
from functools import partial
from http import server, HTTPStatus
from http.server import ThreadingHTTPServer, SimpleHTTPRequestHandler
from random import random, uniform
MIN_DELAY = 0.5 # minimum delay for do_GET (seconds)
MAX_DELAY = 5.0 # maximum delay for do_GET (seconds)
class SlowHTTPRequestHandler(SimpleHTTPRequestHandler):
"""SlowHTTPRequestHandler adds delays and errors to test HTTP clients.
The optional error_rate argument determines how often GET requests
receive a 418 status code, "I'm a teapot".
If error_rate is .15, there's a 15% probability of each GET request
getting that error.
When the server believes it is a teapot, it refuses requests to serve files.
See: https://tools.ietf.org/html/rfc2324#section-2.3.2
"""
def __init__(self, *args, error_rate=0.0, **kwargs):
self.error_rate = error_rate
super().__init__(*args, **kwargs)
def do_GET(self):
"""Serve a GET request."""
delay = uniform(MIN_DELAY, MAX_DELAY)
cc = self.path[-6:-4].upper()
print(f'{cc} delay: {delay:0.2}s')
time.sleep(delay)
if random() < self.error_rate:
# HTTPStatus.IM_A_TEAPOT requires Python >= 3.9
try:
self.send_error(HTTPStatus.IM_A_TEAPOT, "I'm a Teapot")
except BrokenPipeError as exc:
print(f'{cc} *** BrokenPipeError: client closed')
else:
f = self.send_head()
if f:
try:
self.copyfile(f, self.wfile)
except BrokenPipeError as exc:
print(f'{cc} *** BrokenPipeError: client closed')
finally:
f.close()
# The code in the `if` block below, including comments, was copied
# and adapted from the `http.server` module of Python 3.9
# https://github.com/python/cpython/blob/master/Lib/http/server.py
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--bind', '-b', metavar='ADDRESS',
help='Specify alternate bind address '
'[default: all interfaces]')
parser.add_argument('--directory', '-d', default=os.getcwd(),
help='Specify alternative directory '
'[default:current directory]')
parser.add_argument('--error-rate', '-e', metavar='PROBABILITY',
default=0.0, type=float,
help='Error rate; e.g. use .25 for 25%% probability '
'[default:0.0]')
parser.add_argument('port', action='store',
default=8001, type=int,
nargs='?',
help='Specify alternate port [default: 8001]')
args = parser.parse_args()
handler_class = partial(SlowHTTPRequestHandler,
directory=args.directory,
error_rate=args.error_rate)
# ensure dual-stack is not disabled; ref #38907
class DualStackServer(ThreadingHTTPServer):
def server_bind(self):
# suppress exception when protocol is IPv4
with contextlib.suppress(Exception):
self.socket.setsockopt(
socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
return super().server_bind()
# test is a top-level function in http.server omitted from __all__
server.test( # type: ignore
HandlerClass=handler_class,
ServerClass=DualStackServer,
port=args.port,
bind=args.bind,
)

View File

@@ -0,0 +1,38 @@
import httpx
def tree(cls, level=0):
yield cls.__name__, level
for sub_cls in cls.__subclasses__():
yield from tree(sub_cls, level+1)
def display(cls):
for cls_name, level in tree(cls):
indent = ' ' * 4 * level
print(f'{indent}{cls_name}')
def find_roots(module):
exceptions = []
for name in dir(module):
obj = getattr(module, name)
if isinstance(obj, type) and issubclass(obj, BaseException):
exceptions.append(obj)
roots = []
for exc in exceptions:
root = True
for other in exceptions:
if exc is not other and issubclass(exc, other):
root = False
break
if root:
roots.append(exc)
return roots
def main():
for exc in find_roots(httpx):
display(exc)
if __name__ == '__main__':
main()

52
20-executors/primes/primes.py Executable file
View File

@@ -0,0 +1,52 @@
#!/usr/bin/env python3
import math
PRIME_FIXTURE = [
(2, True),
(142702110479723, True),
(299593572317531, True),
(3333333333333301, True),
(3333333333333333, False),
(3333335652092209, False),
(4444444444444423, True),
(4444444444444444, False),
(4444444488888889, False),
(5555553133149889, False),
(5555555555555503, True),
(5555555555555555, False),
(6666666666666666, False),
(6666666666666719, True),
(6666667141414921, False),
(7777777536340681, False),
(7777777777777753, True),
(7777777777777777, False),
(9999999999999917, True),
(9999999999999999, False),
]
NUMBERS = [n for n, _ in PRIME_FIXTURE]
# tag::IS_PRIME[]
def is_prime(n: int) -> bool:
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
root = math.isqrt(n)
for i in range(3, root + 1, 2):
if n % i == 0:
return False
return True
# end::IS_PRIME[]
if __name__ == '__main__':
for n, prime in PRIME_FIXTURE:
prime_res = is_prime(n)
assert prime_res == prime
print(n, prime)

View File

@@ -0,0 +1,50 @@
#!/usr/bin/env python3
"""
proc_pool.py: a version of the proc.py example from chapter 20,
but using `concurrent.futures.ProcessPoolExecutor`.
"""
# tag::PRIMES_POOL[]
import sys
from concurrent import futures # <1>
from time import perf_counter
from typing import NamedTuple
from primes import is_prime, NUMBERS
class PrimeResult(NamedTuple): # <2>
n: int
flag: bool
elapsed: float
def check(n: int) -> PrimeResult:
t0 = perf_counter()
res = is_prime(n)
return PrimeResult(n, res, perf_counter() - t0)
def main() -> None:
if len(sys.argv) < 2:
workers = None # <3>
else:
workers = int(sys.argv[1])
executor = futures.ProcessPoolExecutor(workers) # <4>
actual_workers = executor._max_workers # type: ignore # <5>
print(f'Checking {len(NUMBERS)} numbers with {actual_workers} processes:')
t0 = perf_counter()
numbers = sorted(NUMBERS, reverse=True) # <6>
with executor: # <7>
for n, prime, elapsed in executor.map(check, numbers): # <8>
label = 'P' if prime else ' '
print(f'{n:16} {label} {elapsed:9.6f}s')
time = perf_counter() - t0
print(f'Total time: {time:.2f}s')
if __name__ == '__main__':
main()
# end::PRIMES_POOL[]