update from Atlas

This commit is contained in:
Luciano Ramalho
2015-04-11 01:05:23 -03:00
parent f4cdee2447
commit 7030b56878
61 changed files with 316 additions and 13 deletions

View File

@@ -1,87 +0,0 @@
"""
>>> adder = adder_coro()
>>> next(adder)
0
>>> adder.send(10)
10
>>> adder.send(20)
30
>>> adder.send(30)
60
>>> try:
... next(adder)
... except StopIteration as exc:
... result = exc.value
...
>>> result
Result(sum=60, terms=3, average=20.0)
Closing a coroutine:
>>> adder = adder_coro()
>>> next(adder)
0
>>> adder.send(1)
1
>>> adder.send(10)
11
>>> adder.close()
>>> try:
... next(adder)
... except StopIteration as exc:
... exc.value is None
...
True
"""
import sys
import collections
Result = collections.namedtuple('Result', 'sum terms average')
def adder_coro(initial=0):
total = initial
count = 0
while True:
try:
term = yield total
except GeneratorExit:
break
if term is None:
break
total += term
count += 1
return Result(total, count, total/count)
def prompt():
while True:
try:
term = float(input('+ '))
except ValueError:
break
yield term
def main(get_terms):
adder = adder_coro()
next(adder)
for term in get_terms:
adder.send(term)
try:
next(adder)
except StopIteration as exc:
result = exc.value
print(result)
if __name__ == '__main__':
if len(sys.argv) > 1:
get_terms = (float(n) for n in sys.argv[1:])
else:
get_terms = prompt()
main(get_terms)

View File

@@ -1,43 +0,0 @@
"""
Closing a generator raises ``GeneratorExit`` at the pending ``yield``
>>> adder = adder_coro()
>>> next(adder)
0
>>> adder.send(10)
10
>>> adder.send(20)
30
>>> adder.send(30)
60
>>> adder.close()
-> total: 60 terms: 3 average: 20.0
Other exceptions propagate to the caller:
>>> adder = adder_coro()
>>> next(adder)
0
>>> adder.send(10)
10
>>> adder.send('spam')
Traceback (most recent call last):
...
TypeError: unsupported operand type(s) for +=: 'int' and 'str'
"""
def adder_coro(initial=0):
total = initial
count = 0
try:
while True:
term = yield total
total += term
count += 1
except GeneratorExit:
average = total / count
msg = '-> total: {} terms: {} average: {}'
print(msg.format(total, count, average))

View File

@@ -1,96 +0,0 @@
"""
>>> adder = adder_coro()
>>> next(adder)
0
>>> adder.send(10)
10
>>> adder.send(20)
30
>>> adder.send(30)
60
>>> try:
... next(adder)
... except StopIteration as exc:
... result = exc.value
...
>>> result
Result(sum=60, terms=3, average=20.0)
Closing a coroutine:
>>> adder = adder_coro()
>>> next(adder)
0
>>> adder.send(1)
1
>>> adder.send(10)
11
>>> adder.close()
>>> try:
... next(adder)
... except StopIteration as exc:
... exc.value is None
...
True
"""
import sys
import collections
def coroutine(func):
def primed_coroutine(*args, **kwargs):
coro = func(*args, **kwargs)
next(coro)
return coro
return primed_coroutine
Result = collections.namedtuple('Result', 'sum terms average')
@coroutine
def adder_coro(initial=0):
total = initial
num_terms = 0
while True:
try:
term = yield total
except GeneratorExit:
break
if term is None:
break
total += term
num_terms += 1
return Result(total, num_terms, total/num_terms)
def prompt():
while True:
try:
term = float(input('+ '))
except ValueError:
break
yield term
def main(get_terms):
adder = adder_coro()
for term in get_terms:
adder.send(term)
try:
adder.send(None)
except StopIteration as exc:
result = exc.value
print(result)
if __name__ == '__main__':
if len(sys.argv) > 1:
get_terms = (float(n) for n in sys.argv[1:])
else:
get_terms = prompt()
main(get_terms)

View File

@@ -1,39 +0,0 @@
if 'raw_input' in dir(__builtins__):
input = raw_input # para funcionar com Python 2
def ler_num():
num = input('+: ')
try:
num = float(num)
except ValueError:
return 0
return num
def somadora():
qt_parcelas = 0
total = 0
try:
while True:
parcela = yield
qt_parcelas += 1
total += parcela
print('parcelas: %d total: %d' % (qt_parcelas, total))
finally:
print('parcelas: %d total: %d media: %d' % (qt_parcelas, total, total/qt_parcelas))
def main():
coro = somadora()
next(coro)
while True:
item = ler_num()
if item:
coro.send(item)
else:
print('Fechando corotina...')
coro.close()
break
if __name__=='__main__':
main()

View File

@@ -1,47 +0,0 @@
if 'raw_input' in dir(__builtins__):
input = raw_input # para funcionar com Python 2
def ler_parcela():
parcela = input('+: ')
try:
parcela = float(parcela)
except ValueError:
return 0
return parcela
# decorator
def coro(func):
def start(*args, **kwargs):
g = func(*args, **kwargs)
next(g)
return g
return start
@coro
def somadora():
qt_parcelas = 0
total = 0
try:
while True:
parcela = yield
qt_parcelas += 1
total += parcela
print('parcelas: %d total: %d' % (qt_parcelas, total))
finally:
print('parcelas: %d total: %d media: %d' % (qt_parcelas, total, total/qt_parcelas))
def main():
coro = somadora()
while True:
parcela = ler_parcela()
if parcela:
coro.send(parcela)
else:
print('Fechando corotina...')
coro.close()
break
if __name__=='__main__':
main()

View File

@@ -1,47 +0,0 @@
import sys
import collections
Result = collections.namedtuple('Result', 'total average')
def adder():
total = 0
count = 0
while True:
term = yield
try:
term = float(term)
except (ValueError, TypeError):
break
else:
total += term
count += 1
return Result(total, total/count)
def process_args(coro, args):
for arg in args:
coro.send(arg)
try:
next(coro)
except StopIteration as exc:
return exc.value
def prompt(coro):
while True:
term = input('+> ')
try:
coro.send(term)
except StopIteration as exc:
return exc.value
def main():
coro = adder()
next(coro) # prime it
if len(sys.argv) > 1:
res = process_args(coro, sys.argv[1:])
else:
res = prompt(coro)
print(res)
main()

View File

@@ -1,42 +0,0 @@
import sys
def ask():
prompt = '>'
while True:
response = input(prompt)
if not response:
return 0
yield response
def parse_args():
yield from iter(sys.argv[1:])
def fetch(producer):
gen = producer()
next(gen)
yield from gen
def main(args):
if args:
producer = parse_args
else:
producer = ask
total = 0
count = 0
gen = fetch(producer())
while True:
term = yield from gen
term = float(term)
total += term
count += 1
average = total / count
print('total: {} average: {}'.format(total, average))
if __name__ == '__main__':
main(sys.argv[1:])

View File

@@ -1,13 +0,0 @@
>>> def coroutine():
... print('coroutine started')
... x = yield
... print('coroutine received: {!r}'.format(x))
...
>>> coro = coroutine()
>>> next(coro)
coroutine started
>>> coro.send(42)
coroutine received: 42
Traceback (most recent call last):
...
StopIteration

View File

@@ -1,66 +0,0 @@
"""
Coroutine closing demonstration::
# BEGIN DEMO_CORO_EXC_1
>>> exc_coro = demo_exc_handling()
>>> next(exc_coro)
-> coroutine started
>>> exc_coro.send(11)
-> coroutine received: 11
>>> exc_coro.send(22)
-> coroutine received: 22
>>> exc_coro.close()
>>> from inspect import getgeneratorstate
>>> getgeneratorstate(exc_coro)
'GEN_CLOSED'
# END DEMO_CORO_EXC_1
Coroutine handling exception::
# BEGIN DEMO_CORO_EXC_2
>>> exc_coro = demo_exc_handling()
>>> next(exc_coro)
-> coroutine started
>>> exc_coro.send(11)
-> coroutine received: 11
>>> exc_coro.throw(DemoException)
*** DemoException handled. Continuing...
>>> getgeneratorstate(exc_coro)
'GEN_SUSPENDED'
# END DEMO_CORO_EXC_2
Coroutine not handling exception::
# BEGIN DEMO_CORO_EXC_3
>>> exc_coro = demo_exc_handling()
>>> next(exc_coro)
-> coroutine started
>>> exc_coro.send(11)
-> coroutine received: 11
>>> exc_coro.throw(ZeroDivisionError)
Traceback (most recent call last):
...
ZeroDivisionError
>>> getgeneratorstate(exc_coro)
'GEN_CLOSED'
# END DEMO_CORO_EXC_3
"""
# BEGIN EX_CORO_EXC
class DemoException(Exception):
"""An exception type for the demonstration."""
def demo_exc_handling():
print('-> coroutine started')
while True:
try:
x = yield
except DemoException: # <1>
print('*** DemoException handled. Continuing...')
else: # <2>
print('-> coroutine received: {!r}'.format(x))
raise RuntimeError('This line should never run.') # <3>
# END EX_CORO_EXC

View File

@@ -1,61 +0,0 @@
"""
Second coroutine closing demonstration::
>>> fin_coro = demo_finally()
>>> next(fin_coro)
-> coroutine started
>>> fin_coro.send(11)
-> coroutine received: 11
>>> fin_coro.send(22)
-> coroutine received: 22
>>> fin_coro.close()
-> coroutine ending
Second coroutine not handling exception::
>>> fin_coro = demo_finally()
>>> next(fin_coro)
-> coroutine started
>>> fin_coro.send(11)
-> coroutine received: 11
>>> fin_coro.throw(ZeroDivisionError) # doctest: +SKIP
-> coroutine ending
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "coro_exception_demos.py", line 109, in demo_finally
print('-> coroutine received: {!r}'.format(x))
ZeroDivisionError
The last test above must be skipped because the output '-> coroutine ending'
is not detected by doctest, which raises a false error. However, if you
run this file as shown below, you'll see that output "leak" into standard
output::
$ python3 -m doctest coro_exception_demos.py
-> coroutine ending
"""
# BEGIN EX_CORO_FINALLY
class DemoException(Exception):
"""An exception type for the demonstration."""
def demo_finally():
print('-> coroutine started')
try:
while True:
try:
x = yield
except DemoException:
print('*** DemoException handled. Continuing...')
else:
print('-> coroutine received: {!r}'.format(x))
finally:
print('-> coroutine ending')
# END EX_CORO_FINALLY

View File

@@ -1,13 +0,0 @@
>>> def coroutine():
... print('coroutine started')
... x = yield
... print('coroutine received: {!r}'.format(x))
...
>>> coro = coroutine()
>>> next(coro)
coroutine started
>>> coro.send(42)
coroutine received: 42
Traceback (most recent call last):
...
StopIteration

View File

@@ -1,45 +0,0 @@
"""
Closing a generator raises ``GeneratorExit`` at the pending ``yield``
>>> coro_avg = averager()
>>> next(coro_avg)
0.0
>>> coro_avg.send(10)
10.0
>>> coro_avg.send(20)
15.0
>>> coro_avg.send(30)
20.0
>>> coro_avg.close()
-> total: 60.0 average: 20.0 terms: 3
Other exceptions propagate to the caller:
>>> coro_avg = averager()
>>> next(coro_avg)
0.0
>>> coro_avg.send(10)
10.0
>>> coro_avg.send('spam')
Traceback (most recent call last):
...
TypeError: unsupported operand type(s) for +=: 'float' and 'str'
"""
# BEGIN CORO_AVERAGER
def averager():
total = average = 0.0
count = 0
try:
while True:
term = yield average
total += term
count += 1
average = total/count
except GeneratorExit:
msg = '-> total: {} average: {} terms: {}'
print(msg.format(total, average, count))
# END CORO_AVERAGER

View File

@@ -1,25 +0,0 @@
# BEGIN CORO_AVERAGER
"""
A coroutine to compute a running average
>>> coro_avg = averager() # <1>
>>> next(coro_avg) # <2>
>>> coro_avg.send(10) # <3>
10.0
>>> coro_avg.send(30)
20.0
>>> coro_avg.send(5)
15.0
"""
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield average # <4>
total += term
count += 1
average = total/count
# END CORO_AVERAGER

View File

@@ -1,30 +0,0 @@
# BEGIN DECORATED_AVERAGER
"""
A coroutine to compute a running average
>>> coro_avg = averager() # <1>
>>> from inspect import getgeneratorstate
>>> getgeneratorstate(coro_avg) # <2>
'GEN_SUSPENDED'
>>> coro_avg.send(10) # <3>
10.0
>>> coro_avg.send(30)
20.0
>>> coro_avg.send(5)
15.0
"""
from coroutil import coroutine # <4>
@coroutine # <5>
def averager(): # <6>
total = 0.0
count = 0
average = None
while True:
term = yield average
total += term
count += 1
average = total/count
# END DECORATED_AVERAGER

View File

@@ -1,61 +0,0 @@
"""
A coroutine to compute a running average.
Testing ``averager`` by itself::
# BEGIN RETURNING_AVERAGER_DEMO1
>>> coro_avg = averager()
>>> next(coro_avg) # <1>
>>> coro_avg.send(10)
>>> coro_avg.send(30)
>>> coro_avg.send(6.5)
>>> coro_avg.send(None) # <2>
Traceback (most recent call last):
...
StopIteration: Result(count=3, average=15.5)
# END RETURNING_AVERAGER_DEMO1
Catching `StopIteration` to extract the value returned by
the coroutine::
# BEGIN RETURNING_AVERAGER_DEMO2
>>> coro_avg = averager()
>>> next(coro_avg)
>>> coro_avg.send(10)
>>> coro_avg.send(30)
>>> coro_avg.send(6.5)
>>> try:
... coro_avg.send(None)
... except StopIteration as exc:
... result = exc.value
...
>>> result
Result(count=3, average=15.5)
# END RETURNING_AVERAGER_DEMO2
"""
# BEGIN RETURNING_AVERAGER
from collections import namedtuple
Result = namedtuple('Result', 'count average')
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None:
break # <1>
total += term
count += 1
average = total/count
return Result(count, average) # <2>
# END RETURNING_AVERAGER

View File

@@ -1,107 +0,0 @@
"""
A coroutine to compute a running average.
Testing ``averager`` by itself::
>>> coro_avg = averager()
>>> next(coro_avg)
>>> coro_avg.send(10)
>>> coro_avg.send(30)
>>> coro_avg.send(6.5)
>>> coro_avg.send(None)
Traceback (most recent call last):
...
StopIteration: Result(count=3, average=15.5)
Driving it with ``yield from``::
>>> def summarize(results):
... while True:
... result = yield from averager()
... results.append(result)
...
>>> results = []
>>> summary = summarize(results)
>>> next(summary)
>>> for height in data['girls;m']:
... summary.send(height)
...
>>> summary.send(None)
>>> for height in data['boys;m']:
... summary.send(height)
...
>>> summary.send(None)
>>> results == [
... Result(count=10, average=1.4279999999999997),
... Result(count=9, average=1.3888888888888888)
... ]
True
"""
# BEGIN YIELD_FROM_AVERAGER
from collections import namedtuple
Result = namedtuple('Result', 'count average')
# the subgenerator
def averager(): # <1>
total = 0.0
count = 0
average = None
while True:
term = yield # <2>
if term is None: # <3>
break
total += term
count += 1
average = total/count
return Result(count, average) # <4>
# the delegating generator
def grouper(results, key): # <5>
while True: # <6>
results[key] = yield from averager() # <7>
# the client code, a.k.a. the caller
def main(data): # <8>
results = {}
for key, values in data.items():
group = grouper(results, key) # <9>
next(group) # <10>
for value in values:
group.send(value) # <11>
group.send(None) # <12>
# print(results) # uncomment to debug
report(results)
# output report
def report(results):
for key, result in sorted(results.items()):
group, unit = key.split(';')
print('{:2} {:5} averaging {:.2f}{}'.format(
result.count, group, result.average, unit))
data = {
'girls;kg':
[40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
'girls;m':
[1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
'boys;kg':
[39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
'boys;m':
[1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}
if __name__ == '__main__':
main(data)
# END YIELD_FROM_AVERAGER

View File

@@ -1,12 +0,0 @@
# BEGIN CORO_DECO
from functools import wraps
def coroutine(func):
"""Decorator: primes `func` by advancing to first `yield`"""
@wraps(func)
def primer(*args,**kwargs): # <1>
gen = func(*args,**kwargs) # <2>
next(gen) # <3>
return gen # <4>
return primer
# END CORO_DECO

View File

@@ -1,69 +0,0 @@
from time import sleep
def countdown(n):
while n:
print('\tn ->', n)
yield n
n -= 1
sleep(1)
def foo():
for i in range(6, 3, -1):
yield i
yield from countdown(3)
#for j in foo():
# print('j ->', j)
def squares(n):
yield from [i for i in range(n)]
yield from [i*i for i in range(n)]
def squares_stupid(n):
for i in range(n):
yield i
for i in range(n):
yield i*i
#for s in squares(10):
# print(s)
def tokenize():
while True:
source = input('> ')
try:
obj = eval(source)
except BaseException:
print('*crash*')
return
try:
it = iter(obj)
except TypeError:
yield obj
return
else:
yield from it
#g = tokenize()
#for res in g:
# print(res)
from concurrent.futures import Future
def f():
f = future()
def foo(fut):
print(fut, fut.result())
f = Future()
f.add_done_callback(foo)
f.set_result(42)

View File

@@ -1,24 +0,0 @@
>>> def coro():
... print 'iniciando corotina...'
... while True:
... x = yield
... print 'recebido: ', x
... if x == -1: break
... print 'terminando corotina.'
...
>>> c = coro()
>>> next(c)
iniciando corotina...
>>> c.send(7)
recebido: 7
>>> c.send(3)
recebido: 3
>>> c.send(10)
recebido: 10
>>> c.send(-1)
recebido: -1
terminando corotina.
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
>>>

View File

@@ -1,25 +0,0 @@
def corrotina():
print('\t(corrotina) inciciando...')
x = yield
print('\t(corrotina) recebeu x: %r' % x)
y = yield
print('\t(corrotina) recebeu y: %r' % y)
print('\t(corrotina) terminando.')
def principal():
print('(principal) iniciando...')
co = corrotina()
print('(principal) invocando next(co)...')
next(co)
print('(principal) invocando co.send(88)...')
co.send(88)
try:
print('(principal) invocando co.send(99)...')
co.send(99)
# o print a seguir nunca vai acontecer
print('(principal) invocado co.send(99)')
except StopIteration:
print('(principal) a corotina nao tem mais valores a produzir')
principal()

View File

@@ -1,27 +0,0 @@
def corrotina():
print('\t(corrotina) inciciando...')
x = yield 1
print('\t(corrotina) recebeu x: %r' % x)
y = yield 2
print('\t(corrotina) recebeu y: %r' % y)
print('\t(corrotina) terminando.')
def principal():
print('(principal) iniciando...')
co = corrotina()
print('(principal) invocando next(co)...')
res = next(co)
print('(principal) produzido por next(co): %r' % res)
print('(principal) invocando co.send(88)...')
res2 = co.send(88)
print('(principal) produzido por co.send(88): %r' % res2)
try:
print('(principal) invocando co.send(99)...')
res3 = co.send(99)
# o print a seguir nunca vai acontecer
print('(principal) produzido por co.send(99): %r' % res3)
except StopIteration:
print('(principal) a corotina nao tem mais valores a produzir')
principal()

View File

@@ -1,21 +0,0 @@
"""
>>> items = [1, 2, [3, 4, [5, 6], 7], 8]
>>> flatten(items)
<generator object flatten at 0x73bd9c>
>>> list(flatten(items))
[1, 2, 3, 4, 5, 6, 7, 8]
>>> mixed_bag = [1, 'spam', 2, [3, 'eggs', 4], {'x': 1, 'y': 2}]
>>> list(flatten(mixed_bag))
[1, 'spam', 2, 3, 'eggs', 4, 'y', 'x']
"""
from collections import Iterable
def flatten(items):
for x in items:
if isinstance(x, Iterable) and not isinstance(x, (str, bytes)):
yield from flatten(x)
else:
yield x

View File

@@ -1,26 +0,0 @@
"""
Exemplo adaptado da mensagem do Guido van Rossum em:
https://groups.google.com/forum/#!msg/python-tulip/bmphRrryuFk/aB45sEJUomYJ
http://bit.ly/yieldfrom
>>> principal(ger1())
OK
42
Visualização no PythonTutor: http://goo.gl/FQWq2F
"""
def ger1():
val = yield 'OK'
print(val)
yield # para evitar o StopIteration
def principal(g):
print(next(g))
g.send(42)
# auto-teste
import doctest
doctest.testmod()

View File

@@ -1,29 +0,0 @@
"""
Exemplo adaptado da mensagem do Guido van Rossum em:
https://groups.google.com/forum/#!msg/python-tulip/bmphRrryuFk/aB45sEJUomYJ
http://bit.ly/yieldfrom
>>> principal(ger2())
OK
42
Visualização no PythonTutor: http://goo.gl/pWrlkm
"""
def ger1():
val = yield 'OK'
print(val)
yield # para evitar o StopIteration
def ger2():
yield from ger1()
def principal(g):
print(next(g))
g.send(42)
# auto-teste
import doctest
doctest.testmod()

View File

@@ -1,30 +0,0 @@
"""
Exemplo adaptado da mensagem do Guido van Rossum em:
https://groups.google.com/forum/#!msg/python-tulip/bmphRrryuFk/aB45sEJUomYJ
http://bit.ly/yieldfrom
>>> principal(ger2())
OK
None
Visualização no PythonTutor: http://goo.gl/61CUcA
"""
def ger1():
val = yield 'OK'
print(val)
yield # para evitar o StopIteration
def ger2():
for i in ger1():
yield i
def principal(g):
print(next(g))
g.send(42)
# auto-teste
import doctest
doctest.testmod()

View File

@@ -1,31 +0,0 @@
"""
Exemplo adaptado da mensagem do Guido van Rossum em:
https://groups.google.com/forum/#!msg/python-tulip/bmphRrryuFk/aB45sEJUomYJ
http://bit.ly/yieldfrom
>>> principal_susto(ger1())
OK
Bu!
Visualização no PythonTutor: http://goo.gl/m6p2Bc
"""
def ger1():
try:
val = yield 'OK'
except RuntimeError as exc:
print(exc)
else:
print(val)
yield # para evitar o StopIteration
def principal_susto(g):
print(next(g))
g.throw(RuntimeError('Bu!'))
# auto-teste
import doctest
doctest.testmod()

View File

@@ -1,35 +0,0 @@
"""
Exemplo adaptado da mensagem do Guido van Rossum em:
https://groups.google.com/forum/#!msg/python-tulip/bmphRrryuFk/aB45sEJUomYJ
http://bit.ly/yieldfrom
>>> principal_susto(ger2())
OK
Bu!
Visualização no PythonTutor: http://goo.gl/QXzQHS
"""
def ger1():
try:
val = yield 'OK'
except RuntimeError as exc:
print(exc)
else:
print(val)
yield # para evitar o StopIteration
def ger2():
yield from ger1()
def principal_susto(g):
print(next(g))
g.throw(RuntimeError('Bu!'))
# auto-teste
import doctest
doctest.testmod()

View File

@@ -1,20 +0,0 @@
# adaptado de:
# https://github.com/feihong/tulip-talk/blob/master/examples/2-tulip-download.py
import asyncio
import aiohttp
@asyncio.coroutine
def download(url):
response = yield from aiohttp.request('GET', url)
for k, v in response.items():
print('{}: {}'.format(k, v[:80]))
data = yield from response.read()
print('\nReceived {} bytes.\n'.format(len(data)))
if __name__ == '__main__':
loop = asyncio.get_event_loop()
url = 'https://www.cia.gov/library/publications/the-world-factbook/geos/br.html'
coroutine = download(url)
loop.run_until_complete(coroutine)

View File

@@ -1,5 +0,0 @@
from keyword import kwlist
from itertools import combinations
for combo in combinations(kwlist, 2):
print(*combo)

View File

@@ -1,90 +0,0 @@
"""
A "mirroring" ``stdout`` context.
While active, the context manager reverses text output to
``stdout``::
# BEGIN MIRROR_DEMO_1
>>> from mirror import LookingGlass
>>> with LookingGlass() as what: # <1>
... print('Alice, Kitty and Snowdrop') # <2>
... print(what)
...
pordwonS dna yttiK ,ecilA # <3>
YKCOWREBBAJ
>>> what # <4>
'JABBERWOCKY'
# END MIRROR_DEMO_1
This exposes the context manager operation::
# BEGIN MIRROR_DEMO_2
>>> from mirror import LookingGlass
>>> manager = LookingGlass() # <1>
>>> manager
<mirror.LookingGlass object at 0x2a578ac>
>>> monster = manager.__enter__() # <2>
>>> monster == 'JABBERWOCKY' # <3>
eurT
>>> monster
'YKCOWREBBAJ'
>>> manager
>ca875a2x0 ta tcejbo ssalGgnikooL.rorrim<
>>> manager.__exit__(None, None, None) # <4>
>>> monster
'JABBERWOCKY'
# END MIRROR_DEMO_2
The context manager can handle and "swallow" exceptions.
# BEGIN MIRROR_DEMO_3
>>> from mirror import LookingGlass
>>> with LookingGlass():
... print('Humpty Dumpty')
... x = 1/0 # <1>
... print('END') # <2>
...
ytpmuD ytpmuH
Please DO NOT divide by zero!
>>> with LookingGlass():
... print('Humpty Dumpty')
... x = no_such_name # <1>
... print('END') # <2>
...
Traceback (most recent call last):
...
NameError: name 'no_such_name' is not defined
# END MIRROR_DEMO_3
"""
# BEGIN MIRROR_EX
class LookingGlass:
def __enter__(self): # <1>
import sys
self.original_write = sys.stdout.write # <2>
sys.stdout.write = self.reverse_write # <3>
return 'JABBERWOCKY' # <4>
def reverse_write(self, text): # <5>
self.original_write(text[::-1])
def __exit__(self, exc_type, exc_value, traceback): # <6>
import sys # <7>
sys.stdout.write = self.original_write # <8>
if exc_type is ZeroDivisionError: # <9>
print('Please DO NOT divide by zero!')
return True # <10>
# <11>
# END MIRROR_EX

View File

@@ -1,64 +0,0 @@
"""
A "mirroring" ``stdout`` context manager.
While active, the context manager reverses text output to
``stdout``::
# BEGIN MIRROR_GEN_DEMO_1
>>> from mirror_gen import looking_glass
>>> with looking_glass() as what: # <1>
... print('Alice, Kitty and Snowdrop')
... print(what)
...
pordwonS dna yttiK ,ecilA
YKCOWREBBAJ
>>> what
'JABBERWOCKY'
# END MIRROR_GEN_DEMO_1
This exposes the context manager operation::
# BEGIN MIRROR_GEN_DEMO_2
>>> from mirror_gen import looking_glass
>>> manager = looking_glass() # <1>
>>> manager # doctest: +ELLIPSIS
<contextlib._GeneratorContextManager object at 0x...>
>>> monster = manager.__enter__() # <2>
>>> monster == 'JABBERWOCKY' # <3>
eurT
>>> monster
'YKCOWREBBAJ'
>>> manager # doctest: +ELLIPSIS
>...x0 ta tcejbo reganaMtxetnoCrotareneG_.biltxetnoc<
>>> manager.__exit__(None, None, None) # <4>
>>> monster
'JABBERWOCKY'
# END MIRROR_GEN_DEMO_2
"""
# BEGIN MIRROR_GEN_EX
import contextlib
@contextlib.contextmanager # <1>
def looking_glass():
import sys
original_write = sys.stdout.write # <2>
def reverse_write(text): # <3>
original_write(text[::-1])
sys.stdout.write = reverse_write # <4>
yield 'JABBERWOCKY' # <5>
sys.stdout.write = original_write # <6>
# END MIRROR_GEN_EX

View File

@@ -1,101 +0,0 @@
"""
A "mirroring" ``stdout`` context manager.
While active, the context manager reverses text output to
``stdout``::
# BEGIN MIRROR_GEN_DEMO_1
>>> from mirror_gen import looking_glass
>>> with looking_glass() as what: # <1>
... print('Alice, Kitty and Snowdrop')
... print(what)
...
pordwonS dna yttiK ,ecilA
YKCOWREBBAJ
>>> what
'JABBERWOCKY'
# END MIRROR_GEN_DEMO_1
This exposes the context manager operation::
# BEGIN MIRROR_GEN_DEMO_2
>>> from mirror_gen import looking_glass
>>> manager = looking_glass() # <1>
>>> manager # doctest: +ELLIPSIS
<contextlib._GeneratorContextManager object at 0x...>
>>> monster = manager.__enter__() # <2>
>>> monster == 'JABBERWOCKY' # <3>
eurT
>>> monster
'YKCOWREBBAJ'
>>> manager # doctest: +ELLIPSIS
>...x0 ta tcejbo reganaMtxetnoCrotareneG_.biltxetnoc<
>>> manager.__exit__(None, None, None) # <4>
>>> monster
'JABBERWOCKY'
# END MIRROR_GEN_DEMO_2
The context manager can handle and "swallow" exceptions.
The following test does not pass under doctest (a
ZeroDivisionError is reported by doctest) but passes
if executed by hand in the Python 3 console (the exception
is handled by the context manager):
# BEGIN MIRROR_GEN_DEMO_3
>>> from mirror_gen import looking_glass
>>> with looking_glass():
... print('Humpty Dumpty')
... x = 1/0 # <1>
... print('END') # <2>
...
ytpmuD ytpmuH
Please DO NOT divide by zero!
# END MIRROR_GEN_DEMO_3
>>> with looking_glass():
... print('Humpty Dumpty')
... x = no_such_name # <1>
... print('END') # <2>
...
Traceback (most recent call last):
...
NameError: name 'no_such_name' is not defined
"""
# BEGIN MIRROR_GEN_EXC
import contextlib
@contextlib.contextmanager
def looking_glass():
import sys
original_write = sys.stdout.write
def reverse_write(text):
original_write(text[::-1])
sys.stdout.write = reverse_write
msg = '' # <1>
try:
yield 'JABBERWOCKY'
except ZeroDivisionError: # <2>
msg = 'Please DO NOT divide by zero!'
finally:
sys.stdout.write = original_write # <3>
if msg:
print(msg) # <4>
# END MIRROR_GEN_EXC

View File

@@ -1,12 +0,0 @@
What's New in Python 2.5 - PEP 342: New Generator Features
http://docs.python.org/release/2.5/whatsnew/pep-342.html
PEP 342 -- Coroutines via Enhanced Generators
http://www.python.org/dev/peps/pep-0342/
PEP 380 -- Syntax for Delegating to a Subgenerator
http://www.python.org/dev/peps/pep-0380/
Coroutines For the Working Python Developer
http://sdiehl.github.io/coroutine-tutorial/

View File

@@ -1,257 +0,0 @@
"""
Taxi simulator
Sample run with two cars, random seed 10. This is a valid doctest.
>>> main(num_taxis=2, seed=10)
taxi: 0 Event(time=0, proc=0, action='leave garage')
taxi: 0 Event(time=4, proc=0, action='pick up passenger')
taxi: 1 Event(time=5, proc=1, action='leave garage')
taxi: 1 Event(time=9, proc=1, action='pick up passenger')
taxi: 0 Event(time=10, proc=0, action='drop off passenger')
taxi: 1 Event(time=12, proc=1, action='drop off passenger')
taxi: 0 Event(time=17, proc=0, action='pick up passenger')
taxi: 1 Event(time=19, proc=1, action='pick up passenger')
taxi: 1 Event(time=21, proc=1, action='drop off passenger')
taxi: 1 Event(time=24, proc=1, action='pick up passenger')
taxi: 0 Event(time=28, proc=0, action='drop off passenger')
taxi: 1 Event(time=28, proc=1, action='drop off passenger')
taxi: 0 Event(time=29, proc=0, action='going home')
taxi: 1 Event(time=30, proc=1, action='pick up passenger')
taxi: 1 Event(time=61, proc=1, action='drop off passenger')
taxi: 1 Event(time=62, proc=1, action='going home')
*** end of events ***
See explanation and longer sample run at the end of this module.
"""
import sys
import random
import collections
import queue
import argparse
DEFAULT_NUMBER_OF_TAXIS = 3
DEFAULT_END_TIME = 80
SEARCH_DURATION = 4
TRIP_DURATION = 10
DEPARTURE_INTERVAL = 5
Event = collections.namedtuple('Event', 'time proc action')
def compute_delay(interval):
"""Compute action delay using exponential distribution"""
return int(random.expovariate(1/interval)) + 1
# BEGIN TAXI_PROCESS
def taxi_process(ident, trips, start_time=0): # <1>
"""Yield to simulator issuing event at each state change"""
time = yield Event(start_time, ident, 'leave garage') # <2>
for i in range(trips): # <3>
prowling_ends = time + compute_delay(SEARCH_DURATION) # <4>
time = yield Event(prowling_ends, ident, 'pick up passenger') # <5>
trip_ends = time + compute_delay(TRIP_DURATION) # <6>
time = yield Event(trip_ends, ident, 'drop off passenger') # <7>
yield Event(time + 1, ident, 'going home') # <8>
# end of taxi process # <9>
# END TAXI_PROCESS
# BEGIN TAXI_SIMULATOR
class Simulator:
def __init__(self, procs_map):
self.events = queue.PriorityQueue()
self.procs = dict(procs_map)
def run(self, end_time): # <1>
"""Schedule and display events until time is up"""
# schedule the first event for each cab
for _, proc in sorted(self.procs.items()): # <2>
first_event = next(proc) # <3>
self.events.put(first_event) # <4>
# main loop of the simulation
time = 0
while time < end_time: # <5>
if self.events.empty(): # <6>
print('*** end of events ***')
break
# get and display current event
current_event = self.events.get() # <7>
print('taxi:', current_event.proc, # <8>
current_event.proc * ' ', current_event)
# schedule next action for current proc
time = current_event.time # <9>
proc = self.procs[current_event.proc] # <10>
try:
next_event = proc.send(time) # <11>
except StopIteration:
del self.procs[current_event.proc] # <12>
else:
self.events.put(next_event) # <13>
else: # <14>
msg = '*** end of simulation time: {} events pending ***'
print(msg.format(self.events.qsize()))
# END TAXI_SIMULATOR
def main(end_time=DEFAULT_END_TIME, num_taxis=DEFAULT_NUMBER_OF_TAXIS,
seed=None):
"""Initialize random generator, build procs and run simulation"""
if seed is not None:
random.seed(seed) # get reproducible results
taxis = {i: taxi_process(i, (i+1)*2, i*DEPARTURE_INTERVAL)
for i in range(num_taxis)}
sim = Simulator(taxis)
sim.run(end_time)
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='Taxi fleet simulator.')
parser.add_argument('-e', '--end-time', type=int,
default=DEFAULT_END_TIME,
help='simulation end time; default = %s'
% DEFAULT_END_TIME)
parser.add_argument('-t', '--taxis', type=int,
default=DEFAULT_NUMBER_OF_TAXIS,
help='number of taxis running; default = %s'
% DEFAULT_NUMBER_OF_TAXIS)
parser.add_argument('-s', '--seed', type=int, default=None,
help='random generator seed (for testing)')
args = parser.parse_args()
main(args.end_time, args.taxis, args.seed)
"""
Notes for the ``taxi_process`` coroutine::
<1> `taxi_process` will be called once per taxi, creating a generator
object to represent its operations. `ident` is the number of the taxi
(eg. 0, 1, 2 in the sample run); `trips` is the number of trips this
taxi will make before going home; `start_time` is when the taxi
leaves the garage.
<2> The first `Event` yielded is `'leave garage'`. This suspends the
coroutine, and lets the simulation main loop proceed to the next
scheduled event. When it's time to reactivate this process, the main
loop will `send` the current simulation time, which is assigned to
`time`.
<3> This block will be repeated once for each trip.
<4> The ending time of the search for a passenger is computed.
<5> An `Event` signaling passenger pick up is yielded. The coroutine
pauses here. When the time comes to reactivate this coroutine,
the main loop will again `send` the current time.
<6> The ending time of the trip is computed, taking into account the
current `time`.
<7> An `Event` signaling passenger drop off is yielded. Coroutine
suspended again, waiting for the main loop to send the time of when
it's time to continue.
<8> The `for` loop ends after the given number of trips, and a final
`'going home'` event is yielded, to happen 1 minute after the current
time. The coroutine will suspend for the last time. When reactivated,
it will be sent the time from the simulation main loop, but here I
don't assign it to any variable because it will not be useful.
<9> When the coroutine falls off the end, the coroutine object raises
`StopIteration`.
Notes for the ``Simulator.run`` method::
<1> The simulation `end_time` is the only required argument for `run`.
<2> Use `sorted` to retrieve the `self.procs` items ordered by the
integer key; we don't care about the key, so assign it to `_`.
<3> `next(proc)` primes each coroutine by advancing it to the first
yield, so it's ready to be sent data. An `Event` is yielded.
<4> Add each event to the `self.events` `PriorityQueue`. The first
event for each taxi is `'leave garage'`, as seen in the sample run
(ex_taxi_process>>).
<5> Main loop of the simulation: run until the current `time` equals
or exceeds the `end_time`.
<6> The main loop may also exit if there are no pending events in the
queue.
<7> Get `Event` with the smallest `time` in the queue; this is the
`current_event`.
<8> Display the `Event`, identifying the taxi and adding indentation
according to the taxi id.
<9> Update the simulation time with the time of the `current_event`.
<10> Retrieve the coroutine for this taxi from the `self.procs`
dictionary.
<11> Send the `time` to the coroutine. The coroutine will yield the
`next_event` or raise `StopIteration` it's finished.
<12> If `StopIteration` was raised, delete the coroutine from the
`self.procs` dictionary.
<13> Otherwise, put the `next_event` in the queue.
<14> If the loop exits because the simulation time passed, display the
number of events pending (which may be zero by coincidence,
sometimes).
Sample run from the command line, seed=24, total elapsed time=160::
# BEGIN TAXI_SAMPLE_RUN
$ python3 taxi_sim.py -s 24 -e 160
taxi: 0 Event(time=0, proc=0, action='leave garage')
taxi: 0 Event(time=5, proc=0, action='pick up passenger')
taxi: 1 Event(time=5, proc=1, action='leave garage')
taxi: 1 Event(time=6, proc=1, action='pick up passenger')
taxi: 2 Event(time=10, proc=2, action='leave garage')
taxi: 2 Event(time=11, proc=2, action='pick up passenger')
taxi: 2 Event(time=23, proc=2, action='drop off passenger')
taxi: 0 Event(time=24, proc=0, action='drop off passenger')
taxi: 2 Event(time=24, proc=2, action='pick up passenger')
taxi: 2 Event(time=26, proc=2, action='drop off passenger')
taxi: 0 Event(time=30, proc=0, action='pick up passenger')
taxi: 2 Event(time=31, proc=2, action='pick up passenger')
taxi: 0 Event(time=43, proc=0, action='drop off passenger')
taxi: 0 Event(time=44, proc=0, action='going home')
taxi: 2 Event(time=46, proc=2, action='drop off passenger')
taxi: 2 Event(time=49, proc=2, action='pick up passenger')
taxi: 1 Event(time=70, proc=1, action='drop off passenger')
taxi: 2 Event(time=70, proc=2, action='drop off passenger')
taxi: 2 Event(time=71, proc=2, action='pick up passenger')
taxi: 2 Event(time=79, proc=2, action='drop off passenger')
taxi: 1 Event(time=88, proc=1, action='pick up passenger')
taxi: 2 Event(time=92, proc=2, action='pick up passenger')
taxi: 2 Event(time=98, proc=2, action='drop off passenger')
taxi: 2 Event(time=99, proc=2, action='going home')
taxi: 1 Event(time=102, proc=1, action='drop off passenger')
taxi: 1 Event(time=104, proc=1, action='pick up passenger')
taxi: 1 Event(time=135, proc=1, action='drop off passenger')
taxi: 1 Event(time=136, proc=1, action='pick up passenger')
taxi: 1 Event(time=151, proc=1, action='drop off passenger')
taxi: 1 Event(time=152, proc=1, action='going home')
*** end of events ***
# END TAXI_SAMPLE_RUN
"""

View File

@@ -1,4 +1,3 @@
"""
Taxi simulator
@@ -41,6 +40,31 @@ DEPARTURE_INTERVAL = 5
Event = collections.namedtuple('Event', 'time proc action')
Actitivy = collections.namedtuple('Actitivy', 'name distr_param')
START = Actitivy('start shift', TRIP_DURATION)
SEARCH_PAX = Actitivy('searching for passenger', SEARCH_DURATION)
DRIVE_PAX = Actitivy('driving passenger', TRIP_DURATION)
TRANSITIONS = {
START : SEARCH_PAX;
SEARCH_PAX : DRIVE_PAX;
}
def compute_duration(previous_action):
"""Compute action duration using exponential distribution"""
if previous_action in ['leave garage', 'drop off passenger']:
# state is prowling
interval = SEARCH_DURATION
elif previous_action == 'pick up passenger':
# state is trip
interval = TRIP_DURATION
elif previous_action == 'going home':
interval = 1
else:
assert False
return int(random.expovariate(1/interval)) + 1
# BEGIN TAXI_PROCESS
def taxi_process(ident, trips, start_time=0): # <1>
@@ -98,19 +122,6 @@ class Simulator:
# END TAXI_SIMULATOR
def compute_duration(previous_action):
"""Compute action duration using exponential distribution"""
if previous_action in ['leave garage', 'drop off passenger']:
# state is prowling
interval = SEARCH_DURATION
elif previous_action == 'pick up passenger':
# state is trip
interval = TRIP_DURATION
elif previous_action == 'going home':
interval = 1
else:
assert False
return int(random.expovariate(1/interval)) + 1
def main(end_time=DEFAULT_END_TIME, num_taxis=DEFAULT_NUMBER_OF_TAXIS,

View File

@@ -1,52 +0,0 @@
# Code below is the expansion of the statement:
#
# RESULT = yield from EXPR
#
# Copied verbatim from the Formal Semantics section of
# PEP 380 -- Syntax for Delegating to a Subgenerator
#
# https://www.python.org/dev/peps/pep-0380/#formal-semantics
# BEGIN YIELD_FROM_EXPANSION
_i = iter(EXPR) # <1>
try:
_y = next(_i) # <2>
except StopIteration as _e:
_r = _e.value # <3>
else:
while 1: # <4>
try:
_s = yield _y # <5>
except GeneratorExit as _e: # <6>
try:
_m = _i.close
except AttributeError:
pass
else:
_m()
raise _e
except BaseException as _e: # <7>
_x = sys.exc_info()
try:
_m = _i.throw
except AttributeError:
raise _e
else: # <8>
try:
_y = _m(*_x)
except StopIteration as _e:
_r = _e.value
break
else: # <9>
try: # <10>
if _s is None: # <11>
_y = next(_i)
else:
_y = _i.send(_s)
except StopIteration as _e: # <12>
_r = _e.value
break
RESULT = _r # <13>
# END YIELD_FROM_EXPANSION