updated more ch. 17 examples to Python 3.7

This commit is contained in:
Luciano Ramalho 2019-01-23 22:19:17 -02:00
parent 176dc38c42
commit dd164484a3
9 changed files with 594 additions and 0 deletions

1
17-futures-py3.7/countries/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
flags/

View File

@ -0,0 +1,178 @@
============================
Setting up Nginx and Vaurien
============================
This text explains how to configure Nginx and Vaurien to test HTTP client code while avoiding network traffic and introducing simulated delays and errors. This setup is necessary if you want to experiment with the ``flags2*.py`` image download examples in this directory -- covered in chapters 17 and 18 of Fluent Python.
Overview
========
The flag download examples are designed to compare the performance of different approaches to finding and downloading files from the Web. However, we don't want to hit a public server with multiple requests per second while testing, and we want to be able to simulate high latency and random network errors.
For this setup I chose Nginx as the HTTP server because it is very fast and easy to configure, and Toxiproxy — designed by Shopify to introduce delays and network errors for testing distributed systems.
The archive ``flags.zip``, contains a directory ``flags/`` with 194 subdirectories, each containing a ``.gif`` image and a ``metadata.json`` file. These are public-domain images copied from the `CIA World Fact Book <https://www.cia.gov/library/publications/the-world-factbook/>`_.
Once these files are unpacked to the ``flags/`` directory and Nginx is configured, you can experiment with the ``flags2*.py`` examples without hitting the network.
Procedure
=========
1. Unpack test data
-------------------
The instructions in this section are for GNU/Linux or OSX using the command line. Windows users should have no difficulty doing the same operations with the Windows Exporer GUI.
Unpack the initial data in the ``fancy_flags/`` directory::
$ unzip flags.zip
... many lines omitted ...
creating: flags/zw/
inflating: flags/zw/metadata.json
inflating: flags/zw/zw.gif
Verify that 194 directories are created in ``fancy_flags/flags/``, each with a ``.gif`` and a ``metadata.json`` file::
$ ls flags | wc -w
194
$ find flags | grep .gif | wc -l
194
$ find flags | grep .json | wc -l
194
$ ls flags/ad
ad.gif metadata.json
2. Install Nginx
----------------
Download and install Nginx. I used version 1.6.2 -- the latest stable version as I write this.
* Download page: http://nginx.org/en/download.html
* Beginner's guide: http://nginx.org/en/docs/beginners_guide.html
3. Configure Nginx
------------------
Edit the the ``nginx.conf`` file to set the port and document root. You can determine which ``nginx.conf`` is in use by running::
$ nginx -V
The output starts with::
nginx version: nginx/1.6.2
built by clang 6.0 (clang-600.0.51) (based on LLVM 3.5svn)
TLS SNI support enabled
configure arguments:...
Among the configure arguments you'll see ``--conf-path=``. That's the file you will edit.
Most of the content in ``nginx.conf`` is within a block labeled ``http`` and enclosed in curly braces. Within that block there can be multiple blocks labeled ``server``. Add another ``server`` block like this one::
server {
listen 8001;
location /flags/ {
root /full-path-to.../countries/;
}
}
After editing ``nginx.conf`` the server must be started (if it's not running) or told to reload the configuration file::
$ nginx # to start, if necessary
$ nginx -s reload # to reload the configuration
To test the configuration, open the URL http://localhost:8001/flags/ad/ad.gif in a browser. You should see the blue, yellow and red flag of Andorra.
If the test fails, please double check the procedure just described and refer to the Nginx documentation.
At this point you may run the ``flags_*2.py`` examples against the Nginx install by providing the ``--server LOCAL`` command line option. For example::
$ python3 flags2_threadpool.py -s LOCAL
LOCAL site: http://localhost:8001/flags
Searching for 20 flags: from BD to VN
20 concurrent connections will be used.
--------------------
20 flags downloaded.
Elapsed time: 0.09s
Note that Nginx is so fast that you will not see much difference in run time between the sequential and the concurrent versions. For more realistic testing with simulated network lag, we need to set up Toxiproxy.
4. Install and run Toxiproxy
----------------------------
Install...
In one terminal:
$ toxiproxy-server
In another terminal:
$ toxiproxy-cli create nginx_flags_delay -l localhost:8002 -u localhost:8001
Created new proxy nginx_flags_delay
$ toxiproxy-cli toxic add nginx_flags_delay -t latency -a latency=500
Added downstream latency toxic 'latency_downstream' on proxy 'nginx_flags_delay'
This creates an HTTP proxy on port 8002 which adds a 0.5s delay to every response. You can test it with a browser on port 8002: http://localhost:8002/flags/ad/ad.gif -- the flag of Andorra should appear after ½ second.
TODO: UPDATE NEXT PARAGRAPH
There is also the ``XXX`` script which runs a proxy on port 8003 producing errors in 25% of the responses and a .5 s delay to 50% of the responses. You can also test it with the browser on port 8003, but rememeber that errors are expected.
Platform-specific instructions
==============================
Nginx setup on Mac OS X
------------------------
Homebrew (copy & paste code at the bottom of http://brew.sh/)::
$ ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
$ brew doctor
$ brew install nginx
Download and unpack::
Docroot is: /usr/local/var/www
/usr/local/etc/nginx/nginx.conf
::
To have launchd start nginx at login:
ln -sfv /usr/local/opt/nginx/*.plist ~/Library/LaunchAgents
Then to load nginx now:
launchctl load ~/Library/LaunchAgents/homebrew.mxcl.nginx.plist
Or, if you don't want/need launchctl, you can just run:
nginx
Nginx setup on Lubuntu 14.04.1 LTS
----------------------------------
Docroot is: /usr/share/nginx/html

View File

@ -0,0 +1,8 @@
AD AE AF AG AL AM AO AR AT AU AZ BA BB BD BE BF BG BH BI BJ BN BO BR BS BT
BW BY BZ CA CD CF CG CH CI CL CM CN CO CR CU CV CY CZ DE DJ DK DM DZ EC EE
EG ER ES ET FI FJ FM FR GA GB GD GE GH GM GN GQ GR GT GW GY HN HR HT HU ID
IE IL IN IQ IR IS IT JM JO JP KE KG KH KI KM KN KP KR KW KZ LA LB LC LI LK
LR LS LT LU LV LY MA MC MD ME MG MH MK ML MM MN MR MT MU MV MW MX MY MZ NA
NE NG NI NL NO NP NR NZ OM PA PE PG PH PK PL PT PW PY QA RO RS RU RW SA SB
SC SD SE SG SI SK SL SM SN SO SR SS ST SV SY SZ TD TG TH TJ TL TM TN TO TR
TT TV TW TZ UA UG US UY UZ VA VC VE VN VU WS YE ZA ZM ZW

View File

@ -0,0 +1,103 @@
"""Download flags of countries (with error handling).
asyncio async/await version
"""
# BEGIN FLAGS2_ASYNCIO_TOP
import asyncio
import collections
import aiohttp
from aiohttp import web
import tqdm
from flags2_common import main, HTTPStatus, Result, save_flag
# default set low to avoid errors from remote site, such as
# 503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000
class FetchError(Exception): # <1>
def __init__(self, country_code):
self.country_code = country_code
async def get_flag(session, base_url, cc): # <2>
url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
async with session.get(url) as resp:
if resp.status == 200:
return await resp.read()
elif resp.status == 404:
raise web.HTTPNotFound()
else:
raise aiohttp.HttpProcessingError(
code=resp.status, message=resp.reason,
headers=resp.headers)
async def download_one(session, cc, base_url, semaphore, verbose): # <3>
try:
async with semaphore: # <4>
image = await get_flag(session, base_url, cc) # <5>
except web.HTTPNotFound: # <6>
status = HTTPStatus.not_found
msg = 'not found'
except Exception as exc:
raise FetchError(cc) from exc # <7>
else:
save_flag(image, cc.lower() + '.gif') # <8>
status = HTTPStatus.ok
msg = 'OK'
if verbose and msg:
print(cc, msg)
return Result(status, cc)
# END FLAGS2_ASYNCIO_TOP
# BEGIN FLAGS2_ASYNCIO_DOWNLOAD_MANY
async def downloader_coro(cc_list, base_url, verbose, concur_req): # <1>
counter = collections.Counter()
semaphore = asyncio.Semaphore(concur_req) # <2>
async with aiohttp.ClientSession() as session: # <8>
to_do = [download_one(session, cc, base_url, semaphore, verbose)
for cc in sorted(cc_list)] # <3>
to_do_iter = asyncio.as_completed(to_do) # <4>
if not verbose:
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) # <5>
for future in to_do_iter: # <6>
try:
res = await future # <7>
except FetchError as exc: # <8>
country_code = exc.country_code # <9>
try:
error_msg = exc.__cause__.args[0] # <10>
except IndexError:
error_msg = exc.__cause__.__class__.__name__ # <11>
if verbose and error_msg:
msg = '*** Error for {}: {}'
print(msg.format(country_code, error_msg))
status = HTTPStatus.error
else:
status = res.status
counter[status] += 1 # <12>
return counter # <13>
def download_many(cc_list, base_url, verbose, concur_req):
loop = asyncio.get_event_loop()
coro = downloader_coro(cc_list, base_url, verbose, concur_req)
counts = loop.run_until_complete(coro) # <14>
loop.close() # <15>
return counts
if __name__ == '__main__':
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
# END FLAGS2_ASYNCIO_DOWNLOAD_MANY

View File

@ -0,0 +1,149 @@
"""Utilities for second set of flag examples.
"""
import os
import time
import sys
import string
import argparse
from collections import namedtuple
from enum import Enum
Result = namedtuple('Result', 'status data')
HTTPStatus = Enum('Status', 'ok not_found error')
POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
'MX PH VN ET EG DE IR TR CD FR').split()
DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1
SERVERS = {
'REMOTE': 'http://flupy.org/data/flags',
'LOCAL': 'http://localhost:8001/flags',
'DELAY': 'http://localhost:8002/flags',
'ERROR': 'http://localhost:8003/flags',
}
DEFAULT_SERVER = 'LOCAL'
DEST_DIR = 'downloads/'
COUNTRY_CODES_FILE = 'country_codes.txt'
def save_flag(img, filename):
path = os.path.join(DEST_DIR, filename)
with open(path, 'wb') as fp:
fp.write(img)
def initial_report(cc_list, actual_req, server_label):
if len(cc_list) <= 10:
cc_msg = ', '.join(cc_list)
else:
cc_msg = 'from {} to {}'.format(cc_list[0], cc_list[-1])
print('{} site: {}'.format(server_label, SERVERS[server_label]))
msg = 'Searching for {} flag{}: {}'
plural = 's' if len(cc_list) != 1 else ''
print(msg.format(len(cc_list), plural, cc_msg))
plural = 's' if actual_req != 1 else ''
msg = '{} concurrent connection{} will be used.'
print(msg.format(actual_req, plural))
def final_report(cc_list, counter, start_time):
elapsed = time.time() - start_time
print('-' * 20)
msg = '{} flag{} downloaded.'
plural = 's' if counter[HTTPStatus.ok] != 1 else ''
print(msg.format(counter[HTTPStatus.ok], plural))
if counter[HTTPStatus.not_found]:
print(counter[HTTPStatus.not_found], 'not found.')
if counter[HTTPStatus.error]:
plural = 's' if counter[HTTPStatus.error] != 1 else ''
print('{} error{}.'.format(counter[HTTPStatus.error], plural))
print('Elapsed time: {:.2f}s'.format(elapsed))
def expand_cc_args(every_cc, all_cc, cc_args, limit):
codes = set()
A_Z = string.ascii_uppercase
if every_cc:
codes.update(a+b for a in A_Z for b in A_Z)
elif all_cc:
with open(COUNTRY_CODES_FILE) as fp:
text = fp.read()
codes.update(text.split())
else:
for cc in (c.upper() for c in cc_args):
if len(cc) == 1 and cc in A_Z:
codes.update(cc+c for c in A_Z)
elif len(cc) == 2 and all(c in A_Z for c in cc):
codes.add(cc)
else:
msg = 'each CC argument must be A to Z or AA to ZZ.'
raise ValueError('*** Usage error: '+msg)
return sorted(codes)[:limit]
def process_args(default_concur_req):
server_options = ', '.join(sorted(SERVERS))
parser = argparse.ArgumentParser(
description='Download flags for country codes. '
'Default: top 20 countries by population.')
parser.add_argument('cc', metavar='CC', nargs='*',
help='country code or 1st letter (eg. B for BA...BZ)')
parser.add_argument('-a', '--all', action='store_true',
help='get all available flags (AD to ZW)')
parser.add_argument('-e', '--every', action='store_true',
help='get flags for every possible code (AA...ZZ)')
parser.add_argument('-l', '--limit', metavar='N', type=int,
help='limit to N first codes', default=sys.maxsize)
parser.add_argument('-m', '--max_req', metavar='CONCURRENT', type=int,
default=default_concur_req,
help='maximum concurrent requests (default={})'
.format(default_concur_req))
parser.add_argument('-s', '--server', metavar='LABEL',
default=DEFAULT_SERVER,
help='Server to hit; one of {} (default={})'
.format(server_options, DEFAULT_SERVER))
parser.add_argument('-v', '--verbose', action='store_true',
help='output detailed progress info')
args = parser.parse_args()
if args.max_req < 1:
print('*** Usage error: --max_req CONCURRENT must be >= 1')
parser.print_usage()
sys.exit(1)
if args.limit < 1:
print('*** Usage error: --limit N must be >= 1')
parser.print_usage()
sys.exit(1)
args.server = args.server.upper()
if args.server not in SERVERS:
print('*** Usage error: --server LABEL must be one of',
server_options)
parser.print_usage()
sys.exit(1)
try:
cc_list = expand_cc_args(args.every, args.all, args.cc, args.limit)
except ValueError as exc:
print(exc.args[0])
parser.print_usage()
sys.exit(1)
if not cc_list:
cc_list = sorted(POP20_CC)
return args, cc_list
def main(download_many, default_concur_req, max_concur_req):
args, cc_list = process_args(default_concur_req)
actual_req = min(args.max_req, max_concur_req, len(cc_list))
initial_report(cc_list, actual_req, args.server)
base_url = SERVERS[args.server]
t0 = time.time()
counter = download_many(cc_list, base_url, args.verbose, actual_req)
assert sum(counter.values()) == len(cc_list), \
'some downloads are unaccounted for'
final_report(cc_list, counter, t0)

View File

@ -0,0 +1,87 @@
"""Download flags of countries (with error handling).
Sequential version
Sample run::
$ python3 flags2_sequential.py -s DELAY b
DELAY site: http://localhost:8002/flags
Searching for 26 flags: from BA to BZ
1 concurrent connection will be used.
--------------------
17 flags downloaded.
9 not found.
Elapsed time: 13.36s
"""
import collections
import requests
import tqdm
from flags2_common import main, save_flag, HTTPStatus, Result
DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1
# BEGIN FLAGS2_BASIC_HTTP_FUNCTIONS
def get_flag(base_url, cc):
url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
resp = requests.get(url)
if resp.status_code != 200: # <1>
resp.raise_for_status()
return resp.content
def download_one(cc, base_url, verbose=False):
try:
image = get_flag(base_url, cc)
except requests.exceptions.HTTPError as exc: # <2>
res = exc.response
if res.status_code == 404:
status = HTTPStatus.not_found # <3>
msg = 'not found'
else: # <4>
raise
else:
save_flag(image, cc.lower() + '.gif')
status = HTTPStatus.ok
msg = 'OK'
if verbose: # <5>
print(cc, msg)
return Result(status, cc) # <6>
# END FLAGS2_BASIC_HTTP_FUNCTIONS
# BEGIN FLAGS2_DOWNLOAD_MANY_SEQUENTIAL
def download_many(cc_list, base_url, verbose, max_req):
counter = collections.Counter() # <1>
cc_iter = sorted(cc_list) # <2>
if not verbose:
cc_iter = tqdm.tqdm(cc_iter) # <3>
for cc in cc_iter: # <4>
try:
res = download_one(cc, base_url, verbose) # <5>
except requests.exceptions.HTTPError as exc: # <6>
error_msg = 'HTTP error {res.status_code} - {res.reason}'
error_msg = error_msg.format(res=exc.response)
except requests.exceptions.ConnectionError as exc: # <7>
error_msg = 'Connection error'
else: # <8>
error_msg = ''
status = res.status
if error_msg:
status = HTTPStatus.error # <9>
counter[status] += 1 # <10>
if verbose and error_msg: # <11>
print('*** Error for {}: {}'.format(cc, error_msg))
return counter # <12>
# END FLAGS2_DOWNLOAD_MANY_SEQUENTIAL
if __name__ == '__main__':
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)

View File

@ -0,0 +1,68 @@
"""Download flags of countries (with error handling).
ThreadPool version
Sample run::
$ python3 flags2_threadpool.py -s ERROR -e
ERROR site: http://localhost:8003/flags
Searching for 676 flags: from AA to ZZ
30 concurrent connections will be used.
--------------------
150 flags downloaded.
361 not found.
165 errors.
Elapsed time: 7.46s
"""
# BEGIN FLAGS2_THREADPOOL
import collections
from concurrent import futures
import requests
import tqdm # <1>
from flags2_common import main, HTTPStatus # <2>
from flags2_sequential import download_one # <3>
DEFAULT_CONCUR_REQ = 30 # <4>
MAX_CONCUR_REQ = 1000 # <5>
def download_many(cc_list, base_url, verbose, concur_req):
counter = collections.Counter()
with futures.ThreadPoolExecutor(max_workers=concur_req) as executor: # <6>
to_do_map = {} # <7>
for cc in sorted(cc_list): # <8>
future = executor.submit(download_one,
cc, base_url, verbose) # <9>
to_do_map[future] = cc # <10>
done_iter = futures.as_completed(to_do_map) # <11>
if not verbose:
done_iter = tqdm.tqdm(done_iter, total=len(cc_list)) # <12>
for future in done_iter: # <13>
try:
res = future.result() # <14>
except requests.exceptions.HTTPError as exc: # <15>
error_msg = 'HTTP {res.status_code} - {res.reason}'
error_msg = error_msg.format(res=exc.response)
except requests.exceptions.ConnectionError as exc:
error_msg = 'Connection error'
else:
error_msg = ''
status = res.status
if error_msg:
status = HTTPStatus.error
counter[status] += 1
if verbose and error_msg:
cc = to_do_map[future] # <16>
print('*** Error for {}: {}'.format(cc, error_msg))
return counter
if __name__ == '__main__':
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
# END FLAGS2_THREADPOOL