ch22: examples

This commit is contained in:
Luciano Ramalho 2021-03-02 17:22:02 -03:00
parent 1702717182
commit cdbca8030e

View File

@ -0,0 +1,127 @@
#!/usr/bin/env python3
"""Download flags of countries (with error handling).
asyncio async/await version using run_in_executor for save_flag.
"""
import asyncio
from collections import Counter
import aiohttp
import tqdm # type: ignore
from flags2_common import main, HTTPStatus, Result, save_flag
# default set low to avoid errors from remote site, such as
# 503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000
class FetchError(Exception):
def __init__(self, country_code: str):
self.country_code = country_code
async def get_flag(session: aiohttp.ClientSession,
base_url: str,
cc: str) -> bytes:
url = f'{base_url}/{cc}/{cc}.gif'
async with session.get(url) as resp:
if resp.status == 200:
return await resp.read()
else:
resp.raise_for_status()
return bytes()
# tag::FLAGS3_ASYNCIO_GET_COUNTRY[]
async def get_country(session: aiohttp.ClientSession, # <1>
base_url: str,
cc: str) -> str:
url = f'{base_url}/{cc}/metadata.json'
async with session.get(url) as resp:
if resp.status == 200:
metadata = await resp.json() # <2>
return metadata.get('country', 'no name') # <3>
else:
resp.raise_for_status()
return ''
# end::FLAGS3_ASYNCIO_GET_COUNTRY[]
# tag::FLAGS3_ASYNCIO_DOWNLOAD_ONE[]
async def download_one(session: aiohttp.ClientSession,
cc: str,
base_url: str,
semaphore: asyncio.Semaphore,
verbose: bool) -> Result:
try:
async with semaphore:
image = await get_flag(session, base_url, cc) # <1>
async with semaphore:
country = await get_country(session, base_url, cc) # <2>
except aiohttp.ClientResponseError as exc:
if exc.status == 404:
status = HTTPStatus.not_found
msg = 'not found'
else:
raise FetchError(cc) from exc
else:
filename = country.replace(' ', '_') # <3>
filename = f'{filename}.gif'
loop = asyncio.get_running_loop()
loop.run_in_executor(None,
save_flag, image, filename)
status = HTTPStatus.ok
msg = 'OK'
if verbose and msg:
print(cc, msg)
return Result(status, cc)
# end::FLAGS3_ASYNCIO_DOWNLOAD_ONE[]
async def supervisor(cc_list: list[str],
base_url: str,
verbose: bool,
concur_req: int) -> Counter[HTTPStatus]:
counter: Counter[HTTPStatus] = Counter()
semaphore = asyncio.Semaphore(concur_req)
async with aiohttp.ClientSession() as session:
to_do = [download_one(session, cc, base_url,
semaphore, verbose)
for cc in sorted(cc_list)]
to_do_iter = asyncio.as_completed(to_do)
if not verbose:
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))
for coro in to_do_iter:
try:
res = await coro
except FetchError as exc:
country_code = exc.country_code
try:
error_msg = exc.__cause__.message # type: ignore
except AttributeError:
error_msg = 'Unknown cause'
if verbose and error_msg:
print(f'*** Error for {country_code}: {error_msg}')
status = HTTPStatus.error
else:
status = res.status
counter[status] += 1
return counter
def download_many(cc_list: list[str],
base_url: str,
verbose: bool,
concur_req: int) -> Counter[HTTPStatus]:
coro = supervisor(cc_list, base_url, verbose, concur_req)
counts = asyncio.run(coro) # <14>
return counts
if __name__ == '__main__':
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)