sync from Atlas
This commit is contained in:
parent
5d6b156047
commit
43f1bf23b3
@ -31,46 +31,46 @@ def worker(jobs: JobQueue, results: ResultQueue) -> None: # <7>
|
||||
while n := jobs.get(): # <8>
|
||||
results.put(check(n)) # <9>
|
||||
results.put(PrimeResult(0, False, 0.0)) # <10>
|
||||
# end::PRIMES_PROC_TOP[]
|
||||
|
||||
# tag::PRIMES_PROC_MIDDLE[]
|
||||
def start_jobs(workers: int, jobs: JobQueue, results: ResultQueue) -> None:
|
||||
def start_jobs(
|
||||
procs: int, jobs: JobQueue, results: ResultQueue # <11>
|
||||
) -> None:
|
||||
for n in NUMBERS:
|
||||
jobs.put(n) # <1>
|
||||
for _ in range(workers):
|
||||
proc = Process(target=worker, args=(jobs, results)) # <2>
|
||||
proc.start() # <3>
|
||||
jobs.put(0) # <4>
|
||||
|
||||
def report(workers: int, results: ResultQueue) -> int:
|
||||
checked = 0
|
||||
workers_done = 0
|
||||
while workers_done < workers:
|
||||
n, prime, elapsed = results.get()
|
||||
if n == 0:
|
||||
workers_done += 1
|
||||
else:
|
||||
checked += 1
|
||||
label = 'P' if prime else ' '
|
||||
print(f'{n:16} {label} {elapsed:9.6f}s')
|
||||
return checked
|
||||
# end::PRIMES_PROC_MIDDLE[]
|
||||
jobs.put(n) # <12>
|
||||
for _ in range(procs):
|
||||
proc = Process(target=worker, args=(jobs, results)) # <13>
|
||||
proc.start() # <14>
|
||||
jobs.put(0) # <15>
|
||||
# end::PRIMES_PROC_TOP[]
|
||||
|
||||
# tag::PRIMES_PROC_MAIN[]
|
||||
def main() -> None:
|
||||
if len(sys.argv) < 2:
|
||||
workers = cpu_count()
|
||||
if len(sys.argv) < 2: # <1>
|
||||
procs = cpu_count()
|
||||
else:
|
||||
workers = int(sys.argv[1])
|
||||
procs = int(sys.argv[1])
|
||||
|
||||
print(f'Checking {len(NUMBERS)} numbers with {workers} processes:')
|
||||
print(f'Checking {len(NUMBERS)} numbers with {procs} processes:')
|
||||
t0 = perf_counter()
|
||||
jobs: JobQueue = SimpleQueue()
|
||||
jobs: JobQueue = SimpleQueue() # <2>
|
||||
results: ResultQueue = SimpleQueue()
|
||||
start_jobs(workers, jobs, results)
|
||||
checked = report(workers, results)
|
||||
start_jobs(procs, jobs, results) # <3>
|
||||
checked = report(procs, results) # <4>
|
||||
elapsed = perf_counter() - t0
|
||||
print(f'{checked} checks in {elapsed:.2f}s')
|
||||
print(f'{checked} checks in {elapsed:.2f}s') # <5>
|
||||
|
||||
def report(procs: int, results: ResultQueue) -> int: # <6>
|
||||
checked = 0
|
||||
procs_done = 0
|
||||
while procs_done < procs: # <7>
|
||||
n, prime, elapsed = results.get() # <8>
|
||||
if n == 0: # <9>
|
||||
procs_done += 1
|
||||
else:
|
||||
checked += 1 # <10>
|
||||
label = 'P' if prime else ' '
|
||||
print(f'{n:16} {label} {elapsed:9.6f}s')
|
||||
return checked
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
80
19-concurrency/primes/procs_race_condition.py
Executable file
80
19-concurrency/primes/procs_race_condition.py
Executable file
@ -0,0 +1,80 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
procs.py: shows that multiprocessing on a multicore machine
|
||||
can be faster than sequential code for CPU-intensive work.
|
||||
"""
|
||||
|
||||
# tag::PRIMES_PROC_TOP[]
|
||||
import sys
|
||||
from time import perf_counter
|
||||
from typing import NamedTuple
|
||||
from multiprocessing import Process, SimpleQueue, cpu_count # <1>
|
||||
from multiprocessing import queues # <2>
|
||||
|
||||
from primes import is_prime, NUMBERS
|
||||
|
||||
class PrimeResult(NamedTuple): # <3>
|
||||
n: int
|
||||
prime: bool
|
||||
elapsed: float
|
||||
|
||||
JobQueue = queues.SimpleQueue[int] # <4>
|
||||
ResultQueue = queues.SimpleQueue[PrimeResult] # <5>
|
||||
|
||||
def check(n: int) -> PrimeResult: # <6>
|
||||
t0 = perf_counter()
|
||||
res = is_prime(n)
|
||||
return PrimeResult(n, res, perf_counter() - t0)
|
||||
|
||||
def worker(jobs: JobQueue, results: ResultQueue) -> None: # <7>
|
||||
while n := jobs.get(): # <8>
|
||||
results.put(check(n)) # <9>
|
||||
results.put(PrimeResult(0, False, 0.0))
|
||||
# end::PRIMES_PROC_TOP[]
|
||||
|
||||
def start_jobs(workers: int) -> ResultQueue:
|
||||
jobs: JobQueue = SimpleQueue() # <2>
|
||||
results: ResultQueue = SimpleQueue()
|
||||
|
||||
for n in NUMBERS: # <3>
|
||||
jobs.put(n)
|
||||
|
||||
for _ in range(workers):
|
||||
proc = Process(target=worker, args=(jobs, results)) # <4>
|
||||
proc.start() # <5>
|
||||
jobs.put(0) # <6>
|
||||
|
||||
return results
|
||||
|
||||
def report(workers: int, results: ResultQueue) -> int:
|
||||
workers_done = 0
|
||||
checked = 0
|
||||
while workers_done < workers:
|
||||
n, prime, elapsed = results.get() # <7>
|
||||
if n == 0:
|
||||
workers_done += 1
|
||||
else:
|
||||
checked += 1
|
||||
label = 'P' if prime else ' '
|
||||
print(f'{n:16} {label} {elapsed:9.6f}s') # <8>
|
||||
return checked
|
||||
|
||||
|
||||
# tag::PRIMES_PROC_MAIN[]
|
||||
def main() -> None:
|
||||
if len(sys.argv) < 2: # <1>
|
||||
workers = cpu_count()
|
||||
else:
|
||||
workers = int(sys.argv[1])
|
||||
|
||||
print(f'Checking {len(NUMBERS)} numbers with {workers} processes:')
|
||||
t0 = perf_counter()
|
||||
results = start_jobs(workers)
|
||||
checked = report(workers, results)
|
||||
elapsed = perf_counter() - t0
|
||||
print(f'{checked} checks in {elapsed:.2f}s')
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
# end::PRIMES_PROC_MAIN[]
|
3
20-executors/getflags/.gitignore
vendored
3
20-executors/getflags/.gitignore
vendored
@ -1 +1,2 @@
|
||||
flags/
|
||||
flags/
|
||||
downloaded/
|
||||
|
@ -20,7 +20,7 @@ Sample run::
|
||||
|
||||
# tag::FLAGS2_THREADPOOL[]
|
||||
from collections import Counter
|
||||
from concurrent import futures
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
|
||||
import httpx
|
||||
import tqdm # type: ignore
|
||||
@ -37,13 +37,13 @@ def download_many(cc_list: list[str],
|
||||
verbose: bool,
|
||||
concur_req: int) -> Counter[DownloadStatus]:
|
||||
counter: Counter[DownloadStatus] = Counter()
|
||||
with futures.ThreadPoolExecutor(max_workers=concur_req) as executor: # <4>
|
||||
with ThreadPoolExecutor(max_workers=concur_req) as executor: # <4>
|
||||
to_do_map = {} # <5>
|
||||
for cc in sorted(cc_list): # <6>
|
||||
future = executor.submit(download_one, cc,
|
||||
base_url, verbose) # <7>
|
||||
to_do_map[future] = cc # <8>
|
||||
done_iter = futures.as_completed(to_do_map) # <9>
|
||||
done_iter = as_completed(to_do_map) # <9>
|
||||
if not verbose:
|
||||
done_iter = tqdm.tqdm(done_iter, total=len(cc_list)) # <10>
|
||||
for future in done_iter: # <11>
|
||||
@ -52,7 +52,7 @@ def download_many(cc_list: list[str],
|
||||
except httpx.HTTPStatusError as exc: # <13>
|
||||
error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
|
||||
error_msg = error_msg.format(resp=exc.response)
|
||||
except httpx.RequestError as exc: # <15>
|
||||
except httpx.RequestError as exc:
|
||||
error_msg = f'{exc} {type(exc)}'.strip()
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
@ -63,7 +63,7 @@ def download_many(cc_list: list[str],
|
||||
status = DownloadStatus.ERROR
|
||||
counter[status] += 1
|
||||
if verbose and error_msg:
|
||||
cc = to_do_map[future] # <16>
|
||||
cc = to_do_map[future] # <14>
|
||||
print(f'{cc} error: {error_msg}')
|
||||
|
||||
return counter
|
||||
|
@ -44,7 +44,7 @@ Part / Chapter #|Title|Directory|1<sup>st</sup> ed. Chapter #
|
||||
17|Iterators, Generators, and Classic Coroutines|[17-it-generator](17-it-generator)|14
|
||||
18|Context Managers and else Blocks|[18-with-match](18-with-match)|15
|
||||
19|Concurrency Models in Python|[19-concurrency](19-concurrency)|🆕
|
||||
20|Concurrency with Futures|[20-futures](20-futures)|17
|
||||
20|Concurrent Executors|[20-executors](20-executors)|17
|
||||
21|Asynchronous Programming|[21-async](21-async)|18
|
||||
**VI – Metaprogramming**|
|
||||
22|Dynamic Attributes and Properties|[22-dyn-attr-prop](22-dyn-attr-prop)|19
|
||||
|
Loading…
Reference in New Issue
Block a user