final concurrency examples

This commit is contained in:
Luciano Ramalho
2015-03-13 18:24:31 -03:00
parent 39e87de5cd
commit 2d7a96742b
26 changed files with 1231 additions and 481 deletions

View File

@@ -1,122 +1,132 @@
"""Download flags of top 10 countries by population
"""Download flags of countries (with error handling).
asyncio version
asyncio version using thread pool to save files
Sample run::
$ python3 pop10_asyncio1.py
CN retrieved.
US retrieved.
BR retrieved.
NG retrieved.
PK retrieved.
RU retrieved.
ID retrieved.
IN retrieved.
BD retrieved.
JP retrieved.
10 flags downloaded in 0.45s
$
"""
import asyncio
from collections import namedtuple
from enum import Enum
import collections
import aiohttp
from aiohttp import web
import tqdm
from flags_sequential2 import BASE_URL
from flags_sequential2 import save_flag, main, Counts
from flags2_common import main, HTTPStatus, Result, save_flag
MAX_TASKS = 100 if 'localhost' in BASE_URL else 5
TIMEOUT = 120 # seconds
Status = Enum('Status', 'ok not_found error')
Result = namedtuple('Result', 'status data')
# default set low to avoid errors from remote site, such as
# 503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000
class FetchError(Exception):
def __init__(self, country_code):
self.country_code = country_code
# BEGIN FLAGS3_ASYNCIO
@asyncio.coroutine
def http_get(url):
res = yield from aiohttp.request('GET', url)
if res.status == 200:
ctype = res.headers.get('Content-type', '').lower()
if 'json' in ctype or url.endswith('json'):
data = yield from res.json()
data = yield from res.json() # <1>
else:
data = yield from res.read()
data = yield from res.read() # <2>
return data
elif res.status == 404:
raise web.HTTPNotFound()
else:
raise aiohttp.errors.HttpProcessingError(
code=res.status, message=res.reason, headers=res.headers)
code=res.status, message=res.reason,
headers=res.headers)
@asyncio.coroutine
def get_flag(cc):
url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
return (yield from http_get(url))
@asyncio.coroutine
def get_country(cc):
url = '{}/{cc}/metadata.json'.format(BASE_URL, cc=cc.lower())
metadata = yield from http_get(url)
def get_country(base_url, cc):
url = '{}/{cc}/metadata.json'.format(base_url, cc=cc.lower())
metadata = yield from http_get(url) # <3>
return metadata['country']
@asyncio.coroutine
def download_one(cc, semaphore):
def get_flag(base_url, cc):
url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
return (yield from http_get(url)) # <4>
@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):
try:
with (yield from semaphore): # <5>
image = yield from get_flag(base_url, cc)
with (yield from semaphore):
image = yield from get_flag(cc)
with (yield from semaphore):
country = yield from get_country(cc)
country = yield from get_country(base_url, cc)
except web.HTTPNotFound:
status = Status.not_found
except aiohttp.errors.HttpProcessingError as exc:
msg = '{} failed: {exc.code} - {exc.message}'
print(msg.format(cc, exc=exc))
status = Status.error
except aiohttp.errors.ClientResponseError as exc:
try:
context = exc.__context__.__class__.__name__
except AttributeError:
context = '(unknown context)'
msg = '{} failed: {}'
print(msg.format(cc, context))
status = Status.error
else:
print('{} retrieved.'.format(cc.upper()))
country = country.replace(' ', '_')
save_flag(image, '{}-{}.gif'.format(country, cc))
status = Status.ok
return Result(status, cc)
def download_many(cc_list):
semaphore = asyncio.Semaphore(MAX_TASKS)
to_do = [download_one(cc, semaphore) for cc in cc_list]
loop = asyncio.get_event_loop()
#loop.set_debug(True)
try:
done, pending = loop.run_until_complete(asyncio.wait(to_do, timeout=TIMEOUT))
status = HTTPStatus.not_found
msg = 'not found'
except Exception as exc:
print('*' * 60)
print(exc)
print(vars(exc))
print('*' * 60)
counts = []
for status in Status:
counts.append(len([task for task in done
if task.result().status == status]))
for task in pending:
task.cancel()
raise FetchError(cc) from exc
else:
country = country.replace(' ', '_')
filename = '{}-{}.gif'.format(country, cc)
loop = asyncio.get_event_loop()
loop.run_in_executor(None, save_flag, image, filename)
status = HTTPStatus.ok
msg = 'OK'
if verbose and msg:
print(cc, msg)
return Result(status, cc)
# END FLAGS3_ASYNCIO
@asyncio.coroutine
def downloader_coro(cc_list, base_url, verbose, concur_req):
counter = collections.Counter()
semaphore = asyncio.Semaphore(concur_req)
to_do = [download_one(cc, base_url, semaphore, verbose)
for cc in sorted(cc_list)]
to_do_iter = asyncio.as_completed(to_do)
if not verbose:
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))
for future in to_do_iter:
try:
res = yield from future
except FetchError as exc:
country_code = exc.country_code
try:
error_msg = exc.__cause__.args[0]
except IndexError:
error_msg = exc.__cause__.__class__.__name__
else:
error_msg = ''
status = res.status
if error_msg:
status = HTTPStatus.error
counter[status] += 1
if verbose and error_msg:
msg = '*** Error for {}: {}'
print(msg.format(country_code, error_msg))
return counter
def download_many(cc_list, base_url, verbose, concur_req):
loop = asyncio.get_event_loop()
coro = downloader_coro(cc_list, base_url, verbose, concur_req)
counts = loop.run_until_complete(coro)
loop.close()
return Counts(*counts)
return counts
if __name__ == '__main__':
main(download_many)
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)