diff --git a/17-futures-py3.7/countries/.gitignore b/17-futures-py3.7/countries/.gitignore new file mode 100644 index 0000000..8ea4ee7 --- /dev/null +++ b/17-futures-py3.7/countries/.gitignore @@ -0,0 +1 @@ +flags/ diff --git a/17-futures-py3.7/countries/README.rst b/17-futures-py3.7/countries/README.rst new file mode 100644 index 0000000..0f29b01 --- /dev/null +++ b/17-futures-py3.7/countries/README.rst @@ -0,0 +1,178 @@ +============================== +Setting up Nginx and Toxiproxy +============================== + +This text explains how to configure Nginx and Toxiproxy to test HTTP client code while avoiding network traffic and introducing simulated delays and errors. This setup is necessary if you want to experiment with the ``flags2*.py`` image download examples in this directory -- covered in chapters 17 and 18 of Fluent Python. + + +Overview +======== + +The flag download examples are designed to compare the performance of different approaches to finding and downloading files from the Web. However, we don't want to hit a public server with multiple requests per second while testing, and we want to be able to simulate high latency and random network errors. + +For this setup I chose Nginx as the HTTP server because it is very fast and easy to configure, and Toxiproxy — a proxy designed by Shopify to introduce delays and network errors for testing distributed systems. + +The archive ``flags.zip`` contains a directory ``flags/`` with 194 subdirectories, each containing a ``.gif`` image and a ``metadata.json`` file. These are public-domain images copied from the `CIA World Fact Book <https://www.cia.gov/the-world-factbook/>`_. + +Once these files are unpacked to the ``flags/`` directory and Nginx is configured, you can experiment with the ``flags2*.py`` examples without hitting the network. + + +Procedure +========= + +1. Unpack test data +------------------- + +The instructions in this section are for GNU/Linux or OSX using the command line. 
Windows users should have no difficulty doing the same operations with the Windows Explorer GUI. + +Unpack the initial data in the ``countries/`` directory:: + + $ unzip flags.zip + ... many lines omitted ... + creating: flags/zw/ + inflating: flags/zw/metadata.json + inflating: flags/zw/zw.gif + + +Verify that 194 directories are created in ``countries/flags/``, each with a ``.gif`` and a ``metadata.json`` file:: + + + $ ls flags | wc -w + 194 + $ find flags | grep .gif | wc -l + 194 + $ find flags | grep .json | wc -l + 194 + $ ls flags/ad + ad.gif metadata.json + + +2. Install Nginx +---------------- + +Download and install Nginx. I used version 1.6.2 -- the latest stable version as I write this. + +* Download page: http://nginx.org/en/download.html + +* Beginner's guide: http://nginx.org/en/docs/beginners_guide.html + + +3. Configure Nginx +------------------ + +Edit the ``nginx.conf`` file to set the port and document root. You can determine which ``nginx.conf`` is in use by running:: + + + $ nginx -V + + +The output starts with:: + + nginx version: nginx/1.6.2 + built by clang 6.0 (clang-600.0.51) (based on LLVM 3.5svn) + TLS SNI support enabled + configure arguments:... + + +Among the configure arguments you'll see ``--conf-path=``. The path that follows it is the file you will edit. + +Most of the content in ``nginx.conf`` is within a block labeled ``http`` and enclosed in curly braces. Within that block there can be multiple blocks labeled ``server``. Add another ``server`` block like this one:: + + + server { + listen 8001; + + location /flags/ { + root /full-path-to.../countries/; + } + } + + +After editing ``nginx.conf`` the server must be started (if it's not running) or told to reload the configuration file:: + + + $ nginx # to start, if necessary + $ nginx -s reload # to reload the configuration + + +To test the configuration, open the URL http://localhost:8001/flags/ad/ad.gif in a browser. You should see the blue, yellow and red flag of Andorra. 
+ +If the test fails, please double check the procedure just described and refer to the Nginx documentation. + +At this point you may run the ``flags2*.py`` examples against the Nginx install by providing the ``--server LOCAL`` command line option. For example:: + + + $ python3 flags2_threadpool.py -s LOCAL + LOCAL site: http://localhost:8001/flags + Searching for 20 flags: from BD to VN + 20 concurrent connections will be used. + -------------------- + 20 flags downloaded. + Elapsed time: 0.09s + + +Note that Nginx is so fast that you will not see much difference in run time between the sequential and the concurrent versions. For more realistic testing with simulated network lag, we need to set up Toxiproxy. + + +4. Install and run Toxiproxy +---------------------------- + +Install... + +In one terminal:: + + $ toxiproxy-server + +In another terminal:: + + $ toxiproxy-cli create nginx_flags_delay -l localhost:8002 -u localhost:8001 + Created new proxy nginx_flags_delay + $ toxiproxy-cli toxic add nginx_flags_delay -t latency -a latency=500 + Added downstream latency toxic 'latency_downstream' on proxy 'nginx_flags_delay' + + +This creates an HTTP proxy on port 8002 which adds a 0.5s delay to every response. You can test it with a browser on port 8002: http://localhost:8002/flags/ad/ad.gif -- the flag of Andorra should appear after ½ second. + +TODO: UPDATE NEXT PARAGRAPH + +There is also the ``XXX`` script which runs a proxy on port 8003 producing errors in 25% of the responses and a 0.5s delay to 50% of the responses. You can also test it with the browser on port 8003, but remember that errors are expected. 
+ + +Platform-specific instructions +============================== + + +Nginx setup on Mac OS X +------------------------ + +Homebrew (copy & paste code at the bottom of http://brew.sh/):: + + + $ ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)" + $ brew doctor + $ brew install nginx + + +Download and unpack:: + +Docroot is: /usr/local/var/www +/usr/local/etc/nginx/nginx.conf + + +:: + + To have launchd start nginx at login: + ln -sfv /usr/local/opt/nginx/*.plist ~/Library/LaunchAgents + Then to load nginx now: + launchctl load ~/Library/LaunchAgents/homebrew.mxcl.nginx.plist + Or, if you don't want/need launchctl, you can just run: + nginx + + + +Nginx setup on Lubuntu 14.04.1 LTS +---------------------------------- + +Docroot is: /usr/share/nginx/html + + diff --git a/17-futures-py3.7/countries/country_codes.txt b/17-futures-py3.7/countries/country_codes.txt new file mode 100644 index 0000000..72c37f0 --- /dev/null +++ b/17-futures-py3.7/countries/country_codes.txt @@ -0,0 +1,8 @@ +AD AE AF AG AL AM AO AR AT AU AZ BA BB BD BE BF BG BH BI BJ BN BO BR BS BT +BW BY BZ CA CD CF CG CH CI CL CM CN CO CR CU CV CY CZ DE DJ DK DM DZ EC EE +EG ER ES ET FI FJ FM FR GA GB GD GE GH GM GN GQ GR GT GW GY HN HR HT HU ID +IE IL IN IQ IR IS IT JM JO JP KE KG KH KI KM KN KP KR KW KZ LA LB LC LI LK +LR LS LT LU LV LY MA MC MD ME MG MH MK ML MM MN MR MT MU MV MW MX MY MZ NA +NE NG NI NL NO NP NR NZ OM PA PE PG PH PK PL PT PW PY QA RO RS RU RW SA SB +SC SD SE SG SI SK SL SM SN SO SR SS ST SV SY SZ TD TG TH TJ TL TM TN TO TR +TT TV TW TZ UA UG US UY UZ VA VC VE VN VU WS YE ZA ZM ZW diff --git a/17-futures-py3.7/countries/flags2_asyncio.py b/17-futures-py3.7/countries/flags2_asyncio.py new file mode 100644 index 0000000..2635155 --- /dev/null +++ b/17-futures-py3.7/countries/flags2_asyncio.py @@ -0,0 +1,103 @@ +"""Download flags of countries (with error handling). 
+ +asyncio async/await version + +""" +# BEGIN FLAGS2_ASYNCIO_TOP +import asyncio +import collections + +import aiohttp +from aiohttp import web +import tqdm + +from flags2_common import main, HTTPStatus, Result, save_flag + +# default set low to avoid errors from remote site, such as +# 503 - Service Temporarily Unavailable +DEFAULT_CONCUR_REQ = 5 +MAX_CONCUR_REQ = 1000 + + +class FetchError(Exception): # <1> + def __init__(self, country_code): + self.country_code = country_code + + +async def get_flag(session, base_url, cc): # <2> + url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower()) + async with session.get(url) as resp: + if resp.status == 200: + return await resp.read() + elif resp.status == 404: + raise web.HTTPNotFound() + else: + raise aiohttp.HttpProcessingError( + code=resp.status, message=resp.reason, + headers=resp.headers) + + +async def download_one(session, cc, base_url, semaphore, verbose): # <3> + try: + async with semaphore: # <4> + image = await get_flag(session, base_url, cc) # <5> + except web.HTTPNotFound: # <6> + status = HTTPStatus.not_found + msg = 'not found' + except Exception as exc: + raise FetchError(cc) from exc # <7> + else: + save_flag(image, cc.lower() + '.gif') # <8> + status = HTTPStatus.ok + msg = 'OK' + + if verbose and msg: + print(cc, msg) + + return Result(status, cc) +# END FLAGS2_ASYNCIO_TOP + +# BEGIN FLAGS2_ASYNCIO_DOWNLOAD_MANY +async def downloader_coro(cc_list, base_url, verbose, concur_req): # <1> + counter = collections.Counter() + semaphore = asyncio.Semaphore(concur_req) # <2> + async with aiohttp.ClientSession() as session: # <8> + to_do = [download_one(session, cc, base_url, semaphore, verbose) + for cc in sorted(cc_list)] # <3> + + to_do_iter = asyncio.as_completed(to_do) # <4> + if not verbose: + to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) # <5> + for future in to_do_iter: # <6> + try: + res = await future # <7> + except FetchError as exc: # <8> + country_code = exc.country_code # <9> + try: + 
error_msg = exc.__cause__.args[0] # <10> + except IndexError: + error_msg = exc.__cause__.__class__.__name__ # <11> + if verbose and error_msg: + msg = '*** Error for {}: {}' + print(msg.format(country_code, error_msg)) + status = HTTPStatus.error + else: + status = res.status + + counter[status] += 1 # <12> + + return counter # <13> + + +def download_many(cc_list, base_url, verbose, concur_req): + loop = asyncio.get_event_loop() + coro = downloader_coro(cc_list, base_url, verbose, concur_req) + counts = loop.run_until_complete(coro) # <14> + loop.close() # <15> + + return counts + + +if __name__ == '__main__': + main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ) +# END FLAGS2_ASYNCIO_DOWNLOAD_MANY diff --git a/17-futures-py3.7/countries/flags2_common.py b/17-futures-py3.7/countries/flags2_common.py new file mode 100644 index 0000000..bfa40fb --- /dev/null +++ b/17-futures-py3.7/countries/flags2_common.py @@ -0,0 +1,149 @@ +"""Utilities for second set of flag examples. +""" + +import os +import time +import sys +import string +import argparse +from collections import namedtuple +from enum import Enum + + +Result = namedtuple('Result', 'status data') + +HTTPStatus = Enum('Status', 'ok not_found error') + +POP20_CC = ('CN IN US ID BR PK NG BD RU JP ' + 'MX PH VN ET EG DE IR TR CD FR').split() + +DEFAULT_CONCUR_REQ = 1 +MAX_CONCUR_REQ = 1 + +SERVERS = { + 'REMOTE': 'http://flupy.org/data/flags', + 'LOCAL': 'http://localhost:8001/flags', + 'DELAY': 'http://localhost:8002/flags', + 'ERROR': 'http://localhost:8003/flags', +} +DEFAULT_SERVER = 'LOCAL' + +DEST_DIR = 'downloads/' +COUNTRY_CODES_FILE = 'country_codes.txt' + + +def save_flag(img, filename): + path = os.path.join(DEST_DIR, filename) + with open(path, 'wb') as fp: + fp.write(img) + + +def initial_report(cc_list, actual_req, server_label): + if len(cc_list) <= 10: + cc_msg = ', '.join(cc_list) + else: + cc_msg = 'from {} to {}'.format(cc_list[0], cc_list[-1]) + print('{} site: {}'.format(server_label, 
SERVERS[server_label])) + msg = 'Searching for {} flag{}: {}' + plural = 's' if len(cc_list) != 1 else '' + print(msg.format(len(cc_list), plural, cc_msg)) + plural = 's' if actual_req != 1 else '' + msg = '{} concurrent connection{} will be used.' + print(msg.format(actual_req, plural)) + + +def final_report(cc_list, counter, start_time): + elapsed = time.time() - start_time + print('-' * 20) + msg = '{} flag{} downloaded.' + plural = 's' if counter[HTTPStatus.ok] != 1 else '' + print(msg.format(counter[HTTPStatus.ok], plural)) + if counter[HTTPStatus.not_found]: + print(counter[HTTPStatus.not_found], 'not found.') + if counter[HTTPStatus.error]: + plural = 's' if counter[HTTPStatus.error] != 1 else '' + print('{} error{}.'.format(counter[HTTPStatus.error], plural)) + print('Elapsed time: {:.2f}s'.format(elapsed)) + + +def expand_cc_args(every_cc, all_cc, cc_args, limit): + codes = set() + A_Z = string.ascii_uppercase + if every_cc: + codes.update(a+b for a in A_Z for b in A_Z) + elif all_cc: + with open(COUNTRY_CODES_FILE) as fp: + text = fp.read() + codes.update(text.split()) + else: + for cc in (c.upper() for c in cc_args): + if len(cc) == 1 and cc in A_Z: + codes.update(cc+c for c in A_Z) + elif len(cc) == 2 and all(c in A_Z for c in cc): + codes.add(cc) + else: + msg = 'each CC argument must be A to Z or AA to ZZ.' + raise ValueError('*** Usage error: '+msg) + return sorted(codes)[:limit] + + +def process_args(default_concur_req): + server_options = ', '.join(sorted(SERVERS)) + parser = argparse.ArgumentParser( + description='Download flags for country codes. ' + 'Default: top 20 countries by population.') + parser.add_argument('cc', metavar='CC', nargs='*', + help='country code or 1st letter (eg. 
B for BA...BZ)') + parser.add_argument('-a', '--all', action='store_true', + help='get all available flags (AD to ZW)') + parser.add_argument('-e', '--every', action='store_true', + help='get flags for every possible code (AA...ZZ)') + parser.add_argument('-l', '--limit', metavar='N', type=int, + help='limit to N first codes', default=sys.maxsize) + parser.add_argument('-m', '--max_req', metavar='CONCURRENT', type=int, + default=default_concur_req, + help='maximum concurrent requests (default={})' + .format(default_concur_req)) + parser.add_argument('-s', '--server', metavar='LABEL', + default=DEFAULT_SERVER, + help='Server to hit; one of {} (default={})' + .format(server_options, DEFAULT_SERVER)) + parser.add_argument('-v', '--verbose', action='store_true', + help='output detailed progress info') + args = parser.parse_args() + if args.max_req < 1: + print('*** Usage error: --max_req CONCURRENT must be >= 1') + parser.print_usage() + sys.exit(1) + if args.limit < 1: + print('*** Usage error: --limit N must be >= 1') + parser.print_usage() + sys.exit(1) + args.server = args.server.upper() + if args.server not in SERVERS: + print('*** Usage error: --server LABEL must be one of', + server_options) + parser.print_usage() + sys.exit(1) + try: + cc_list = expand_cc_args(args.every, args.all, args.cc, args.limit) + except ValueError as exc: + print(exc.args[0]) + parser.print_usage() + sys.exit(1) + + if not cc_list: + cc_list = sorted(POP20_CC) + return args, cc_list + + +def main(download_many, default_concur_req, max_concur_req): + args, cc_list = process_args(default_concur_req) + actual_req = min(args.max_req, max_concur_req, len(cc_list)) + initial_report(cc_list, actual_req, args.server) + base_url = SERVERS[args.server] + t0 = time.time() + counter = download_many(cc_list, base_url, args.verbose, actual_req) + assert sum(counter.values()) == len(cc_list), \ + 'some downloads are unaccounted for' + final_report(cc_list, counter, t0) diff --git 
a/17-futures-py3.7/countries/flags2_sequential.py b/17-futures-py3.7/countries/flags2_sequential.py new file mode 100644 index 0000000..65a7e43 --- /dev/null +++ b/17-futures-py3.7/countries/flags2_sequential.py @@ -0,0 +1,87 @@ +"""Download flags of countries (with error handling). + +Sequential version + +Sample run:: + + $ python3 flags2_sequential.py -s DELAY b + DELAY site: http://localhost:8002/flags + Searching for 26 flags: from BA to BZ + 1 concurrent connection will be used. + -------------------- + 17 flags downloaded. + 9 not found. + Elapsed time: 13.36s + +""" + +import collections + +import requests +import tqdm + +from flags2_common import main, save_flag, HTTPStatus, Result + + +DEFAULT_CONCUR_REQ = 1 +MAX_CONCUR_REQ = 1 + +# BEGIN FLAGS2_BASIC_HTTP_FUNCTIONS +def get_flag(base_url, cc): + url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower()) + resp = requests.get(url) + if resp.status_code != 200: # <1> + resp.raise_for_status() + return resp.content + + +def download_one(cc, base_url, verbose=False): + try: + image = get_flag(base_url, cc) + except requests.exceptions.HTTPError as exc: # <2> + res = exc.response + if res.status_code == 404: + status = HTTPStatus.not_found # <3> + msg = 'not found' + else: # <4> + raise + else: + save_flag(image, cc.lower() + '.gif') + status = HTTPStatus.ok + msg = 'OK' + + if verbose: # <5> + print(cc, msg) + + return Result(status, cc) # <6> +# END FLAGS2_BASIC_HTTP_FUNCTIONS + +# BEGIN FLAGS2_DOWNLOAD_MANY_SEQUENTIAL +def download_many(cc_list, base_url, verbose, max_req): + counter = collections.Counter() # <1> + cc_iter = sorted(cc_list) # <2> + if not verbose: + cc_iter = tqdm.tqdm(cc_iter) # <3> + for cc in cc_iter: # <4> + try: + res = download_one(cc, base_url, verbose) # <5> + except requests.exceptions.HTTPError as exc: # <6> + error_msg = 'HTTP error {res.status_code} - {res.reason}' + error_msg = error_msg.format(res=exc.response) + except requests.exceptions.ConnectionError as exc: # <7> + 
error_msg = 'Connection error' + else: # <8> + error_msg = '' + status = res.status + + if error_msg: + status = HTTPStatus.error # <9> + counter[status] += 1 # <10> + if verbose and error_msg: # <11> + print('*** Error for {}: {}'.format(cc, error_msg)) + + return counter # <12> +# END FLAGS2_DOWNLOAD_MANY_SEQUENTIAL + +if __name__ == '__main__': + main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ) diff --git a/17-futures-py3.7/countries/flags2_threadpool.py b/17-futures-py3.7/countries/flags2_threadpool.py new file mode 100644 index 0000000..069d4ff --- /dev/null +++ b/17-futures-py3.7/countries/flags2_threadpool.py @@ -0,0 +1,68 @@ +"""Download flags of countries (with error handling). + +ThreadPool version + +Sample run:: + + $ python3 flags2_threadpool.py -s ERROR -e + ERROR site: http://localhost:8003/flags + Searching for 676 flags: from AA to ZZ + 30 concurrent connections will be used. + -------------------- + 150 flags downloaded. + 361 not found. + 165 errors. + Elapsed time: 7.46s + +""" + +# BEGIN FLAGS2_THREADPOOL +import collections +from concurrent import futures + +import requests +import tqdm # <1> + +from flags2_common import main, HTTPStatus # <2> +from flags2_sequential import download_one # <3> + +DEFAULT_CONCUR_REQ = 30 # <4> +MAX_CONCUR_REQ = 1000 # <5> + + +def download_many(cc_list, base_url, verbose, concur_req): + counter = collections.Counter() + with futures.ThreadPoolExecutor(max_workers=concur_req) as executor: # <6> + to_do_map = {} # <7> + for cc in sorted(cc_list): # <8> + future = executor.submit(download_one, + cc, base_url, verbose) # <9> + to_do_map[future] = cc # <10> + done_iter = futures.as_completed(to_do_map) # <11> + if not verbose: + done_iter = tqdm.tqdm(done_iter, total=len(cc_list)) # <12> + for future in done_iter: # <13> + try: + res = future.result() # <14> + except requests.exceptions.HTTPError as exc: # <15> + error_msg = 'HTTP {res.status_code} - {res.reason}' + error_msg = 
error_msg.format(res=exc.response) + except requests.exceptions.ConnectionError as exc: + error_msg = 'Connection error' + else: + error_msg = '' + status = res.status + + if error_msg: + status = HTTPStatus.error + counter[status] += 1 + if verbose and error_msg: + cc = to_do_map[future] # <16> + print('*** Error for {}: {}'.format(cc, error_msg)) + + return counter + + +if __name__ == '__main__': + main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ) +# END FLAGS2_THREADPOOL diff --git a/17-futures-py3.7/countries/asyncio_flags.py b/17-futures-py3.7/countries/flags_asyncio.py similarity index 100% rename from 17-futures-py3.7/countries/asyncio_flags.py rename to 17-futures-py3.7/countries/flags_asyncio.py diff --git a/17-futures-py3.7/countries/threadpool_flags.py b/17-futures-py3.7/countries/flags_threadpool.py similarity index 100% rename from 17-futures-py3.7/countries/threadpool_flags.py rename to 17-futures-py3.7/countries/flags_threadpool.py