updated from Atlas
This commit is contained in:
8
17-futures/countries/country_codes.txt
Normal file
8
17-futures/countries/country_codes.txt
Normal file
@@ -0,0 +1,8 @@
|
||||
AD AE AF AG AL AM AO AR AT AU AZ BA BB BD BE BF BG BH BI BJ BN BO BR BS BT
|
||||
BW BY BZ CA CD CF CG CH CI CL CM CN CO CR CU CV CY CZ DE DJ DK DM DZ EC EE
|
||||
EG ER ES ET FI FJ FM FR GA GB GD GE GH GM GN GQ GR GT GW GY HN HR HT HU ID
|
||||
IE IL IN IQ IR IS IT JM JO JP KE KG KH KI KM KN KP KR KW KZ LA LB LC LI LK
|
||||
LR LS LT LU LV LY MA MC MD ME MG MH MK ML MM MN MR MT MU MV MW MX MY MZ NA
|
||||
NE NG NI NL NO NP NR NZ OM PA PE PG PH PK PL PT PW PY QA RO RS RU RW SA SB
|
||||
SC SD SE SG SI SK SL SM SN SO SR SS ST SV SY SZ TD TG TH TJ TL TM TN TO TR
|
||||
TT TV TW TZ UA UG US UY UZ VA VC VE VN VU WS YE ZA ZM ZW
|
||||
63
17-futures/countries/flags.py
Normal file
63
17-futures/countries/flags.py
Normal file
@@ -0,0 +1,63 @@
|
||||
"""Download flags of top 20 countries by population
|
||||
|
||||
Sequential version
|
||||
|
||||
Sample run::
|
||||
|
||||
$ python3 flags.py
|
||||
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN
|
||||
20 flags downloaded in 10.16s
|
||||
|
||||
"""
|
||||
# BEGIN FLAGS_PY
|
||||
import os
|
||||
import time
|
||||
import sys
|
||||
|
||||
import requests # <1>
|
||||
|
||||
POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
|
||||
'MX PH VN ET EG DE IR TR CD FR').split() # <2>
|
||||
|
||||
BASE_URL = 'http://flupy.org/data/flags' # <3>
|
||||
|
||||
DEST_DIR = 'downloads/' # <4>
|
||||
|
||||
|
||||
def save_flag(img, filename): # <5>
|
||||
path = os.path.join(DEST_DIR, filename)
|
||||
with open(path, 'wb') as fp:
|
||||
fp.write(img)
|
||||
|
||||
|
||||
def get_flag(cc): # <6>
|
||||
url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
|
||||
resp = requests.get(url)
|
||||
return resp.content
|
||||
|
||||
|
||||
def show(text): # <7>
|
||||
print(text, end=' ')
|
||||
sys.stdout.flush()
|
||||
|
||||
|
||||
def download_many(cc_list): # <8>
|
||||
for cc in sorted(cc_list): # <9>
|
||||
image = get_flag(cc)
|
||||
show(cc)
|
||||
save_flag(image, cc.lower() + '.gif')
|
||||
|
||||
return len(cc_list)
|
||||
|
||||
|
||||
def main(download_many): # <10>
|
||||
t0 = time.time()
|
||||
count = download_many(POP20_CC)
|
||||
elapsed = time.time() - t0
|
||||
msg = '\n{} flags downloaded in {:.2f}s'
|
||||
print(msg.format(count, elapsed))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main(download_many) # <11>
|
||||
# END FLAGS_PY
|
||||
BIN
17-futures/countries/flags.zip
Normal file
BIN
17-futures/countries/flags.zip
Normal file
Binary file not shown.
118
17-futures/countries/flags2_asyncio.py
Normal file
118
17-futures/countries/flags2_asyncio.py
Normal file
@@ -0,0 +1,118 @@
|
||||
"""Download flags of countries (with error handling).
|
||||
|
||||
asyncio version
|
||||
|
||||
Sample run::
|
||||
|
||||
$ python3 flags2_asyncio.py -s ERROR -e -m 200
|
||||
ERROR site: http://localhost:8003/flags
|
||||
Searching for 676 flags: from AA to ZZ
|
||||
200 concurrent connections will be used.
|
||||
--------------------
|
||||
146 flags downloaded.
|
||||
363 not found.
|
||||
167 errors.
|
||||
Elapsed time: 2.59s
|
||||
|
||||
"""
|
||||
# BEGIN FLAGS2_ASYNCIO_TOP
|
||||
import asyncio
|
||||
import collections
|
||||
|
||||
import aiohttp
|
||||
from aiohttp import web
|
||||
import tqdm
|
||||
|
||||
from flags2_common import main, HTTPStatus, Result, save_flag
|
||||
|
||||
# default set low to avoid errors from remote site, such as
|
||||
# 503 - Service Temporarily Unavailable
|
||||
DEFAULT_CONCUR_REQ = 5
|
||||
MAX_CONCUR_REQ = 1000
|
||||
|
||||
|
||||
class FetchError(Exception): # <1>
|
||||
def __init__(self, country_code):
|
||||
self.country_code = country_code
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def get_flag(base_url, cc): # <2>
|
||||
url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
|
||||
resp = yield from aiohttp.request('GET', url)
|
||||
if resp.status == 200:
|
||||
image = yield from resp.read()
|
||||
return image
|
||||
elif resp.status == 404:
|
||||
raise web.HTTPNotFound()
|
||||
else:
|
||||
raise aiohttp.HttpProcessingError(
|
||||
code=resp.status, message=resp.reason,
|
||||
headers=resp.headers)
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def download_one(cc, base_url, semaphore, verbose): # <3>
|
||||
try:
|
||||
with (yield from semaphore): # <4>
|
||||
image = yield from get_flag(base_url, cc) # <5>
|
||||
except web.HTTPNotFound: # <6>
|
||||
status = HTTPStatus.not_found
|
||||
msg = 'not found'
|
||||
except Exception as exc:
|
||||
raise FetchError(cc) from exc # <7>
|
||||
else:
|
||||
save_flag(image, cc.lower() + '.gif') # <8>
|
||||
status = HTTPStatus.ok
|
||||
msg = 'OK'
|
||||
|
||||
if verbose and msg:
|
||||
print(cc, msg)
|
||||
|
||||
return Result(status, cc)
|
||||
# END FLAGS2_ASYNCIO_TOP
|
||||
|
||||
# BEGIN FLAGS2_ASYNCIO_DOWNLOAD_MANY
|
||||
@asyncio.coroutine
|
||||
def downloader_coro(cc_list, base_url, verbose, concur_req): # <1>
|
||||
counter = collections.Counter()
|
||||
semaphore = asyncio.Semaphore(concur_req) # <2>
|
||||
to_do = [download_one(cc, base_url, semaphore, verbose)
|
||||
for cc in sorted(cc_list)] # <3>
|
||||
|
||||
to_do_iter = asyncio.as_completed(to_do) # <4>
|
||||
if not verbose:
|
||||
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) # <5>
|
||||
for future in to_do_iter: # <6>
|
||||
try:
|
||||
res = yield from future # <7>
|
||||
except FetchError as exc: # <8>
|
||||
country_code = exc.country_code # <9>
|
||||
try:
|
||||
error_msg = exc.__cause__.args[0] # <10>
|
||||
except IndexError:
|
||||
error_msg = exc.__cause__.__class__.__name__ # <11>
|
||||
if verbose and error_msg:
|
||||
msg = '*** Error for {}: {}'
|
||||
print(msg.format(country_code, error_msg))
|
||||
status = HTTPStatus.error
|
||||
else:
|
||||
status = res.status
|
||||
|
||||
counter[status] += 1 # <12>
|
||||
|
||||
return counter # <13>
|
||||
|
||||
|
||||
def download_many(cc_list, base_url, verbose, concur_req):
|
||||
loop = asyncio.get_event_loop()
|
||||
coro = downloader_coro(cc_list, base_url, verbose, concur_req)
|
||||
counts = loop.run_until_complete(coro) # <14>
|
||||
loop.close() # <15>
|
||||
|
||||
return counts
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
|
||||
# END FLAGS2_ASYNCIO_DOWNLOAD_MANY
|
||||
112
17-futures/countries/flags2_asyncio_executor.py
Normal file
112
17-futures/countries/flags2_asyncio_executor.py
Normal file
@@ -0,0 +1,112 @@
|
||||
"""Download flags of countries (with error handling).
|
||||
|
||||
asyncio version using thread pool to save files
|
||||
|
||||
Sample run::
|
||||
|
||||
$
|
||||
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import collections
|
||||
|
||||
import aiohttp
|
||||
from aiohttp import web
|
||||
import tqdm
|
||||
|
||||
from flags2_common import main, HTTPStatus, Result, save_flag
|
||||
|
||||
# default set low to avoid errors from remote site, such as
|
||||
# 503 - Service Temporarily Unavailable
|
||||
DEFAULT_CONCUR_REQ = 5
|
||||
MAX_CONCUR_REQ = 1000
|
||||
|
||||
|
||||
class FetchError(Exception):
|
||||
def __init__(self, country_code):
|
||||
self.country_code = country_code
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def get_flag(base_url, cc):
|
||||
url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
|
||||
resp = yield from aiohttp.request('GET', url)
|
||||
if resp.status == 200:
|
||||
image = yield from resp.read()
|
||||
return image
|
||||
elif resp.status == 404:
|
||||
raise web.HTTPNotFound()
|
||||
else:
|
||||
raise aiohttp.HttpProcessingError(
|
||||
code=resp.status, message=resp.reason,
|
||||
headers=resp.headers)
|
||||
|
||||
# BEGIN FLAGS2_ASYNCIO_EXECUTOR
|
||||
@asyncio.coroutine
|
||||
def download_one(cc, base_url, semaphore, verbose):
|
||||
try:
|
||||
with (yield from semaphore):
|
||||
image = yield from get_flag(base_url, cc)
|
||||
except web.HTTPNotFound:
|
||||
status = HTTPStatus.not_found
|
||||
msg = 'not found'
|
||||
except Exception as exc:
|
||||
raise FetchError(cc) from exc
|
||||
else:
|
||||
loop = asyncio.get_event_loop() # <1>
|
||||
loop.run_in_executor(None, # <2>
|
||||
save_flag, image, cc.lower() + '.gif') # <3>
|
||||
status = HTTPStatus.ok
|
||||
msg = 'OK'
|
||||
|
||||
if verbose and msg:
|
||||
print(cc, msg)
|
||||
|
||||
return Result(status, cc)
|
||||
# END FLAGS2_ASYNCIO_EXECUTOR
|
||||
|
||||
@asyncio.coroutine
|
||||
def downloader_coro(cc_list, base_url, verbose, concur_req):
|
||||
counter = collections.Counter()
|
||||
semaphore = asyncio.Semaphore(concur_req)
|
||||
to_do = [download_one(cc, base_url, semaphore, verbose)
|
||||
for cc in sorted(cc_list)]
|
||||
|
||||
to_do_iter = asyncio.as_completed(to_do)
|
||||
if not verbose:
|
||||
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))
|
||||
for future in to_do_iter:
|
||||
try:
|
||||
res = yield from future
|
||||
except FetchError as exc:
|
||||
country_code = exc.country_code
|
||||
try:
|
||||
error_msg = exc.__cause__.args[0]
|
||||
except IndexError:
|
||||
error_msg = exc.__cause__.__class__.__name__
|
||||
else:
|
||||
error_msg = ''
|
||||
status = res.status
|
||||
|
||||
if error_msg:
|
||||
status = HTTPStatus.error
|
||||
counter[status] += 1
|
||||
if verbose and error_msg:
|
||||
msg = '*** Error for {}: {}'
|
||||
print(msg.format(country_code, error_msg))
|
||||
|
||||
return counter
|
||||
|
||||
|
||||
def download_many(cc_list, base_url, verbose, concur_req):
|
||||
loop = asyncio.get_event_loop()
|
||||
coro = downloader_coro(cc_list, base_url, verbose, concur_req)
|
||||
counts = loop.run_until_complete(coro)
|
||||
loop.close()
|
||||
|
||||
return counts
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
|
||||
149
17-futures/countries/flags2_common.py
Normal file
149
17-futures/countries/flags2_common.py
Normal file
@@ -0,0 +1,149 @@
|
||||
"""Utilities for second set of flag examples.
|
||||
"""
|
||||
|
||||
import os
|
||||
import time
|
||||
import sys
|
||||
import string
|
||||
import argparse
|
||||
from collections import namedtuple
|
||||
from enum import Enum
|
||||
|
||||
|
||||
Result = namedtuple('Result', 'status data')
|
||||
|
||||
HTTPStatus = Enum('Status', 'ok not_found error')
|
||||
|
||||
POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
|
||||
'MX PH VN ET EG DE IR TR CD FR').split()
|
||||
|
||||
DEFAULT_CONCUR_REQ = 1
|
||||
MAX_CONCUR_REQ = 1
|
||||
|
||||
SERVERS = {
|
||||
'REMOTE': 'http://flupy.org/data/flags',
|
||||
'LOCAL': 'http://localhost:8001/flags',
|
||||
'DELAY': 'http://localhost:8002/flags',
|
||||
'ERROR': 'http://localhost:8003/flags',
|
||||
}
|
||||
DEFAULT_SERVER = 'LOCAL'
|
||||
|
||||
DEST_DIR = 'downloads/'
|
||||
COUNTRY_CODES_FILE = 'country_codes.txt'
|
||||
|
||||
|
||||
def save_flag(img, filename):
|
||||
path = os.path.join(DEST_DIR, filename)
|
||||
with open(path, 'wb') as fp:
|
||||
fp.write(img)
|
||||
|
||||
|
||||
def initial_report(cc_list, actual_req, server_label):
|
||||
if len(cc_list) <= 10:
|
||||
cc_msg = ', '.join(cc_list)
|
||||
else:
|
||||
cc_msg = 'from {} to {}'.format(cc_list[0], cc_list[-1])
|
||||
print('{} site: {}'.format(server_label, SERVERS[server_label]))
|
||||
msg = 'Searching for {} flag{}: {}'
|
||||
plural = 's' if len(cc_list) != 1 else ''
|
||||
print(msg.format(len(cc_list), plural, cc_msg))
|
||||
plural = 's' if actual_req != 1 else ''
|
||||
msg = '{} concurrent connection{} will be used.'
|
||||
print(msg.format(actual_req, plural))
|
||||
|
||||
|
||||
def final_report(cc_list, counter, start_time):
|
||||
elapsed = time.time() - start_time
|
||||
print('-' * 20)
|
||||
msg = '{} flag{} downloaded.'
|
||||
plural = 's' if counter[HTTPStatus.ok] != 1 else ''
|
||||
print(msg.format(counter[HTTPStatus.ok], plural))
|
||||
if counter[HTTPStatus.not_found]:
|
||||
print(counter[HTTPStatus.not_found], 'not found.')
|
||||
if counter[HTTPStatus.error]:
|
||||
plural = 's' if counter[HTTPStatus.error] != 1 else ''
|
||||
print('{} error{}.'.format(counter[HTTPStatus.error], plural))
|
||||
print('Elapsed time: {:.2f}s'.format(elapsed))
|
||||
|
||||
|
||||
def expand_cc_args(every_cc, all_cc, cc_args, limit):
|
||||
codes = set()
|
||||
A_Z = string.ascii_uppercase
|
||||
if every_cc:
|
||||
codes.update(a+b for a in A_Z for b in A_Z)
|
||||
elif all_cc:
|
||||
with open(COUNTRY_CODES_FILE) as fp:
|
||||
text = fp.read()
|
||||
codes.update(text.split())
|
||||
else:
|
||||
for cc in (c.upper() for c in cc_args):
|
||||
if len(cc) == 1 and cc in A_Z:
|
||||
codes.update(cc+c for c in A_Z)
|
||||
elif len(cc) == 2 and all(c in A_Z for c in cc):
|
||||
codes.add(cc)
|
||||
else:
|
||||
msg = 'each CC argument must be A to Z or AA to ZZ.'
|
||||
raise ValueError('*** Usage error: '+msg)
|
||||
return sorted(codes)[:limit]
|
||||
|
||||
|
||||
def process_args(default_concur_req):
|
||||
server_options = ', '.join(sorted(SERVERS))
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Download flags for country codes. '
|
||||
'Default: top 20 countries by population.')
|
||||
parser.add_argument('cc', metavar='CC', nargs='*',
|
||||
help='country code or 1st letter (eg. B for BA...BZ)')
|
||||
parser.add_argument('-a', '--all', action='store_true',
|
||||
help='get all available flags (AD to ZW)')
|
||||
parser.add_argument('-e', '--every', action='store_true',
|
||||
help='get flags for every possible code (AA...ZZ)')
|
||||
parser.add_argument('-l', '--limit', metavar='N', type=int,
|
||||
help='limit to N first codes', default=sys.maxsize)
|
||||
parser.add_argument('-m', '--max_req', metavar='CONCURRENT', type=int,
|
||||
default=default_concur_req,
|
||||
help='maximum concurrent requests (default={})'
|
||||
.format(default_concur_req))
|
||||
parser.add_argument('-s', '--server', metavar='LABEL',
|
||||
default=DEFAULT_SERVER,
|
||||
help='Server to hit; one of {} (default={})'
|
||||
.format(server_options, DEFAULT_SERVER))
|
||||
parser.add_argument('-v', '--verbose', action='store_true',
|
||||
help='output detailed progress info')
|
||||
args = parser.parse_args()
|
||||
if args.max_req < 1:
|
||||
print('*** Usage error: --max_req CONCURRENT must be >= 1')
|
||||
parser.print_usage()
|
||||
sys.exit(1)
|
||||
if args.limit < 1:
|
||||
print('*** Usage error: --limit N must be >= 1')
|
||||
parser.print_usage()
|
||||
sys.exit(1)
|
||||
args.server = args.server.upper()
|
||||
if args.server not in SERVERS:
|
||||
print('*** Usage error: --server LABEL must be one of',
|
||||
server_options)
|
||||
parser.print_usage()
|
||||
sys.exit(1)
|
||||
try:
|
||||
cc_list = expand_cc_args(args.every, args.all, args.cc, args.limit)
|
||||
except ValueError as exc:
|
||||
print(exc.args[0])
|
||||
parser.print_usage()
|
||||
sys.exit(1)
|
||||
|
||||
if not cc_list:
|
||||
cc_list = sorted(POP20_CC)
|
||||
return args, cc_list
|
||||
|
||||
|
||||
def main(download_many, default_concur_req, max_concur_req):
|
||||
args, cc_list = process_args(default_concur_req)
|
||||
actual_req = min(args.max_req, max_concur_req, len(cc_list))
|
||||
initial_report(cc_list, actual_req, args.server)
|
||||
base_url = SERVERS[args.server]
|
||||
t0 = time.time()
|
||||
counter = download_many(cc_list, base_url, args.verbose, actual_req)
|
||||
assert sum(counter.values()) == len(cc_list), \
|
||||
'some downloads are unaccounted for'
|
||||
final_report(cc_list, counter, t0)
|
||||
87
17-futures/countries/flags2_sequential.py
Normal file
87
17-futures/countries/flags2_sequential.py
Normal file
@@ -0,0 +1,87 @@
|
||||
"""Download flags of countries (with error handling).
|
||||
|
||||
Sequential version
|
||||
|
||||
Sample run::
|
||||
|
||||
$ python3 flags2_sequential.py -s DELAY b
|
||||
DELAY site: http://localhost:8002/flags
|
||||
Searching for 26 flags: from BA to BZ
|
||||
1 concurrent connection will be used.
|
||||
--------------------
|
||||
17 flags downloaded.
|
||||
9 not found.
|
||||
Elapsed time: 13.36s
|
||||
|
||||
"""
|
||||
|
||||
import collections
|
||||
|
||||
import requests
|
||||
import tqdm
|
||||
|
||||
from flags2_common import main, save_flag, HTTPStatus, Result
|
||||
|
||||
|
||||
DEFAULT_CONCUR_REQ = 1
|
||||
MAX_CONCUR_REQ = 1
|
||||
|
||||
# BEGIN FLAGS2_BASIC_HTTP_FUNCTIONS
|
||||
def get_flag(base_url, cc):
|
||||
url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
|
||||
resp = requests.get(url)
|
||||
if resp.status_code != 200: # <1>
|
||||
resp.raise_for_status()
|
||||
return resp.content
|
||||
|
||||
|
||||
def download_one(cc, base_url, verbose=False):
|
||||
try:
|
||||
image = get_flag(base_url, cc)
|
||||
except requests.exceptions.HTTPError as exc: # <2>
|
||||
res = exc.response
|
||||
if res.status_code == 404:
|
||||
status = HTTPStatus.not_found # <3>
|
||||
msg = 'not found'
|
||||
else: # <4>
|
||||
raise
|
||||
else:
|
||||
save_flag(image, cc.lower() + '.gif')
|
||||
status = HTTPStatus.ok
|
||||
msg = 'OK'
|
||||
|
||||
if verbose: # <5>
|
||||
print(cc, msg)
|
||||
|
||||
return Result(status, cc) # <6>
|
||||
# END FLAGS2_BASIC_HTTP_FUNCTIONS
|
||||
|
||||
# BEGIN FLAGS2_DOWNLOAD_MANY_SEQUENTIAL
|
||||
def download_many(cc_list, base_url, verbose, max_req):
|
||||
counter = collections.Counter() # <1>
|
||||
cc_iter = sorted(cc_list) # <2>
|
||||
if not verbose:
|
||||
cc_iter = tqdm.tqdm(cc_iter) # <3>
|
||||
for cc in cc_iter: # <4>
|
||||
try:
|
||||
res = download_one(cc, base_url, verbose) # <5>
|
||||
except requests.exceptions.HTTPError as exc: # <6>
|
||||
error_msg = 'HTTP error {res.status_code} - {res.reason}'
|
||||
error_msg = error_msg.format(res=exc.response)
|
||||
except requests.exceptions.ConnectionError as exc: # <7>
|
||||
error_msg = 'Connection error'
|
||||
else: # <8>
|
||||
error_msg = ''
|
||||
status = res.status
|
||||
|
||||
if error_msg:
|
||||
status = HTTPStatus.error # <9>
|
||||
counter[status] += 1 # <10>
|
||||
if verbose and error_msg: # <11>
|
||||
print('*** Error for {}: {}'.format(cc, error_msg))
|
||||
|
||||
return counter # <12>
|
||||
# END FLAGS2_DOWNLOAD_MANY_SEQUENTIAL
|
||||
|
||||
if __name__ == '__main__':
|
||||
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
|
||||
68
17-futures/countries/flags2_threadpool.py
Normal file
68
17-futures/countries/flags2_threadpool.py
Normal file
@@ -0,0 +1,68 @@
|
||||
"""Download flags of countries (with error handling).
|
||||
|
||||
ThreadPool version
|
||||
|
||||
Sample run::
|
||||
|
||||
$ python3 flags2_threadpool.py -s ERROR -e
|
||||
ERROR site: http://localhost:8003/flags
|
||||
Searching for 676 flags: from AA to ZZ
|
||||
30 concurrent connections will be used.
|
||||
--------------------
|
||||
150 flags downloaded.
|
||||
361 not found.
|
||||
165 errors.
|
||||
Elapsed time: 7.46s
|
||||
|
||||
"""
|
||||
|
||||
# BEGIN FLAGS2_THREADPOOL
|
||||
import collections
|
||||
from concurrent import futures
|
||||
|
||||
import requests
|
||||
import tqdm # <1>
|
||||
|
||||
from flags2_common import main, HTTPStatus # <2>
|
||||
from flags2_sequential import download_one # <3>
|
||||
|
||||
DEFAULT_CONCUR_REQ = 30 # <4>
|
||||
MAX_CONCUR_REQ = 1000 # <5>
|
||||
|
||||
|
||||
def download_many(cc_list, base_url, verbose, concur_req):
|
||||
counter = collections.Counter()
|
||||
with futures.ThreadPoolExecutor(concur_req) as executor: # <6>
|
||||
to_do_map = {} # <7>
|
||||
for cc in sorted(cc_list): # <8>
|
||||
future = executor.submit(download_one,
|
||||
cc, base_url, verbose) # <9>
|
||||
to_do_map[future] = cc # <10>
|
||||
done_iter = futures.as_completed(to_do_map) # <11>
|
||||
if not verbose:
|
||||
done_iter = tqdm.tqdm(done_iter, total=len(cc_list)) # <12>
|
||||
for future in done_iter: # <13>
|
||||
try:
|
||||
res = future.result() # <14>
|
||||
except requests.exceptions.HTTPError as exc: # <15>
|
||||
error_msg = 'HTTP {res.status_code} - {res.reason}'
|
||||
error_msg = error_msg.format(res=exc.response)
|
||||
except requests.exceptions.ConnectionError as exc:
|
||||
error_msg = 'Connection error'
|
||||
else:
|
||||
error_msg = ''
|
||||
status = res.status
|
||||
|
||||
if error_msg:
|
||||
status = HTTPStatus.error
|
||||
counter[status] += 1
|
||||
if verbose and error_msg:
|
||||
cc = to_do_map[future] # <16>
|
||||
print('*** Error for {}: {}'.format(cc, error_msg))
|
||||
|
||||
return counter
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
|
||||
# END FLAGS2_THREADPOOL
|
||||
130
17-futures/countries/flags3_asyncio.py
Normal file
130
17-futures/countries/flags3_asyncio.py
Normal file
@@ -0,0 +1,130 @@
|
||||
"""Download flags of countries (with error handling).
|
||||
|
||||
asyncio version using thread pool to save files
|
||||
|
||||
Sample run::
|
||||
|
||||
$
|
||||
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import collections
|
||||
|
||||
import aiohttp
|
||||
from aiohttp import web
|
||||
import tqdm
|
||||
|
||||
from flags2_common import main, HTTPStatus, Result, save_flag
|
||||
|
||||
# default set low to avoid errors from remote site, such as
|
||||
# 503 - Service Temporarily Unavailable
|
||||
DEFAULT_CONCUR_REQ = 5
|
||||
MAX_CONCUR_REQ = 1000
|
||||
|
||||
|
||||
class FetchError(Exception):
|
||||
def __init__(self, country_code):
|
||||
self.country_code = country_code
|
||||
|
||||
# BEGIN FLAGS3_ASYNCIO
|
||||
@asyncio.coroutine
|
||||
def http_get(url):
|
||||
res = yield from aiohttp.request('GET', url)
|
||||
if res.status == 200:
|
||||
ctype = res.headers.get('Content-type', '').lower()
|
||||
if 'json' in ctype or url.endswith('json'):
|
||||
data = yield from res.json() # <1>
|
||||
else:
|
||||
data = yield from res.read() # <2>
|
||||
return data
|
||||
|
||||
elif res.status == 404:
|
||||
raise web.HTTPNotFound()
|
||||
else:
|
||||
raise aiohttp.errors.HttpProcessingError(
|
||||
code=res.status, message=res.reason,
|
||||
headers=res.headers)
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def get_country(base_url, cc):
|
||||
url = '{}/{cc}/metadata.json'.format(base_url, cc=cc.lower())
|
||||
metadata = yield from http_get(url) # <3>
|
||||
return metadata['country']
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def get_flag(base_url, cc):
|
||||
url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
|
||||
return (yield from http_get(url)) # <4>
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def download_one(cc, base_url, semaphore, verbose):
|
||||
try:
|
||||
with (yield from semaphore): # <5>
|
||||
image = yield from get_flag(base_url, cc)
|
||||
with (yield from semaphore):
|
||||
country = yield from get_country(base_url, cc)
|
||||
except web.HTTPNotFound:
|
||||
status = HTTPStatus.not_found
|
||||
msg = 'not found'
|
||||
except Exception as exc:
|
||||
raise FetchError(cc) from exc
|
||||
else:
|
||||
country = country.replace(' ', '_')
|
||||
filename = '{}-{}.gif'.format(country, cc)
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_in_executor(None, save_flag, image, filename)
|
||||
status = HTTPStatus.ok
|
||||
msg = 'OK'
|
||||
|
||||
if verbose and msg:
|
||||
print(cc, msg)
|
||||
|
||||
return Result(status, cc)
|
||||
# END FLAGS3_ASYNCIO
|
||||
|
||||
@asyncio.coroutine
|
||||
def downloader_coro(cc_list, base_url, verbose, concur_req):
|
||||
counter = collections.Counter()
|
||||
semaphore = asyncio.Semaphore(concur_req)
|
||||
to_do = [download_one(cc, base_url, semaphore, verbose)
|
||||
for cc in sorted(cc_list)]
|
||||
|
||||
to_do_iter = asyncio.as_completed(to_do)
|
||||
if not verbose:
|
||||
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))
|
||||
for future in to_do_iter:
|
||||
try:
|
||||
res = yield from future
|
||||
except FetchError as exc:
|
||||
country_code = exc.country_code
|
||||
try:
|
||||
error_msg = exc.__cause__.args[0]
|
||||
except IndexError:
|
||||
error_msg = exc.__cause__.__class__.__name__
|
||||
if verbose and error_msg:
|
||||
msg = '*** Error for {}: {}'
|
||||
print(msg.format(country_code, error_msg))
|
||||
status = HTTPStatus.error
|
||||
else:
|
||||
status = res.status
|
||||
|
||||
counter[status] += 1
|
||||
|
||||
return counter
|
||||
|
||||
|
||||
def download_many(cc_list, base_url, verbose, concur_req):
|
||||
loop = asyncio.get_event_loop()
|
||||
coro = downloader_coro(cc_list, base_url, verbose, concur_req)
|
||||
counts = loop.run_until_complete(coro)
|
||||
loop.close()
|
||||
|
||||
return counts
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
|
||||
88
17-futures/countries/flags3_threadpool.py
Normal file
88
17-futures/countries/flags3_threadpool.py
Normal file
@@ -0,0 +1,88 @@
|
||||
"""Download flags and names of countries.
|
||||
|
||||
ThreadPool version
|
||||
|
||||
Sample run::
|
||||
|
||||
|
||||
"""
|
||||
|
||||
import collections
|
||||
from concurrent import futures
|
||||
|
||||
import requests
|
||||
import tqdm
|
||||
|
||||
from flags2_common import main, save_flag, HTTPStatus, Result
|
||||
from flags2_sequential import get_flag
|
||||
|
||||
DEFAULT_CONCUR_REQ = 30
|
||||
MAX_CONCUR_REQ = 1000
|
||||
|
||||
|
||||
def get_country(base_url, cc):
|
||||
url = '{}/{cc}/metadata.json'.format(base_url, cc=cc.lower())
|
||||
res = requests.get(url)
|
||||
if res.status_code != 200:
|
||||
res.raise_for_status()
|
||||
return res.json()['country']
|
||||
|
||||
|
||||
def download_one(cc, base_url, verbose=False):
|
||||
try:
|
||||
image = get_flag(base_url, cc)
|
||||
country = get_country(base_url, cc)
|
||||
except requests.exceptions.HTTPError as exc:
|
||||
res = exc.response
|
||||
if res.status_code == 404:
|
||||
status = HTTPStatus.not_found
|
||||
msg = 'not found'
|
||||
else: # <4>
|
||||
raise
|
||||
else:
|
||||
country = country.replace(' ', '_')
|
||||
save_flag(image, '{}-{}.gif'.format(country, cc))
|
||||
status = HTTPStatus.ok
|
||||
msg = 'OK'
|
||||
|
||||
if verbose:
|
||||
print(cc, msg)
|
||||
|
||||
return Result(status, cc)
|
||||
|
||||
|
||||
def download_many(cc_list, base_url, verbose, concur_req):
|
||||
counter = collections.Counter()
|
||||
with futures.ThreadPoolExecutor(concur_req) as executor:
|
||||
to_do_map = {}
|
||||
for cc in sorted(cc_list):
|
||||
future = executor.submit(download_one,
|
||||
cc, base_url, verbose)
|
||||
to_do_map[future] = cc
|
||||
to_do_iter = futures.as_completed(to_do_map)
|
||||
if not verbose:
|
||||
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))
|
||||
for future in to_do_iter:
|
||||
try:
|
||||
res = future.result()
|
||||
except requests.exceptions.HTTPError as exc:
|
||||
error_msg = 'HTTP {res.status_code} - {res.reason}'
|
||||
error_msg = error_msg.format(res=exc.response)
|
||||
except requests.exceptions.ConnectionError as exc:
|
||||
error_msg = 'Connection error'
|
||||
else:
|
||||
error_msg = ''
|
||||
status = res.status
|
||||
|
||||
if error_msg:
|
||||
status = HTTPStatus.error
|
||||
counter[status] += 1
|
||||
if verbose and error_msg:
|
||||
cc = to_do_map[future]
|
||||
print('*** Error for {}: {}'.format(cc, error_msg))
|
||||
|
||||
return counter
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
|
||||
48
17-futures/countries/flags_asyncio.py
Normal file
48
17-futures/countries/flags_asyncio.py
Normal file
@@ -0,0 +1,48 @@
|
||||
"""Download flags of top 20 countries by population
|
||||
|
||||
asyncio + aiottp version
|
||||
|
||||
Sample run::
|
||||
|
||||
$ python3 flags_asyncio.py
|
||||
EG VN IN TR RU ID US DE CN MX JP BD NG ET FR BR PH PK CD IR
|
||||
20 flags downloaded in 1.07s
|
||||
|
||||
"""
|
||||
# BEGIN FLAGS_ASYNCIO
|
||||
import asyncio
|
||||
|
||||
import aiohttp # <1>
|
||||
|
||||
from flags import BASE_URL, save_flag, show, main # <2>
|
||||
|
||||
|
||||
@asyncio.coroutine # <3>
|
||||
def get_flag(cc):
|
||||
url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
|
||||
resp = yield from aiohttp.request('GET', url) # <4>
|
||||
image = yield from resp.read() # <5>
|
||||
return image
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def download_one(cc): # <6>
|
||||
image = yield from get_flag(cc) # <7>
|
||||
show(cc)
|
||||
save_flag(image, cc.lower() + '.gif')
|
||||
return cc
|
||||
|
||||
|
||||
def download_many(cc_list):
|
||||
loop = asyncio.get_event_loop() # <8>
|
||||
to_do = [download_one(cc) for cc in sorted(cc_list)] # <9>
|
||||
wait_coro = asyncio.wait(to_do) # <10>
|
||||
res, _ = loop.run_until_complete(wait_coro) # <11>
|
||||
loop.close() # <12>
|
||||
|
||||
return len(res)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main(download_many)
|
||||
# END FLAGS_ASYNCIO
|
||||
43
17-futures/countries/flags_threadpool.py
Normal file
43
17-futures/countries/flags_threadpool.py
Normal file
@@ -0,0 +1,43 @@
|
||||
"""Download flags of top 20 countries by population
|
||||
|
||||
ThreadPoolExecutor version
|
||||
|
||||
Sample run::
|
||||
|
||||
$ python3 flags_threadpool.py
|
||||
BD retrieved.
|
||||
EG retrieved.
|
||||
CN retrieved.
|
||||
...
|
||||
PH retrieved.
|
||||
US retrieved.
|
||||
IR retrieved.
|
||||
20 flags downloaded in 0.93s
|
||||
|
||||
"""
|
||||
# BEGIN FLAGS_THREADPOOL
|
||||
from concurrent import futures
|
||||
|
||||
from flags import save_flag, get_flag, show, main # <1>
|
||||
|
||||
MAX_WORKERS = 20 # <2>
|
||||
|
||||
|
||||
def download_one(cc): # <3>
|
||||
image = get_flag(cc)
|
||||
show(cc)
|
||||
save_flag(image, cc.lower() + '.gif')
|
||||
return cc
|
||||
|
||||
|
||||
def download_many(cc_list):
|
||||
workers = min(MAX_WORKERS, len(cc_list)) # <4>
|
||||
with futures.ThreadPoolExecutor(workers) as executor: # <5>
|
||||
res = executor.map(download_one, sorted(cc_list)) # <6>
|
||||
|
||||
return len(list(res)) # <7>
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main(download_many) # <8>
|
||||
# END FLAGS_THREADPOOL
|
||||
55
17-futures/countries/flags_threadpool_ac.py
Normal file
55
17-futures/countries/flags_threadpool_ac.py
Normal file
@@ -0,0 +1,55 @@
|
||||
"""Download flags of top 20 countries by population
|
||||
|
||||
ThreadPoolExecutor version 2, with ``as_completed``.
|
||||
|
||||
Sample run::
|
||||
|
||||
$ python3 flags_threadpool.py
|
||||
BD retrieved.
|
||||
EG retrieved.
|
||||
CN retrieved.
|
||||
...
|
||||
PH retrieved.
|
||||
US retrieved.
|
||||
IR retrieved.
|
||||
20 flags downloaded in 0.93s
|
||||
|
||||
"""
|
||||
from concurrent import futures
|
||||
|
||||
from flags import save_flag, get_flag, show, main
|
||||
|
||||
MAX_WORKERS = 20
|
||||
|
||||
|
||||
def download_one(cc):
|
||||
image = get_flag(cc)
|
||||
show(cc)
|
||||
save_flag(image, cc.lower() + '.gif')
|
||||
return cc
|
||||
|
||||
|
||||
# BEGIN FLAGS_THREADPOOL_AS_COMPLETED
|
||||
def download_many(cc_list):
|
||||
cc_list = cc_list[:5] # <1>
|
||||
with futures.ThreadPoolExecutor(max_workers=3) as executor: # <2>
|
||||
to_do = []
|
||||
for cc in sorted(cc_list): # <3>
|
||||
future = executor.submit(download_one, cc) # <4>
|
||||
to_do.append(future) # <5>
|
||||
msg = 'Scheduled for {}: {}'
|
||||
print(msg.format(cc, future)) # <6>
|
||||
|
||||
results = []
|
||||
for future in futures.as_completed(to_do): # <7>
|
||||
res = future.result() # <8>
|
||||
msg = '{} result: {!r}'
|
||||
print(msg.format(future, res)) # <9>
|
||||
results.append(res)
|
||||
|
||||
return len(results)
|
||||
# END FLAGS_THREADPOOL_AS_COMPLETED
|
||||
|
||||
if __name__ == '__main__':
|
||||
main(download_many)
|
||||
|
||||
4
17-futures/countries/vaurien_delay.sh
Executable file
4
17-futures/countries/vaurien_delay.sh
Executable file
@@ -0,0 +1,4 @@
|
||||
#!/bin/bash
|
||||
vaurien --protocol http --backend localhost:8001 \
|
||||
--proxy localhost:8002 \
|
||||
--behavior 100:delay --behavior-delay-sleep .5
|
||||
4
17-futures/countries/vaurien_error_delay.sh
Executable file
4
17-futures/countries/vaurien_error_delay.sh
Executable file
@@ -0,0 +1,4 @@
|
||||
#!/bin/bash
|
||||
vaurien --protocol http --backend localhost:8001 \
|
||||
--proxy localhost:8003 \
|
||||
--behavior 25:error,50:delay --behavior-delay-sleep .5
|
||||
34
17-futures/demo_executor_map.py
Normal file
34
17-futures/demo_executor_map.py
Normal file
@@ -0,0 +1,34 @@
|
||||
"""
|
||||
Experiment with ``ThreadPoolExecutor.map``
|
||||
"""
|
||||
# BEGIN EXECUTOR_MAP
|
||||
from time import sleep, strftime
|
||||
from concurrent import futures
|
||||
|
||||
|
||||
def display(*args): # <1>
|
||||
print(strftime('[%H:%M:%S]'), end=' ')
|
||||
print(*args)
|
||||
|
||||
|
||||
def loiter(n): # <2>
|
||||
msg = '{}loiter({}): doing nothing for {}s...'
|
||||
display(msg.format('\t'*n, n, n))
|
||||
sleep(n)
|
||||
msg = '{}loiter({}): done.'
|
||||
display(msg.format('\t'*n, n))
|
||||
return n * 10 # <3>
|
||||
|
||||
|
||||
def main():
|
||||
display('Script starting.')
|
||||
executor = futures.ThreadPoolExecutor(max_workers=3) # <4>
|
||||
results = executor.map(loiter, range(5)) # <5>
|
||||
display('results:', results) # <6>.
|
||||
display('Waiting for individual results:')
|
||||
for i, result in enumerate(results): # <7>
|
||||
display('result {}: {}'.format(i, result))
|
||||
|
||||
|
||||
main()
|
||||
# END EXECUTOR_MAP
|
||||
Reference in New Issue
Block a user