ch21 cleanup, f-strings, and pathlib

This commit is contained in:
Miroslav Šedivý
2021-02-13 19:11:54 +01:00
parent c5114a5679
commit 93dfeaeb80
11 changed files with 70 additions and 81 deletions

View File

@@ -1,14 +1,13 @@
"""Utilities for second set of flag examples.
"""
import os
import time
import sys
import string
import argparse
import string
import sys
import time
from collections import namedtuple, Counter
from enum import Enum
from pathlib import Path
Result = namedtuple('Result', 'status data')
@@ -28,14 +27,12 @@ SERVERS = {
}
DEFAULT_SERVER = 'LOCAL'
DEST_DIR = 'downloaded/'
COUNTRY_CODES_FILE = 'country_codes.txt'
DEST_DIR = Path('downloaded')
COUNTRY_CODES_FILE = Path('country_codes.txt')
def save_flag(img: bytes, filename: str) -> None:
path = os.path.join(DEST_DIR, filename)
with open(path, 'wb') as fp:
fp.write(img)
(DEST_DIR / filename).write_bytes(img)
def initial_report(cc_list: list[str],
@@ -44,30 +41,27 @@ def initial_report(cc_list: list[str],
if len(cc_list) <= 10:
cc_msg = ', '.join(cc_list)
else:
cc_msg = 'from {} to {}'.format(cc_list[0], cc_list[-1])
print('{} site: {}'.format(server_label, SERVERS[server_label]))
msg = 'Searching for {} flag{}: {}'
cc_msg = f'from {cc_list[0]} to {cc_list[-1]}'
print(f'{server_label} site: {SERVERS[server_label]}')
plural = 's' if len(cc_list) != 1 else ''
print(msg.format(len(cc_list), plural, cc_msg))
print(f'Searching for {len(cc_list)} flag{plural}: {cc_msg}')
plural = 's' if actual_req != 1 else ''
msg = '{} concurrent connection{} will be used.'
print(msg.format(actual_req, plural))
print(f'{actual_req} concurrent connection{plural} will be used.')
def final_report(cc_list: list[str],
counter: Counter[HTTPStatus],
start_time: float) -> None:
elapsed = time.time() - start_time
elapsed = time.perf_counter() - start_time
print('-' * 20)
msg = '{} flag{} downloaded.'
plural = 's' if counter[HTTPStatus.ok] != 1 else ''
print(msg.format(counter[HTTPStatus.ok], plural))
print(f'{counter[HTTPStatus.ok]} flag{plural} downloaded.')
if counter[HTTPStatus.not_found]:
print(counter[HTTPStatus.not_found], 'not found.')
print(f'{counter[HTTPStatus.not_found]} not found.')
if counter[HTTPStatus.error]:
plural = 's' if counter[HTTPStatus.error] != 1 else ''
print('{} error{}.'.format(counter[HTTPStatus.error], plural))
print('Elapsed time: {:.2f}s'.format(elapsed))
print(f'{counter[HTTPStatus.error]} error{plural}')
print(f'Elapsed time: {elapsed:.2f}s')
def expand_cc_args(every_cc: bool,
@@ -75,22 +69,21 @@ def expand_cc_args(every_cc: bool,
cc_args: list[str],
limit: int) -> list[str]:
codes: set[str] = set()
A_Z = string.ascii_uppercase
A_Z = set(string.ascii_uppercase)
if every_cc:
codes.update(a+b for a in A_Z for b in A_Z)
codes.update(f'{a}{b}' for a in A_Z for b in A_Z)
elif all_cc:
with open(COUNTRY_CODES_FILE) as fp:
text = fp.read()
text = COUNTRY_CODES_FILE.read_text()
codes.update(text.split())
else:
for cc in (c.upper() for c in cc_args):
if len(cc) == 1 and cc in A_Z:
codes.update(cc+c for c in A_Z)
codes.update(cc + c for c in A_Z)
elif len(cc) == 2 and all(c in A_Z for c in cc):
codes.add(cc)
else:
msg = 'each CC argument must be A to Z or AA to ZZ.'
raise ValueError('*** Usage error: '+msg)
raise ValueError('*** Usage error: each CC argument '
'must be A to Z or AA to ZZ.')
return sorted(codes)[:limit]
@@ -98,23 +91,29 @@ def process_args(default_concur_req):
server_options = ', '.join(sorted(SERVERS))
parser = argparse.ArgumentParser(
description='Download flags for country codes. '
'Default: top 20 countries by population.')
parser.add_argument('cc', metavar='CC', nargs='*',
'Default: top 20 countries by population.')
parser.add_argument(
'cc', metavar='CC', nargs='*',
help='country code or 1st letter (eg. B for BA...BZ)')
parser.add_argument('-a', '--all', action='store_true',
parser.add_argument(
'-a', '--all', action='store_true',
help='get all available flags (AD to ZW)')
parser.add_argument('-e', '--every', action='store_true',
parser.add_argument(
'-e', '--every', action='store_true',
help='get flags for every possible code (AA...ZZ)')
parser.add_argument('-l', '--limit', metavar='N', type=int,
help='limit to N first codes', default=sys.maxsize)
parser.add_argument('-m', '--max_req', metavar='CONCURRENT', type=int,
parser.add_argument(
'-l', '--limit', metavar='N', type=int, help='limit to N first codes',
default=sys.maxsize)
parser.add_argument(
'-m', '--max_req', metavar='CONCURRENT', type=int,
default=default_concur_req,
help=f'maximum concurrent requests (default={default_concur_req})')
parser.add_argument('-s', '--server', metavar='LABEL',
default=DEFAULT_SERVER,
help=('Server to hit; one of ' +
f'{server_options} (default={DEFAULT_SERVER})'))
parser.add_argument('-v', '--verbose', action='store_true',
parser.add_argument(
'-s', '--server', metavar='LABEL', default=DEFAULT_SERVER,
help=f'Server to hit; one of {server_options} '
f'(default={DEFAULT_SERVER})')
parser.add_argument(
'-v', '--verbose', action='store_true',
help='output detailed progress info')
args = parser.parse_args()
if args.max_req < 1:
@@ -127,8 +126,8 @@ def process_args(default_concur_req):
sys.exit(1)
args.server = args.server.upper()
if args.server not in SERVERS:
print('*** Usage error: --server LABEL must be one of',
server_options)
print(f'*** Usage error: --server LABEL '
f'must be one of {server_options}')
parser.print_usage()
sys.exit(1)
try:
@@ -148,8 +147,9 @@ def main(download_many, default_concur_req, max_concur_req):
actual_req = min(args.max_req, max_concur_req, len(cc_list))
initial_report(cc_list, actual_req, args.server)
base_url = SERVERS[args.server]
t0 = time.time()
t0 = time.perf_counter()
counter = download_many(cc_list, base_url, args.verbose, actual_req)
assert sum(counter.values()) == len(cc_list), \
assert sum(counter.values()) == len(cc_list), (
'some downloads are unaccounted for'
)
final_report(cc_list, counter, t0)