sequential, threaded & async HTTP clients using HTTPX

This commit is contained in:
Luciano Ramalho
2021-10-02 17:12:42 -03:00
parent 7985fda09f
commit 4f1392d21c
12 changed files with 271 additions and 224 deletions

View File

@@ -14,7 +14,7 @@ Contents:
* <<macos_certificates>> * <<macos_certificates>>
[[server_setup]] [[server_setup]]
== Setting up a test server == Setting up test servers
If you don't already have a local HTTP server for testing, If you don't already have a local HTTP server for testing,
here are the steps to experiment with the `flags2*` examples here are the steps to experiment with the `flags2*` examples
@@ -25,7 +25,7 @@ using just the Python ≥ 3.9 distribution:
. Unzip the _flags.zip_ file, creating a _flags_ directory at _20-futures/getflags/flags/_. . Unzip the _flags.zip_ file, creating a _flags_ directory at _20-futures/getflags/flags/_.
. Open a second shell, go to the _20-futures/getflags/_ directory and run `python3 -m http.server`. This will start a `ThreadingHTTPServer` listening to port 8000, serving the local files. If you open the URL http://localhost:8000/flags/[http://localhost:8000/flags/] with your browser, you'll see a long list of directories named with two-letter country codes from `ad/` to `zw/`. . Open a second shell, go to the _20-futures/getflags/_ directory and run `python3 -m http.server`. This will start a `ThreadingHTTPServer` listening to port 8000, serving the local files. If you open the URL http://localhost:8000/flags/[http://localhost:8000/flags/] with your browser, you'll see a long list of directories named with two-letter country codes from `ad/` to `zw/`.
. Now you can go back to the first shell and run the _flags2*.py_ examples with the default `--server LOCAL` option. . Now you can go back to the first shell and run the _flags2*.py_ examples with the default `--server LOCAL` option.
. To test with the `--server DELAY` option, go to _20-futures/getflags/_ and run `python3 slow_server.py`. This binds to port 8001 by default. It will add a .5s delay before each response. . To test with the `--server DELAY` option, go to _20-futures/getflags/_ and run `python3 slow_server.py`. This binds to port 8001 by default. It will add a random delay of .5s to 5s before each response.
. To test with the `--server ERROR` option, go to _20-futures/getflags/_ and run `python3 slow_server.py 8002 --error-rate .25`. . To test with the `--server ERROR` option, go to _20-futures/getflags/_ and run `python3 slow_server.py 8002 --error-rate .25`.
Each request will have a 25% probability of getting a Each request will have a 25% probability of getting a
https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/418[418 I'm a teapot] response, https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/418[418 I'm a teapot] response,
@@ -86,7 +86,7 @@ optional arguments:
All arguments are optional. The most important arguments are discussed next. All arguments are optional. The most important arguments are discussed next.
One option you can't ignore is `-s/--server`: it lets you choose which HTTP server and base URL will be used in the test. One option you can't ignore is `-s/--server`: it lets you choose which HTTP server and base URL will be used in the test.
You can pass one of four strings to determine where the script will look for the flags (the strings are case insensitive): You can pass one of four labels to determine where the script will look for the flags (the labels are case-insensitive):
`LOCAL`:: Use `http://localhost:8000/flags`; this is the default. `LOCAL`:: Use `http://localhost:8000/flags`; this is the default.
You should configure a local HTTP server to answer at port 8000. See <<server_setup>> for instructions. You should configure a local HTTP server to answer at port 8000. See <<server_setup>> for instructions.

View File

@@ -21,12 +21,12 @@ import time
from pathlib import Path from pathlib import Path
from typing import Callable from typing import Callable
import requests # <1> import httpx # <1>
POP20_CC = ('CN IN US ID BR PK NG BD RU JP ' POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
'MX PH VN ET EG DE IR TR CD FR').split() # <2> 'MX PH VN ET EG DE IR TR CD FR').split() # <2>
BASE_URL = 'http://fluentpython.com/data/flags' # <3> BASE_URL = 'https://www.fluentpython.com/data/flags' # <3>
DEST_DIR = Path('downloaded') # <4> DEST_DIR = Path('downloaded') # <4>
def save_flag(img: bytes, filename: str) -> None: # <5> def save_flag(img: bytes, filename: str) -> None: # <5>
@@ -34,22 +34,25 @@ def save_flag(img: bytes, filename: str) -> None: # <5>
def get_flag(cc: str) -> bytes: # <6> def get_flag(cc: str) -> bytes: # <6>
url = f'{BASE_URL}/{cc}/{cc}.gif'.lower() url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
resp = requests.get(url) resp = httpx.get(url, timeout=6.1, # <7>
follow_redirects=True) # <8>
resp.raise_for_status() # <9>
return resp.content return resp.content
def download_many(cc_list: list[str]) -> int: # <7> def download_many(cc_list: list[str]) -> int: # <10>
for cc in sorted(cc_list): # <8> for cc in sorted(cc_list): # <11>
image = get_flag(cc) image = get_flag(cc)
save_flag(image, f'{cc}.gif') save_flag(image, f'{cc}.gif')
print(cc, end=' ', flush=True) # <9> print(cc, end=' ', flush=True) # <12>
return len(cc_list) return len(cc_list)
def main(downloader: Callable[[list[str]], int]) -> None: # <10> def main(downloader: Callable[[list[str]], int]) -> None: # <13>
t0 = time.perf_counter() # <11> DEST_DIR.mkdir(exist_ok=True) # <14>
t0 = time.perf_counter() # <15>
count = downloader(POP20_CC) count = downloader(POP20_CC)
elapsed = time.perf_counter() - t0 elapsed = time.perf_counter() - t0
print(f'\n{count} downloads in {elapsed:.2f}s') print(f'\n{count} downloads in {elapsed:.2f}s')
if __name__ == '__main__': if __name__ == '__main__':
main(download_many) # <12> main(download_many) # <16>
# end::FLAGS_PY[] # end::FLAGS_PY[]

View File

@@ -8,65 +8,60 @@ asyncio async/await version
# tag::FLAGS2_ASYNCIO_TOP[] # tag::FLAGS2_ASYNCIO_TOP[]
import asyncio import asyncio
from collections import Counter from collections import Counter
from http import HTTPStatus
from pathlib import Path
import aiohttp import httpx
import tqdm # type: ignore import tqdm # type: ignore
from flags2_common import main, HTTPStatus, Result, save_flag from flags2_common import main, DownloadStatus, save_flag
# default set low to avoid errors from remote site, such as # default set low to avoid errors from remote site, such as
# 503 - Service Temporarily Unavailable # 503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5 DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000 MAX_CONCUR_REQ = 1000
async def get_flag(session: httpx.AsyncClient, # <2>
class FetchError(Exception): # <1>
def __init__(self, country_code: str):
self.country_code = country_code
async def get_flag(session: aiohttp.ClientSession, # <2>
base_url: str, base_url: str,
cc: str) -> bytes: cc: str) -> bytes:
url = f'{base_url}/{cc}/{cc}.gif'.lower() url = f'{base_url}/{cc}/{cc}.gif'.lower()
async with session.get(url) as resp: resp = await session.get(url, timeout=3.1, follow_redirects=True) # <3>
if resp.status == 200: resp.raise_for_status()
return await resp.read() return resp.content
else:
resp.raise_for_status() # <3>
return bytes()
async def download_one(session: aiohttp.ClientSession, async def download_one(session: httpx.AsyncClient,
cc: str, cc: str,
base_url: str, base_url: str,
semaphore: asyncio.Semaphore, # <4> semaphore: asyncio.Semaphore, # <4>
verbose: bool) -> Result: verbose: bool) -> DownloadStatus:
try: try:
async with semaphore: # <5> async with semaphore: # <5>
image = await get_flag(session, base_url, cc) image = await get_flag(session, base_url, cc)
except aiohttp.ClientResponseError as exc: except httpx.HTTPStatusError as exc: # <4>
if exc.status == 404: # <6> res = exc.response
status = HTTPStatus.not_found if res.status_code == HTTPStatus.NOT_FOUND:
msg = 'not found' status = DownloadStatus.NOT_FOUND # <5>
msg = f'not found: {res.url}'
else: else:
raise FetchError(cc) from exc # <7> raise
else: else:
save_flag(image, f'{cc}.gif') await asyncio.to_thread(save_flag, image, f'{cc}.gif')
status = HTTPStatus.ok status = DownloadStatus.OK
msg = 'OK' msg = 'OK'
if verbose and msg: if verbose and msg:
print(cc, msg) print(cc, msg)
return Result(status, cc) return status
# end::FLAGS2_ASYNCIO_TOP[] # end::FLAGS2_ASYNCIO_TOP[]
# tag::FLAGS2_ASYNCIO_START[] # tag::FLAGS2_ASYNCIO_START[]
async def supervisor(cc_list: list[str], async def supervisor(cc_list: list[str],
base_url: str, base_url: str,
verbose: bool, verbose: bool,
concur_req: int) -> Counter[HTTPStatus]: # <1> concur_req: int) -> Counter[DownloadStatus]: # <1>
counter: Counter[HTTPStatus] = Counter() counter: Counter[DownloadStatus] = Counter()
semaphore = asyncio.Semaphore(concur_req) # <2> semaphore = asyncio.Semaphore(concur_req) # <2>
async with aiohttp.ClientSession() as session: async with httpx.AsyncClient() as session:
to_do = [download_one(session, cc, base_url, semaphore, verbose) to_do = [download_one(session, cc, base_url, semaphore, verbose)
for cc in sorted(cc_list)] # <3> for cc in sorted(cc_list)] # <3>
to_do_iter = asyncio.as_completed(to_do) # <4> to_do_iter = asyncio.as_completed(to_do) # <4>
@@ -74,25 +69,33 @@ async def supervisor(cc_list: list[str],
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) # <5> to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) # <5>
for coro in to_do_iter: # <6> for coro in to_do_iter: # <6>
try: try:
res = await coro # <7> status = await coro # <7>
except FetchError as exc: # <8> except httpx.HTTPStatusError as exc: # <8>
country_code = exc.country_code # <9> error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
try: error_msg = error_msg.format(resp=exc.response)
error_msg = exc.__cause__.message # type: ignore # <10> error = exc
except AttributeError: except httpx.RequestError as exc: # <9>
error_msg = 'Unknown cause' # <11> error_msg = f'{exc} {type(exc)}'.strip()
if verbose and error_msg: error = exc
print(f'*** Error for {country_code}: {error_msg}') except KeyboardInterrupt: # <10>
status = HTTPStatus.error break
else: else: # <11>
status = res.status error = None
counter[status] += 1 # <12>
return counter # <13> if error:
status = DownloadStatus.ERROR # <12>
if verbose:
url = str(error.request.url) # <13>
cc = Path(url).stem.upper() # <14>
print(f'{cc} error: {error_msg}')
counter[status] += 1
return counter
def download_many(cc_list: list[str], def download_many(cc_list: list[str],
base_url: str, base_url: str,
verbose: bool, verbose: bool,
concur_req: int) -> Counter[HTTPStatus]: concur_req: int) -> Counter[DownloadStatus]:
coro = supervisor(cc_list, base_url, verbose, concur_req) coro = supervisor(cc_list, base_url, verbose, concur_req)
counts = asyncio.run(coro) # <14> counts = asyncio.run(coro) # <14>

View File

@@ -2,17 +2,19 @@
"""Download flags of countries (with error handling). """Download flags of countries (with error handling).
asyncio async/await version using run_in_executor for save_flag. asyncio async/await version
""" """
# tag::FLAGS2_ASYNCIO_TOP[]
import asyncio import asyncio
from collections import Counter from collections import Counter
from http import HTTPStatus
from pathlib import Path
import aiohttp import httpx
import tqdm # type: ignore import tqdm # type: ignore
from flags2_common import main, HTTPStatus, Result, save_flag from flags2_common import main, DownloadStatus, save_flag
# default set low to avoid errors from remote site, such as # default set low to avoid errors from remote site, such as
# 503 - Service Temporarily Unavailable # 503 - Service Temporarily Unavailable
@@ -20,90 +22,87 @@ DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000 MAX_CONCUR_REQ = 1000
class FetchError(Exception): async def get_flag(session: httpx.AsyncClient, # <2>
def __init__(self, country_code: str):
self.country_code = country_code
async def get_flag(session: aiohttp.ClientSession,
base_url: str, base_url: str,
cc: str) -> bytes: cc: str) -> bytes:
url = f'{base_url}/{cc}/{cc}.gif'.lower() url = f'{base_url}/{cc}/{cc}.gif'.lower()
async with session.get(url) as resp: resp = await session.get(url, timeout=3.1, follow_redirects=True) # <3>
if resp.status == 200: resp.raise_for_status()
return await resp.read() return resp.content
else:
resp.raise_for_status()
return bytes()
# tag::FLAGS2_ASYNCIO_EXECUTOR[]
async def download_one(session: aiohttp.ClientSession, async def download_one(session: httpx.AsyncClient,
cc: str, cc: str,
base_url: str, base_url: str,
semaphore: asyncio.Semaphore, semaphore: asyncio.Semaphore,
verbose: bool) -> Result: verbose: bool) -> DownloadStatus:
try: try:
async with semaphore: async with semaphore:
image = await get_flag(session, base_url, cc) image = await get_flag(session, base_url, cc)
except aiohttp.ClientResponseError as exc: except httpx.HTTPStatusError as exc:
if exc.status == 404: res = exc.response
status = HTTPStatus.not_found if res.status_code == HTTPStatus.NOT_FOUND:
msg = 'not found' status = DownloadStatus.NOT_FOUND
msg = f'not found: {res.url}'
else: else:
raise FetchError(cc) from exc raise
else: else:
loop = asyncio.get_running_loop() # <1> # tag::FLAGS2_ASYNCIO_EXECUTOR[]
loop.run_in_executor(None, # <2> loop = asyncio.get_running_loop() # <1>
save_flag, image, f'{cc}.gif') # <3> loop.run_in_executor(None, save_flag, # <2>
status = HTTPStatus.ok image, f'{cc}.gif') # <3>
# end::FLAGS2_ASYNCIO_EXECUTOR[]
status = DownloadStatus.OK
msg = 'OK' msg = 'OK'
if verbose and msg: if verbose and msg:
print(cc, msg) print(cc, msg)
return Result(status, cc) return status
# end::FLAGS2_ASYNCIO_EXECUTOR[]
async def supervisor(cc_list: list[str], async def supervisor(cc_list: list[str],
base_url: str, base_url: str,
verbose: bool, verbose: bool,
concur_req: int) -> Counter[HTTPStatus]: concur_req: int) -> Counter[DownloadStatus]: # <1>
counter: Counter[HTTPStatus] = Counter() counter: Counter[DownloadStatus] = Counter()
semaphore = asyncio.Semaphore(concur_req) semaphore = asyncio.Semaphore(concur_req) # <2>
async with aiohttp.ClientSession() as session: async with httpx.AsyncClient() as session:
to_do = [download_one(session, cc, base_url, semaphore, verbose) to_do = [download_one(session, cc, base_url, semaphore, verbose)
for cc in sorted(cc_list)] for cc in sorted(cc_list)] # <3>
to_do_iter = asyncio.as_completed(to_do) # <4>
to_do_iter = asyncio.as_completed(to_do)
if not verbose: if not verbose:
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) # <5>
for coro in to_do_iter: for coro in to_do_iter: # <6>
try: try:
res = await coro status = await coro # <7>
except FetchError as exc: except httpx.HTTPStatusError as exc: # <13>
country_code = exc.country_code error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
try: error_msg = error_msg.format(resp=exc.response)
error_msg = exc.__cause__.message # type: ignore error = exc
except AttributeError: except httpx.RequestError as exc: # <15>
error_msg = 'Unknown cause' error_msg = f'{exc} {type(exc)}'.strip()
if verbose and error_msg: error = exc
print(f'*** Error for {country_code}: {error_msg}') except KeyboardInterrupt: # <7>
status = HTTPStatus.error break
else: else: # <8>
status = res.status error = None
counter[status] += 1 if error:
status = DownloadStatus.ERROR # <9>
return counter if verbose: # <11>
cc = Path(str(error.request.url)).stem.upper()
print(f'{cc} error: {error_msg}')
counter[status] += 1 # <10>
return counter # <12>
def download_many(cc_list: list[str], def download_many(cc_list: list[str],
base_url: str, base_url: str,
verbose: bool, verbose: bool,
concur_req: int) -> Counter[HTTPStatus]: concur_req: int) -> Counter[DownloadStatus]:
coro = supervisor(cc_list, base_url, verbose, concur_req) coro = supervisor(cc_list, base_url, verbose, concur_req)
counts = asyncio.run(coro) # <14> counts = asyncio.run(coro) # <14>
return counts return counts
if __name__ == '__main__': if __name__ == '__main__':
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ) main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
# end::FLAGS2_ASYNCIO_START[]

View File

@@ -5,13 +5,11 @@ import argparse
import string import string
import sys import sys
import time import time
from collections import namedtuple, Counter from collections import Counter
from enum import Enum from enum import Enum
from pathlib import Path from pathlib import Path
Result = namedtuple('Result', 'status data') DownloadStatus = Enum('DownloadStatus', 'OK NOT_FOUND ERROR')
HTTPStatus = Enum('HTTPStatus', 'ok not_found error')
POP20_CC = ('CN IN US ID BR PK NG BD RU JP ' POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
'MX PH VN ET EG DE IR TR CD FR').split() 'MX PH VN ET EG DE IR TR CD FR').split()
@@ -20,7 +18,7 @@ DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1 MAX_CONCUR_REQ = 1
SERVERS = { SERVERS = {
'REMOTE': 'http://fluentpython.com/data/flags', 'REMOTE': 'https://www.fluentpython.com/data/flags',
'LOCAL': 'http://localhost:8000/flags', 'LOCAL': 'http://localhost:8000/flags',
'DELAY': 'http://localhost:8001/flags', 'DELAY': 'http://localhost:8001/flags',
'ERROR': 'http://localhost:8002/flags', 'ERROR': 'http://localhost:8002/flags',
@@ -52,17 +50,17 @@ def initial_report(cc_list: list[str],
def final_report(cc_list: list[str], def final_report(cc_list: list[str],
counter: Counter[HTTPStatus], counter: Counter[DownloadStatus],
start_time: float) -> None: start_time: float) -> None:
elapsed = time.perf_counter() - start_time elapsed = time.perf_counter() - start_time
print('-' * 20) print('-' * 20)
plural = 's' if counter[HTTPStatus.ok] != 1 else '' plural = 's' if counter[DownloadStatus.OK] != 1 else ''
print(f'{counter[HTTPStatus.ok]} flag{plural} downloaded.') print(f'{counter[DownloadStatus.OK]:3} flag{plural} downloaded.')
if counter[HTTPStatus.not_found]: if counter[DownloadStatus.NOT_FOUND]:
print(f'{counter[HTTPStatus.not_found]} not found.') print(f'{counter[DownloadStatus.NOT_FOUND]:3} not found.')
if counter[HTTPStatus.error]: if counter[DownloadStatus.ERROR]:
plural = 's' if counter[HTTPStatus.error] != 1 else '' plural = 's' if counter[DownloadStatus.ERROR] != 1 else ''
print(f'{counter[HTTPStatus.error]} error{plural}.') print(f'{counter[DownloadStatus.ERROR]:3} error{plural}.')
print(f'Elapsed time: {elapsed:.2f}s') print(f'Elapsed time: {elapsed:.2f}s')
@@ -142,7 +140,7 @@ def process_args(default_concur_req):
sys.exit(2) # command line usage error sys.exit(2) # command line usage error
if not cc_list: if not cc_list:
cc_list = sorted(POP20_CC) cc_list = sorted(POP20_CC)[:args.limit]
return args, cc_list return args, cc_list
@@ -151,9 +149,7 @@ def main(download_many, default_concur_req, max_concur_req):
actual_req = min(args.max_req, max_concur_req, len(cc_list)) actual_req = min(args.max_req, max_concur_req, len(cc_list))
initial_report(cc_list, actual_req, args.server) initial_report(cc_list, actual_req, args.server)
base_url = SERVERS[args.server] base_url = SERVERS[args.server]
DEST_DIR.mkdir(exist_ok=True)
t0 = time.perf_counter() t0 = time.perf_counter()
counter = download_many(cc_list, base_url, args.verbose, actual_req) counter = download_many(cc_list, base_url, args.verbose, actual_req)
assert sum(counter.values()) == len(cc_list), (
'some downloads are unaccounted for'
)
final_report(cc_list, counter, t0) final_report(cc_list, counter, t0)

View File

@@ -17,71 +17,72 @@ Sample run::
""" """
# tag::FLAGS2_BASIC_HTTP_FUNCTIONS[]
from collections import Counter from collections import Counter
from http import HTTPStatus
import requests import httpx
import tqdm # type: ignore import tqdm # type: ignore # <1>
from flags2_common import main, save_flag, HTTPStatus, Result from flags2_common import main, save_flag, DownloadStatus # <2>
DEFAULT_CONCUR_REQ = 1 DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1 MAX_CONCUR_REQ = 1
# tag::FLAGS2_BASIC_HTTP_FUNCTIONS[]
def get_flag(base_url: str, cc: str) -> bytes: def get_flag(base_url: str, cc: str) -> bytes:
url = f'{base_url}/{cc}/{cc}.gif'.lower() url = f'{base_url}/{cc}/{cc}.gif'.lower()
resp = requests.get(url) resp = httpx.get(url, timeout=3.1, follow_redirects=True)
if resp.status_code != 200: # <1> resp.raise_for_status() # <3>
resp.raise_for_status()
return resp.content return resp.content
def download_one(cc: str, base_url: str, verbose: bool = False): def download_one(cc: str, base_url: str, verbose: bool = False) -> DownloadStatus:
try: try:
image = get_flag(base_url, cc) image = get_flag(base_url, cc)
except requests.exceptions.HTTPError as exc: # <2> except httpx.HTTPStatusError as exc: # <4>
res = exc.response res = exc.response
if res.status_code == 404: if res.status_code == HTTPStatus.NOT_FOUND:
status = HTTPStatus.not_found # <3> status = DownloadStatus.NOT_FOUND # <5>
msg = 'not found' msg = f'not found: {res.url}'
else: # <4> else:
raise raise # <6>
else: else:
save_flag(image, f'{cc}.gif') save_flag(image, f'{cc}.gif')
status = HTTPStatus.ok status = DownloadStatus.OK
msg = 'OK' msg = 'OK'
if verbose: # <5> if verbose: # <7>
print(cc, msg) print(cc, msg)
return Result(status, cc) # <6> return status
# end::FLAGS2_BASIC_HTTP_FUNCTIONS[] # end::FLAGS2_BASIC_HTTP_FUNCTIONS[]
# tag::FLAGS2_DOWNLOAD_MANY_SEQUENTIAL[] # tag::FLAGS2_DOWNLOAD_MANY_SEQUENTIAL[]
def download_many(cc_list: list[str], def download_many(cc_list: list[str],
base_url: str, base_url: str,
verbose: bool, verbose: bool,
_unused_concur_req: int) -> Counter[int]: _unused_concur_req: int) -> Counter[DownloadStatus]:
counter: Counter[int] = Counter() # <1> counter: Counter[DownloadStatus] = Counter() # <1>
cc_iter = sorted(cc_list) # <2> cc_iter = sorted(cc_list) # <2>
if not verbose: if not verbose:
cc_iter = tqdm.tqdm(cc_iter) # <3> cc_iter = tqdm.tqdm(cc_iter) # <3>
for cc in cc_iter: # <4> for cc in cc_iter:
try: try:
res = download_one(cc, base_url, verbose) # <5> status = download_one(cc, base_url, verbose) # <4>
except requests.exceptions.HTTPError as exc: # <6> except httpx.HTTPStatusError as exc: # <5>
error_msg = 'HTTP error {res.status_code} - {res.reason}' error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
error_msg = error_msg.format(res=exc.response) error_msg = error_msg.format(resp=exc.response)
except requests.exceptions.ConnectionError: # <7> except httpx.RequestError as exc: # <6>
error_msg = 'Connection error' error_msg = f'{exc} {type(exc)}'.strip()
except KeyboardInterrupt: # <7>
break
else: # <8> else: # <8>
error_msg = '' error_msg = ''
status = res.status
if error_msg: if error_msg:
status = HTTPStatus.error # <9> status = DownloadStatus.ERROR # <9>
counter[status] += 1 # <10> counter[status] += 1 # <10>
if verbose and error_msg: # <11> if verbose and error_msg: # <11>
print(f'*** Error for {cc}: {error_msg}') print(f'{cc} error: {error_msg}')
return counter # <12> return counter # <12>
# end::FLAGS2_DOWNLOAD_MANY_SEQUENTIAL[] # end::FLAGS2_DOWNLOAD_MANY_SEQUENTIAL[]

View File

@@ -22,48 +22,49 @@ Sample run::
from collections import Counter from collections import Counter
from concurrent import futures from concurrent import futures
import requests import httpx
import tqdm # type: ignore # <1> import tqdm # type: ignore
from flags2_common import main, HTTPStatus # <2> from flags2_common import main, DownloadStatus
from flags2_sequential import download_one # <3> from flags2_sequential import download_one # <1>
DEFAULT_CONCUR_REQ = 30 # <4> DEFAULT_CONCUR_REQ = 30 # <2>
MAX_CONCUR_REQ = 1000 # <5> MAX_CONCUR_REQ = 1000 # <3>
def download_many(cc_list: list[str], def download_many(cc_list: list[str],
base_url: str, base_url: str,
verbose: bool, verbose: bool,
concur_req: int) -> Counter[int]: concur_req: int) -> Counter[DownloadStatus]:
counter: Counter[int] = Counter() counter: Counter[DownloadStatus] = Counter()
with futures.ThreadPoolExecutor(max_workers=concur_req) as executor: # <6> with futures.ThreadPoolExecutor(max_workers=concur_req) as executor: # <4>
to_do_map = {} # <7> to_do_map = {} # <5>
for cc in sorted(cc_list): # <8> for cc in sorted(cc_list): # <6>
future = executor.submit(download_one, cc, future = executor.submit(download_one, cc,
base_url, verbose) # <9> base_url, verbose) # <7>
to_do_map[future] = cc # <10> to_do_map[future] = cc # <8>
done_iter = futures.as_completed(to_do_map) # <11> done_iter = futures.as_completed(to_do_map) # <9>
if not verbose: if not verbose:
done_iter = tqdm.tqdm(done_iter, total=len(cc_list)) # <12> done_iter = tqdm.tqdm(done_iter, total=len(cc_list)) # <10>
for future in done_iter: # <13> for future in done_iter: # <11>
try: try:
res = future.result() # <14> status = future.result() # <12>
except requests.exceptions.HTTPError as exc: # <15> except httpx.HTTPStatusError as exc: # <13>
error_fmt = 'HTTP {res.status_code} - {res.reason}' error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
error_msg = error_fmt.format(res=exc.response) error_msg = error_msg.format(resp=exc.response)
except requests.exceptions.ConnectionError: except httpx.RequestError as exc: # <15>
error_msg = 'Connection error' error_msg = f'{exc} {type(exc)}'.strip()
except KeyboardInterrupt:
break
else: else:
error_msg = '' error_msg = ''
status = res.status
if error_msg: if error_msg:
status = HTTPStatus.error status = DownloadStatus.ERROR
counter[status] += 1 counter[status] += 1
if verbose and error_msg: if verbose and error_msg:
cc = to_do_map[future] # <16> cc = to_do_map[future] # <16>
print(f'*** Error for {cc}: {error_msg}') print(f'{cc} error: {error_msg}')
return counter return counter

View File

@@ -8,11 +8,12 @@ asyncio async/await version using run_in_executor for save_flag.
import asyncio import asyncio
from collections import Counter from collections import Counter
from http import HTTPStatus
import aiohttp import aiohttp
import tqdm # type: ignore import tqdm # type: ignore
from flags2_common import main, HTTPStatus, Result, save_flag from flags2_common import main, DownloadStatus, save_flag
# default set low to avoid errors from remote site, such as # default set low to avoid errors from remote site, such as
# 503 - Service Temporarily Unavailable # 503 - Service Temporarily Unavailable
@@ -54,15 +55,15 @@ async def download_one(session: aiohttp.ClientSession,
cc: str, cc: str,
base_url: str, base_url: str,
semaphore: asyncio.Semaphore, semaphore: asyncio.Semaphore,
verbose: bool) -> Result: verbose: bool) -> DownloadStatus:
try: try:
async with semaphore: async with semaphore:
image = await get_flag(session, base_url, cc) # <1> image = await get_flag(session, base_url, cc) # <1>
async with semaphore: async with semaphore:
country = await get_country(session, base_url, cc) # <2> country = await get_country(session, base_url, cc) # <2>
except aiohttp.ClientResponseError as exc: except aiohttp.ClientResponseError as exc:
if exc.status == 404: if exc.status == HTTPStatus.NOT_FOUND:
status = HTTPStatus.not_found status = DownloadStatus.NOT_FOUND
msg = 'not found' msg = 'not found'
else: else:
raise FetchError(cc) from exc raise FetchError(cc) from exc
@@ -72,18 +73,18 @@ async def download_one(session: aiohttp.ClientSession,
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
loop.run_in_executor(None, loop.run_in_executor(None,
save_flag, image, filename) save_flag, image, filename)
status = HTTPStatus.ok status = DownloadStatus.OK
msg = 'OK' msg = 'OK'
if verbose and msg: if verbose and msg:
print(cc, msg) print(cc, msg)
return Result(status, cc) return status
# end::FLAGS3_ASYNCIO_DOWNLOAD_ONE[] # end::FLAGS3_ASYNCIO_DOWNLOAD_ONE[]
async def supervisor(cc_list: list[str], async def supervisor(cc_list: list[str],
base_url: str, base_url: str,
verbose: bool, verbose: bool,
concur_req: int) -> Counter[HTTPStatus]: concur_req: int) -> Counter[DownloadStatus]:
counter: Counter[HTTPStatus] = Counter() counter: Counter[DownloadStatus] = Counter()
semaphore = asyncio.Semaphore(concur_req) semaphore = asyncio.Semaphore(concur_req)
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
to_do = [download_one(session, cc, base_url, to_do = [download_one(session, cc, base_url,
@@ -95,7 +96,7 @@ async def supervisor(cc_list: list[str],
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))
for coro in to_do_iter: for coro in to_do_iter:
try: try:
res = await coro status = await coro
except FetchError as exc: except FetchError as exc:
country_code = exc.country_code country_code = exc.country_code
try: try:
@@ -104,9 +105,7 @@ async def supervisor(cc_list: list[str],
error_msg = 'Unknown cause' error_msg = 'Unknown cause'
if verbose and error_msg: if verbose and error_msg:
print(f'*** Error for {country_code}: {error_msg}') print(f'*** Error for {country_code}: {error_msg}')
status = HTTPStatus.error status = DownloadStatus.ERROR
else:
status = res.status
counter[status] += 1 counter[status] += 1
@@ -116,7 +115,7 @@ async def supervisor(cc_list: list[str],
def download_many(cc_list: list[str], def download_many(cc_list: list[str],
base_url: str, base_url: str,
verbose: bool, verbose: bool,
concur_req: int) -> Counter[HTTPStatus]: concur_req: int) -> Counter[DownloadStatus]:
coro = supervisor(cc_list, base_url, verbose, concur_req) coro = supervisor(cc_list, base_url, verbose, concur_req)
counts = asyncio.run(coro) # <14> counts = asyncio.run(coro) # <14>

View File

@@ -9,38 +9,38 @@ Sample run::
$ python3 flags_asyncio.py $ python3 flags_asyncio.py
EG VN IN TR RU ID US DE CN MX JP BD NG ET FR BR PH PK CD IR EG VN IN TR RU ID US DE CN MX JP BD NG ET FR BR PH PK CD IR
20 flags downloaded in 1.07s 20 flags downloaded in 1.07s
""" """
# tag::FLAGS_ASYNCIO_TOP[] # tag::FLAGS_ASYNCIO_TOP[]
import asyncio import asyncio
from aiohttp import ClientSession # <1> from httpx import AsyncClient # <1>
from flags import BASE_URL, save_flag, main # <2> from flags import BASE_URL, save_flag, main # <2>
async def download_one(session: ClientSession, cc: str): # <3> async def download_one(session: AsyncClient, cc: str): # <3>
image = await get_flag(session, cc) image = await get_flag(session, cc)
save_flag(image, f'{cc}.gif') save_flag(image, f'{cc}.gif')
print(cc, end=' ', flush=True) print(cc, end=' ', flush=True)
return cc return cc
async def get_flag(session: ClientSession, cc: str) -> bytes: # <4> async def get_flag(session: AsyncClient, cc: str) -> bytes: # <4>
url = f'{BASE_URL}/{cc}/{cc}.gif'.lower() url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
async with session.get(url) as resp: # <5> resp = await session.get(url, timeout=6.1,
return await resp.read() # <6> follow_redirects=True) # <5>
return resp.read() # <6>
# end::FLAGS_ASYNCIO_TOP[] # end::FLAGS_ASYNCIO_TOP[]
# tag::FLAGS_ASYNCIO_START[] # tag::FLAGS_ASYNCIO_START[]
def download_many(cc_list: list[str]) -> int: # <1> def download_many(cc_list: list[str]) -> int: # <1>
return asyncio.run(supervisor(cc_list)) # <2> return asyncio.run(supervisor(cc_list)) # <2>
async def supervisor(cc_list: list[str]) -> int: async def supervisor(cc_list: list[str]) -> int:
async with ClientSession() as session: # <3> async with AsyncClient() as session: # <3>
to_do = [download_one(session, cc) # <4> to_do = [download_one(session, cc)
for cc in sorted(cc_list)] for cc in sorted(cc_list)] # <4>
res = await asyncio.gather(*to_do) # <5> res = await asyncio.gather(*to_do) # <5>
return len(res) # <6> return len(res) # <6>
if __name__ == '__main__': if __name__ == '__main__':
main(download_many) main(download_many)

View File

@@ -1,13 +1,10 @@
aiohttp==3.7.4.post0 anyio==3.3.2
async-timeout==3.0.1
attrs==21.2.0
certifi==2021.5.30 certifi==2021.5.30
chardet==4.0.0 charset-normalizer==2.0.6
charset-normalizer==2.0.4 h11==0.12.0
httpcore==0.13.7
httpx==1.0.0b0
idna==3.2 idna==3.2
multidict==5.1.0 rfc3986==1.5.0
requests==2.26.0 sniffio==1.2.0
tqdm==4.62.2 tqdm==4.62.3
typing-extensions==3.10.0.2
urllib3==1.26.6
yarl==1.6.3

View File

@@ -15,8 +15,10 @@ import time
from functools import partial from functools import partial
from http import server, HTTPStatus from http import server, HTTPStatus
from http.server import ThreadingHTTPServer, SimpleHTTPRequestHandler from http.server import ThreadingHTTPServer, SimpleHTTPRequestHandler
from random import random from random import random, uniform
MIN_DELAY = 0.5 # minimum delay for do_GET (seconds)
MAX_DELAY = 5.0 # maximum delay for do_GET (seconds)
class SlowHTTPRequestHandler(SimpleHTTPRequestHandler): class SlowHTTPRequestHandler(SimpleHTTPRequestHandler):
"""SlowHTTPRequestHandler adds delays and errors to test HTTP clients. """SlowHTTPRequestHandler adds delays and errors to test HTTP clients.
@@ -36,15 +38,23 @@ class SlowHTTPRequestHandler(SimpleHTTPRequestHandler):
def do_GET(self): def do_GET(self):
"""Serve a GET request.""" """Serve a GET request."""
time.sleep(.5) delay = uniform(MIN_DELAY, MAX_DELAY)
cc = self.path[-6:-4].upper()
print(f'{cc} delay: {delay:0.2}s')
time.sleep(delay)
if random() < self.error_rate: if random() < self.error_rate:
# HTTPStatus.IM_A_TEAPOT requires Python >= 3.9 # HTTPStatus.IM_A_TEAPOT requires Python >= 3.9
self.send_error(HTTPStatus.IM_A_TEAPOT, "I'm a Teapot") try:
self.send_error(HTTPStatus.IM_A_TEAPOT, "I'm a Teapot")
except BrokenPipeError as exc:
print(f'{cc} *** BrokenPipeError: client closed')
else: else:
f = self.send_head() f = self.send_head()
if f: if f:
try: try:
self.copyfile(f, self.wfile) self.copyfile(f, self.wfile)
except BrokenPipeError as exc:
print(f'{cc} *** BrokenPipeError: client closed')
finally: finally:
f.close() f.close()
@@ -67,9 +77,9 @@ if __name__ == '__main__':
help='Error rate; e.g. use .25 for 25%% probability ' help='Error rate; e.g. use .25 for 25%% probability '
'[default:0.0]') '[default:0.0]')
parser.add_argument('port', action='store', parser.add_argument('port', action='store',
default=8000, type=int, default=8001, type=int,
nargs='?', nargs='?',
help='Specify alternate port [default: 8000]') help='Specify alternate port [default: 8001]')
args = parser.parse_args() args = parser.parse_args()
handler_class = partial(SlowHTTPRequestHandler, handler_class = partial(SlowHTTPRequestHandler,
directory=args.directory, directory=args.directory,

View File

@@ -0,0 +1,38 @@
import httpx
def tree(cls, level=0):
    """Yield (class name, depth) pairs for cls and all its subclasses, pre-order."""
    yield cls.__name__, level
    child_level = level + 1
    for child in cls.__subclasses__():
        yield from tree(child, child_level)
def display(cls):
    """Print the subclass tree of cls, indenting four spaces per level."""
    for name, depth in tree(cls):
        print(' ' * 4 * depth + name)
def find_roots(module):
    """Return the root exception classes exported by *module*.

    A "root" is an exception class in the module's namespace that is not a
    subclass of any *other* exception class also found in that namespace —
    i.e. the tops of the module's exception hierarchy.

    Args:
        module: any imported module object; its attributes are scanned
            via dir()/getattr().

    Returns:
        list of exception classes (order follows dir(module)).
    """
    # Collect every attribute of the module that is an exception class.
    exceptions = [obj for obj in (getattr(module, name) for name in dir(module))
                  if isinstance(obj, type) and issubclass(obj, BaseException)]
    # Keep only classes with no proper superclass among the collected ones.
    # `exc is not other` excludes the class itself, since issubclass(c, c) is True.
    return [exc for exc in exceptions
            if not any(issubclass(exc, other)
                       for other in exceptions
                       if exc is not other)]
def main():
    """Display the class tree rooted at each root exception in httpx."""
    roots = find_roots(httpx)
    for root in roots:
        display(root)


if __name__ == '__main__':
    main()