wikipedia pictures download example

This commit is contained in:
Luciano Ramalho 2015-02-02 02:56:14 -02:00
parent 73d98de6cd
commit ab6ce5b6a4
37 changed files with 2042 additions and 37 deletions

View File

@ -0,0 +1,145 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Charserver</title>
<script type="text/javascript">
//(function() {
var BASE_URL = 'http://127.0.0.1:8888/chars';
var RESULTS_PER_REQUEST = 10;
var REQUEST_DELAY = 1000; // in milliseconds
var httpRequest = new XMLHttpRequest();
httpRequest.onreadystatechange = processResponse;
/**
 * Build an event handler that asks the server for one page of results.
 *
 * @param {number} start - index of the first result to request
 *     (0 for a fresh search).
 * @returns {function} handler suitable for onsubmit/onclick/setTimeout;
 *     it always returns false so that the form is not submitted.
 */
function requestMaker(start) {
    var makeRequest = function (event) {
        var query = document.getElementById('queryField').value;
        var limit = RESULTS_PER_REQUEST;
        var first = start || 0;
        // The query is user input: it must be percent-encoded, otherwise
        // characters such as '&', '#' or '+' corrupt the query string.
        var url = BASE_URL +
            '?query=' + encodeURIComponent(query) +
            '&start=' + first +
            '&stop=' + (first + limit) +
            '&limit=' + limit;
        httpRequest.open('GET', url);
        httpRequest.send();
        document.getElementById('message').textContent = 'Query: ' + query;
        if (first === 0) {
            // A fresh search replaces the previous results; a continuation
            // request (start > 0) must keep the rows already displayed.
            var table = document.getElementById('results');
            var tr;
            while (tr = table.lastChild) table.removeChild(tr);
        }
        return false; // don't submit form
    };
    return makeRequest;
}
/**
 * onreadystatechange callback of the shared XMLHttpRequest: once the
 * request is complete, render the results or report the failing status.
 */
function processResponse() {
    var DONE = 4;
    if (httpRequest.readyState !== DONE) {
        return;  // still in flight
    }
    var query = document.getElementById('queryField').value;
    if (httpRequest.status !== 200) {
        alert('query: ' + query + '\nstatus: '+httpRequest.status);
        return;
    }
    fillTable(httpRequest.responseText);
}
/**
 * Split a string into an array of symbols, keeping each surrogate pair
 * (a character after U+FFFF) together as a single two-unit element.
 */
function getSymbols(string) {
    var symbols = [];
    var total = string.length;
    for (var pos = 0; pos < total; pos++) {
        var unit = string.charAt(pos);
        var unitCode = unit.charCodeAt(0);
        var isHighSurrogate = !(unitCode < 0xD800 || unitCode > 0xDBFF);
        if (isHighSurrogate) {
            // consume the following code unit as the second half of the pair
            pos += 1;
            symbols.push(unit + string.charAt(pos));
        } else {
            symbols.push(unit);
        }
    }
    return symbols;
}
// from: https://developer.mozilla.org/...
// en-US/docs/Web/JavaScript/Reference/Global_Objects/String/charCodeAt
/**
 * Return the code point of the idx-th character of str, where a
 * surrogate pair counts as one character.
 *
 * @param {*} str - value to inspect; coerced to a string.
 * @param {number} idx - character index (surrogate pairs count once).
 * @returns {number} the code point, or NaN when idx is out of range.
 */
function knownCharCodeAt(str, idx) {
    str += '';  // coerce to string
    var code,
        end = str.length;
    var surrogatePairs = /[\uD800-\uDBFF][\uDC00-\uDFFF]/g;
    // Convert the character index into a code-unit index: every surrogate
    // pair that starts before idx occupies one extra code unit.
    while ((surrogatePairs.exec(str)) != null) {
        var li = surrogatePairs.lastIndex;  // index just past the matched pair
        if (li - 2 < idx) {
            idx++;
        }
        else {
            break;
        }
    }
    if (idx >= end || idx < 0) {
        return NaN;
    }
    code = str.charCodeAt(idx);
    var hi, low;
    if (0xD800 <= code && code <= 0xDBFF) {
        hi = code;
        // Go one further, "characters" is part of a surrogate pair
        low = str.charCodeAt(idx + 1);
        // combine both halves into the code point above U+FFFF
        return ((hi - 0xD800) * 0x400) + (low - 0xDC00) + 0x10000;
    }
    return code;
}
/**
 * Format a single symbol as 'U+XXXX': upper-case hexadecimal code point,
 * left-padded with zeros to at least four digits.
 */
function codePointStr(uniChar) {
    var code = (uniChar.length == 1)
        ? uniChar.charCodeAt(0)
        : knownCharCodeAt(uniChar, 0);  // characters after U+FFFF
    var hex = code.toString(16);
    while (hex.length < 4) {
        hex = '0' + hex;
    }
    return 'U+' + hex.toUpperCase();
}
/**
 * Append one table row per character found in a server response.
 *
 * @param {string} responseData - JSON text with the fields `chars`
 *     (the characters of this page), `start`, `stop` and `total`.
 *
 * When the response says that more results exist (stop < total), a
 * follow-up request starting at `stop` is scheduled after REQUEST_DELAY.
 */
function fillTable(responseData) {
    var results = JSON.parse(responseData);
    console.log(results);
    var table = document.getElementById('results');
    var tr;
    // `chars` holds only the characters of this page, so it is walked from
    // its own index 0; indexing it with results.start..results.stop reads
    // past the end whenever fewer characters than requested came back.
    var characters = getSymbols(results.chars);
    for (var i = 0; i < characters.length; i++) {
        var ch = characters[i];
        if (ch == '\n') continue;  // line breaks inserted by the server
        if (ch == '\x00') break;
        var hexCode = codePointStr(ch);
        tr = document.createElement('tr');
        tr.appendChild(document.createElement('td'));
        tr.appendChild(document.createElement('th'));
        tr.cells[0].appendChild(document.createTextNode(hexCode));
        tr.cells[1].appendChild(document.createTextNode(ch));
        tr.id = hexCode;
        table.appendChild(tr);
    }
    // Schedule the next page once, after all rows are in place. The handler
    // itself is passed to setTimeout: calling it here would fire the request
    // immediately and hand setTimeout its return value (false).
    if (results.stop < results.total) {
        setTimeout(requestMaker(results.stop), REQUEST_DELAY);
    }
}
// Once the page is loaded, make both submitting the form and clicking the
// button start a search from the first result.
window.onload = function () {
    var startSearch = requestMaker(0);
    var form = document.getElementById('queryForm');
    form.onsubmit = startSearch;
    var button = document.getElementById('queryButton');
    button.onclick = startSearch;
};
//})();
</script>
</head>
<body>
<p>
<form id="queryForm">
<input id="queryField" type="search" name="query" value="">
<input id="queryButton" type="button" value="find">
Examples: {links}
</form>
</p>
<p id="message">{message}</p>
<hr>
<table id="results">
</table>
</body>
</html>

View File

@ -60,6 +60,8 @@ import re
import unicodedata
import pickle
import warnings
import itertools
from collections import namedtuple
RE_WORD = re.compile('\w+')
RE_UNICODE_NAME = re.compile('^[A-Z0-9 -]+$')
@ -67,7 +69,8 @@ RE_CODEPOINT = re.compile('U\+([0-9A-F]{4,6})')
INDEX_NAME = 'charfinder_index.pickle'
MINIMUM_SAVE_LEN = 10000
CJK_PREFIX = 'CJK UNIFIED IDEOGRAPH'
CJK_UNI_PREFIX = 'CJK UNIFIED IDEOGRAPH'
CJK_CMP_PREFIX = 'CJK COMPATIBILITY IDEOGRAPH'
sample_chars = [
'$', # DOLLAR SIGN
@ -83,6 +86,7 @@ def tokenize(text):
for match in RE_WORD.finditer(text):
yield match.group().upper()
def query_type(text):
text_upper = text.upper()
if 'U+' in text_upper:
@ -92,6 +96,7 @@ def query_type(text):
else:
return 'CHARACTERS'
CharDescription = namedtuple('CharDescription', 'code_str char name')
class UnicodeNameIndex:
@ -128,12 +133,13 @@ class UnicodeNameIndex:
name = unicodedata.name(char)
except ValueError:
continue
if name.startswith(CJK_PREFIX):
name = CJK_PREFIX
code = ord(char)
if name.startswith(CJK_UNI_PREFIX):
name = CJK_UNI_PREFIX
elif name.startswith(CJK_CMP_PREFIX):
name = CJK_CMP_PREFIX
for word in tokenize(name):
index.setdefault(word, set()).add(code)
index.setdefault(word, set()).add(char)
self.index = index
@ -151,7 +157,8 @@ class UnicodeNameIndex:
for postings, key in self.word_rank(top):
print('{:5} {}'.format(postings, key))
def find_codes(self, query):
def find_chars(self, query, start=0, stop=None):
stop = sys.maxsize if stop is None else stop
result_sets = []
for word in tokenize(query):
if word in self.index:
@ -160,23 +167,30 @@ class UnicodeNameIndex:
result_sets = []
break
if result_sets:
result = result_sets[0]
result.intersection_update(*result_sets[1:])
else:
result = set()
if len(result) > 0:
for code in sorted(result):
yield code
result = result_sets[0].intersection(*result_sets[1:])
result = sorted(result) # must sort for consistency
for char in itertools.islice(result, start, stop):
yield char
def describe(self, code):
code_str = 'U+{:04X}'.format(code)
char = chr(code)
def find_codes(self, query, start=0, stop=None):
return (ord(char) for char
in self.find_chars(query, start, stop))
def describe(self, char):
code_str = 'U+{:04X}'.format(ord(char))
name = unicodedata.name(char)
return '{:7}\t{}\t{}'.format(code_str, char, name)
return CharDescription(code_str, char, name)
def find_descriptions(self, query):
for code in self.find_codes(query):
yield self.describe(code)
def find_descriptions(self, query, start=0, stop=None):
for char in self.find_chars(query, start, stop):
yield self.describe(char)
def describe_str(self, char):
return '{:7}\t{}\t{}'.format(*self.describe(char))
def find_description_strs(self, query, start=0, stop=None):
for char in self.find_chars(query, start, stop):
yield self.describe_str(char)
@staticmethod # not an instance method due to concurrency
def status(query, counter):
@ -192,7 +206,8 @@ class UnicodeNameIndex:
def main(*args):
index = UnicodeNameIndex()
query = ' '.join(args)
for n, line in enumerate(index.find_descriptions(query), 1):
n = 0
for n, line in enumerate(index.find_description_strs(query), 1):
print(line)
print('({})'.format(index.status(query, n)))

Binary file not shown.

View File

@ -23,19 +23,25 @@ PAGE_TPL = '''
</p>
<p>{message}</p>
<hr>
<pre>
<table>
{result}
</pre>
</table>
</body>
</html>
'''
CONTENT_TYPE = 'text/html; charset=UTF-8'
EXAMPLE_WORDS = ('chess cat circled Malayalam digit Roman face Ethiopic'
EXAMPLE_WORDS = ('bismillah chess cat circled Malayalam digit Roman face Ethiopic'
' black mark symbol dot operator Braille hexagram').split()
LINK_TPL = '<a href="/?query={0}" title="find &quot;{0}&quot;">{0}</a>'
LINKS_HTML = ', '.join(LINK_TPL.format(word)
for word in sorted(EXAMPLE_WORDS, key=str.upper))
ROW_TPL = '<tr><td>{code_str}</td><th>{char}</th><td>{name}</td></tr>'
CONTENT_TYPE = 'text/html; charset=UTF-8'
index = None # a UnicodeNameIndex instance
@ -44,19 +50,18 @@ def handle(request):
query = request.GET.get('query', '')
print('Query: {!r}'.format(query))
if query:
lines = list(index.find_descriptions(query))
res = '\n'.join(lines)
msg = index.status(query, len(lines))
descriptions = list(index.find_descriptions(query))
res = '\n'.join(ROW_TPL.format(**vars(descr))
for descr in descriptions)
msg = index.status(query, len(descriptions))
else:
lines = []
descriptions = []
res = ''
msg = 'Type words describing characters.'
links = ', '.join(LINK_TPL.format(word)
for word in sorted(EXAMPLE_WORDS, key=str.upper))
text = PAGE_TPL.format(query=query, result=res,
message=msg, links=links)
print('Sending {} results'.format(len(lines)))
message=msg, links=LINKS_HTML)
print('Sending {} results'.format(len(descriptions)))
return web.Response(content_type=CONTENT_TYPE, text=text)
@ -77,7 +82,7 @@ def main(address="127.0.0.1", port=8888):
loop.run_until_complete(init(loop, address, port))
loop.run_forever()
if __name__ == '__main__':
index = UnicodeNameIndex()
main(*sys.argv[1:])

View File

@ -0,0 +1,130 @@
#!/usr/bin/env python3
import sys
import asyncio
import urllib
import json
from aiohttp import web
from charfinder import UnicodeNameIndex
# HTML page rendered with PAGE_TPL.format(message=..., links=...).
# Only {links} and {message} are replacement fields: every literal brace of
# the embedded JavaScript is doubled ({{ and }}) so that str.format() emits
# it verbatim instead of failing on it.
PAGE_TPL = '''
<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <title>Charserver</title>
    <script type="text/javascript">
      function fillTable() {{
        var table = document.getElementById("results");
        var chars = "ABCDE";
        for (var i = 0; i < chars.length; i++) {{
          var ch = chars.charAt(i);
          var code = ch.charCodeAt(0);
          var tr = document.createElement('tr');
          tr.appendChild(document.createElement('td'));
          tr.appendChild(document.createElement('th'));
          var code_str = 'U+'+code.toString(16);
          tr.cells[0].appendChild(document.createTextNode(code_str));
          tr.cells[1].appendChild(document.createTextNode(ch));
          table.appendChild(tr);
        }}
      }}
    </script>
  </head>
  <body>
    <p>
      <form action="/">
        <input type="search" name="query" value="">
        <input type="submit" value="find" onclick="fillTable()">
        Examples: {links}
      </form>
    </p>
    <p>{message}</p>
    <hr>
    <table id="results">
    </table>
  </body>
</html>
'''
# Sample query words offered as links on the search page.
EXAMPLE_WORDS = ('bismillah chess cat circled Malayalam digit Roman face Ethiopic'
                 ' black mark symbol dot operator Braille hexagram').split()
# Link for one example word; {0} is used as query value, tooltip and label.
LINK_TPL = '<a href="/?query={0}" title="find &quot;{0}&quot;">{0}</a>'
# All example links, sorted case-insensitively, built once at import time.
LINKS_HTML = ', '.join(LINK_TPL.format(word)
                       for word in sorted(EXAMPLE_WORDS, key=str.upper))
# Table row for one character description.
ROW_TPL = '<tr id="{code_str}"><td>{code_str}</td><th>{char}</th><td>{name}</td></tr>'
HTML_TYPE = 'text/html; charset=UTF-8'
TEXT_TYPE = 'text/plain; charset=UTF-8'
# Upper bound on the number of characters returned by one /chars request.
RESULTS_PER_REQUEST = 15
index = None  # a UnicodeNameIndex instance
@asyncio.coroutine
def form(request):
    """Handle ``GET /``: log the request and return the search page."""
    client = request.transport.get_extra_info('peername')
    print('Request from: {}, query: {!r}'.format(client, request.path_qs))
    page = PAGE_TPL.format(message='Type words describing characters.',
                           links=LINKS_HTML)
    return web.Response(content_type=HTML_TYPE, text=page)
@asyncio.coroutine
def get_chars(request):
    """Handle ``GET /chars``: return one page of matching characters as JSON.

    Query-string parameters:
        query: words describing the characters (empty means no results).
        start: index of the first result wanted (default 0).
        stop:  index after the last result wanted (default: no bound).
        limit: maximum number of results wanted; values outside
               1..RESULTS_PER_REQUEST fall back to RESULTS_PER_REQUEST.

    The JSON object has the keys ``total`` (number of characters matching
    the query), ``start`` and ``stop`` (bounds of the page actually sent)
    and ``chars`` (the characters, with a line break after every 64th).

    Raises web.HTTPBadRequest when start, stop or limit is not an integer,
    or when start or stop is negative.
    """
    peername = request.transport.get_extra_info('peername')
    params = request.GET
    query = params.get('query', '')
    print('Request from: {}, GET data: {!r}'.format(peername, dict(params)))
    chars = []
    start = stop = total = 0
    if query:
        try:
            start = int(params.get('start', 0))
            stop = int(params.get('stop', sys.maxsize))
            limit = int(params.get('limit', RESULTS_PER_REQUEST))
        except ValueError:
            raise web.HTTPBadRequest()
        if start < 0 or stop < 0:
            raise web.HTTPBadRequest()
        if not 0 < limit <= RESULTS_PER_REQUEST:
            limit = RESULTS_PER_REQUEST
        # All matches are needed to report the real total; only the
        # requested page is sent back.
        matches = list(index.find_chars(query))
        total = len(matches)
        stop = max(start, min(stop, start + limit, total))
        chars = matches[start:stop]
    num_results = len(chars)
    text = ''.join(char if n % 64 else char+'\n'
                   for n, char in enumerate(chars, 1))
    response_data = {'total': total, 'start': start, 'stop': stop}
    print('Response to query: {query!r}, start: {start}, stop: {stop}'.format(
          query=query, **response_data))
    response_data['chars'] = text
    json_obj = json.dumps(response_data)
    print('Sending {} results'.format(num_results))
    # let pages served from other origins call this endpoint
    headers = {'Access-Control-Allow-Origin': '*'}
    return web.Response(content_type=TEXT_TYPE, headers=headers, text=json_obj)
@asyncio.coroutine
def init(loop, address, port):
    """Register the routes and start listening on ``address:port``."""
    app = web.Application(loop=loop)
    routes = (('/chars', get_chars), ('/', form))
    for path, handler in routes:
        app.router.add_route('GET', path, handler)
    server = yield from loop.create_server(app.make_handler(),
                                           address, port)
    listening_on = server.sockets[0].getsockname()
    print('Serving on {}. Hit CTRL-C to stop.'.format(listening_on))
def main(address="127.0.0.1", port=8888):
    """Start the server and run the event loop until interrupted.

    ``port`` may be given as a string (e.g. from the command line).
    """
    port_number = int(port)
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(init(event_loop, address, port_number))
    try:
        event_loop.run_forever()
    except KeyboardInterrupt:
        print('Stopped.')
if __name__ == '__main__':
index = UnicodeNameIndex()
main(*sys.argv[1:])

View File

@ -26,11 +26,11 @@ def handle_queries(reader, writer):
if query:
if ord(query[:1]) < 32:
break
lines = list(index.find_descriptions(query))
lines = list(index.find_description_strs(query))
if lines:
writer.writelines(line.encode() + CRLF for line in lines)
writer.write(index.status(query, len(lines)).encode() + CRLF)
yield from writer.drain()
print('Sent {} results'.format(len(lines)))

View File

@ -42,6 +42,11 @@ def test_find_word_1_match(sample_index):
assert res == [(8352, 'EURO-CURRENCY SIGN')]
def test_find_word_1_match_character_result(sample_index):
res = [name(char) for char in sample_index.find_chars('currency')]
assert res == ['EURO-CURRENCY SIGN']
def test_find_word_2_matches(sample_index):
res = [(code, name(chr(code)))
for code in sample_index.find_codes('Euro')]
@ -88,3 +93,22 @@ def test_find_1_word_2_matches_full(full_index):
def test_find_3_words_no_matches_full(full_index):
res = list(full_index.find_codes('no such character'))
assert len(res) == 0
def test_find_with_start(sample_index):
res = [(code, name(chr(code)))
for code in sample_index.find_codes('sign', 1)]
assert res == [(8352, 'EURO-CURRENCY SIGN'), (8364, 'EURO SIGN')]
def test_find_with_stop(sample_index):
res = [(code, name(chr(code)))
for code in sample_index.find_codes('sign', 0, 2)]
assert res == [(36, 'DOLLAR SIGN'), (8352, 'EURO-CURRENCY SIGN')]
def test_find_with_start_stop(sample_index):
res = [(code, name(chr(code)))
for code in sample_index.find_codes('sign', 1, 2)]
assert res == [(8352, 'EURO-CURRENCY SIGN')]

View File

@ -0,0 +1,184 @@
"""
Wikipedia Picture of the Day (POTD) download example
Note:
The earliest Pictures of the Day I've found are in this page:
http://en.wikipedia.org/wiki/Wikipedia:Picture_of_the_day/May_2004
However, I have not found Template:POTD/YYYY-MM-DD pages earlier
than this:
http://en.wikipedia.org/wiki/Template:POTD/2007-01-01
For simplicity, this script only retrieves pictures starting
from 2007-01-01.
"""
import sys
import argparse
import re
import imghdr
import time
import datetime
import requests
SAVE_DIR = 'pictures/'
POTD_BASE_URL = 'http://en.wikipedia.org/wiki/Template:POTD/'
POTD_IMAGE_RE = re.compile(r'src="(//upload\..*?)"')
PODT_EARLIEST_TEMPLATE = '2007-01-01'
RE_YEAR = r'([12]\d{3})'
RE_MONTH = RE_YEAR + r'-([01]\d)'
RE_DATE = RE_MONTH + r'-([0-3]\d)'
ISO_DATE_FMT = '%Y-%m-%d'
DATEFORMS = [
('date', re.compile('^' + RE_DATE + '$')),
('month', re.compile('^' + RE_MONTH + '$')),
('year', re.compile('^' + RE_YEAR + '$'))
]
class NoPictureForDate(Exception):
'''No Picture of the Day found for {iso_date}'''
class NoPictureTemplateBefore(ValueError):
'''Template:POTD did not exist before PODT_EARLIEST_TEMPLATE'''
def get_picture_url(iso_date):
    """Return the URL of the Picture of the Day for ``iso_date``.

    Downloads the POTD template page for the date and extracts the first
    image source matched by POTD_IMAGE_RE.

    Raises NoPictureForDate when the page has no such image.
    """
    html = requests.get(POTD_BASE_URL+iso_date).text
    match = POTD_IMAGE_RE.search(html)
    if match is not None:
        # the matched source is protocol-relative ("//upload...")
        return 'http:' + match.group(1)
    raise NoPictureForDate(iso_date)
def get_picture(iso_date):
    """Download the Picture of the Day for ``iso_date``; return its bytes."""
    return requests.get(get_picture_url(iso_date)).content
def get_picture_type(octets):
    """Guess the image type of ``octets`` (bytes).

    Returns the type name reported by ``imghdr``, ``'svg'`` for data that
    looks like an SVG document, or ``None`` when the type is unknown.
    """
    detected = imghdr.what(None, octets)
    if detected is not None:
        return detected
    # imghdr does not know SVG: fall back to a textual check
    looks_like_svg = (octets.startswith(b'<')
                      and b'<svg' in octets[:200]
                      and octets.rstrip().endswith(b'</svg>'))
    return 'svg' if looks_like_svg else None
def validate_date(text):
    """Validate and normalize a ``YYYY``, ``YYYY-MM`` or ``YYYY-MM-DD`` date.

    Returns the date as a zero-padded ISO string with the same precision
    as the input (e.g. ``'2014-5'`` -> ``'2014-05'``).

    Raises ValueError when the text is not made of one to three integer
    parts or does not name a real date, and NoPictureTemplateBefore (a
    ValueError) when the date is earlier than PODT_EARLIEST_TEMPLATE.
    """
    format_msg = 'date must use YYYY, YYYY-MM or YYYY-MM-DD format'
    try:
        parts = [int(part) for part in text.split('-')]
    except ValueError:
        raise ValueError(format_msg)
    if len(parts) > 3:
        # extra parts would otherwise be taken as hour, minute, ...
        raise ValueError(format_msg)
    # a missing month or day is replaced by 1 just to check the date
    padded = parts + [1] * (3 - len(parts))
    date = datetime.date(*padded)
    # keep only as many fields as were given: 4, 7 or 10 characters
    iso_date = date.isoformat()[:1+len(parts)*3]
    # Compare at the same precision: as plain strings '2007' sorts before
    # '2007-01-01', which would reject the first valid year and month.
    if iso_date < PODT_EARLIEST_TEMPLATE[:len(iso_date)]:
        raise NoPictureTemplateBefore(PODT_EARLIEST_TEMPLATE)
    return iso_date
def gen_month_dates(iso_month):
    """Yield every date of ``iso_month`` ('YYYY-MM') as an ISO date string."""
    current = datetime.datetime.strptime(iso_month+'-01', ISO_DATE_FMT)
    month = current.month
    step = datetime.timedelta(days=1)
    while True:
        if current.month != month:
            return  # stepped into the next month
        yield current.strftime(ISO_DATE_FMT)
        current = current + step
def gen_year_dates(iso_year):
    """Yield every date of ``iso_year`` ('YYYY'), month by month."""
    for month in range(1, 13):
        iso_month = iso_year + '-{:02d}'.format(month)
        for iso_date in gen_month_dates(iso_month):
            yield iso_date
def gen_dates(iso_parts):
    """Yield the ISO dates covered by a year, a month or a single date.

    The precision is inferred from the length of ``iso_parts``: 4 characters
    for a year, 7 for a month; anything else is yielded unchanged.
    """
    expanders = {4: gen_year_dates, 7: gen_month_dates}
    size = len(iso_parts)
    if size not in expanders:
        yield iso_parts
        return
    for iso_date in expanders[size](iso_parts):
        yield iso_date
def parse_args(argv):
    """Parse the command line; return ``(dates, args)``.

    ``dates`` is the list of ISO dates covered by the date argument and
    ``args`` the parsed argparse namespace. When the date is not valid, an
    error and the usage are printed and the program exits with status 2.
    """
    parser = argparse.ArgumentParser(description=main.__doc__)
    parser.add_argument(
        'date', help='YYYY-MM-DD or YYYY-MM or YYYY: year, month and day')
    parser.add_argument('-q', '--max_qty', type=int,
                        help='maximum number of items to fetch')
    parser.add_argument('-u', '--url_only', action='store_true',
                        help='get picture URLS only')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='display progress information')
    options = parser.parse_args(argv)

    try:
        iso_parts = validate_date(options.date)
    except ValueError as exc:
        print('error:', exc.args[0])
        parser.print_usage()
        sys.exit(2)

    dates = list(gen_dates(iso_parts))
    if options.verbose:
        if len(dates) != 1:
            print('-> {} days: {}...{}'.format(len(dates), dates[0], dates[-1]))
        else:
            print('-> Date: ', dates[0])
    return dates, options
def get_picture_urls(dates, verbose=False):
    """Look up the picture URL of each date, one after the other.

    Every URL found is printed (numbered file name only when ``verbose``)
    and collected; dates without a picture are skipped, and reported only
    when ``verbose``. Returns the list of URLs found.
    """
    found = []
    for date in dates:
        try:
            url = get_picture_url(date)
        except NoPictureForDate as exc:
            if verbose:
                print('*** {!r} ***'.format(exc))
        else:
            if verbose:
                print(format(len(found) + 1, '3d'), end=' ')
                print(url.split('/')[-1])
            else:
                print(url)
            found.append(url)
    return found
def main(argv, get_picture_urls):
    """Get Wikipedia "Picture of The Day" for date, month or year"""
    # NOTE: the docstring above doubles as the argparse description.
    # get_picture_urls is the strategy used to fetch the URLs; it is called
    # with the list of dates and the verbose flag.
    dates, args = parse_args(argv)
    started = time.time()
    urls = get_picture_urls(dates, args.verbose)
    elapsed = time.time() - started
    if not args.verbose:
        return
    print('-> found: {} pictures | elapsed time: {:.2f}s'
          .format(len(urls), elapsed))
if __name__ == '__main__':
main(sys.argv[1:], get_picture_urls)

View File

@ -0,0 +1,61 @@
"""
Wikipedia Picture of the Day (POTD) download example
"""
import sys
import asyncio
import aiohttp
from daypicts import main
from daypicts import NoPictureForDate
from daypicts import POTD_BASE_URL
from daypicts import POTD_IMAGE_RE
GLOBAL_TIMEOUT = 300 # seconds
@asyncio.coroutine
def get_picture_url(iso_date):
    """Coroutine: return the URL of the Picture of the Day for ``iso_date``.

    Raises NoPictureForDate when the template page has no matching image.
    """
    response = yield from aiohttp.request('GET', POTD_BASE_URL+iso_date)
    html = yield from response.text()
    match = POTD_IMAGE_RE.search(html)
    if match is not None:
        # the matched source is protocol-relative ("//upload...")
        return 'http:' + match.group(1)
    raise NoPictureForDate(iso_date)
@asyncio.coroutine
def get_picture_urls(dates, verbose=False):
    """Coroutine: look up the picture URLs of all ``dates`` concurrently.

    URLs are printed and collected in completion order. Dates without a
    picture are skipped (reported only when ``verbose``); client response
    errors are always reported and skipped. Returns the list of URLs.
    """
    lookups = [get_picture_url(date) for date in dates]
    found = []
    # handle each lookup as soon as it is done
    for done in asyncio.as_completed(lookups, timeout=GLOBAL_TIMEOUT):
        try:
            url = yield from done
        except NoPictureForDate as exc:
            if verbose:
                print('*** {!r} ***'.format(exc))
            continue
        except aiohttp.ClientResponseError as exc:
            print('****** {!r} ******'.format(exc))
            continue
        if verbose:
            print(format(len(found) + 1, '3d'), end=' ')
            print(url.split('/')[-1])
        else:
            print(url)
        found.append(url)
    return found
def run_loop(dates, verbose=False):
    """Run the get_picture_urls coroutine to completion; return its result."""
    lookup = get_picture_urls(dates, verbose)
    return asyncio.get_event_loop().run_until_complete(lookup)
if __name__ == '__main__':
main(sys.argv[1:], run_loop)

View File

@ -0,0 +1,45 @@
"""
Wikipedia Picture of the Day (POTD) download example
"""
import sys
from concurrent import futures
from daypicts import main, get_picture_url, NoPictureForDate
MAX_NUM_THREADS = 400
GLOBAL_TIMEOUT = 300 # seconds
def get_picture_urls(dates, verbose=False):
    """Look up the picture URLs of all ``dates`` using a pool of threads.

    URLs are printed (numbered file name only when ``verbose``) and
    collected in completion order; dates without a picture are skipped and
    reported only when ``verbose``. Returns the list of URLs found.

    At most MAX_NUM_THREADS threads are used, and waiting for results is
    bounded by GLOBAL_TIMEOUT seconds.
    """
    if not dates:
        # nothing to do; an executor cannot be created with 0 workers
        return []
    num_threads = min(len(dates), MAX_NUM_THREADS)
    urls = []
    count = 0
    # the with block shuts the pool down when leaving, even on errors
    with futures.ThreadPoolExecutor(num_threads) as pool:
        pending = {pool.submit(get_picture_url, date): date
                   for date in dates}
        # get results as jobs are done
        for job in futures.as_completed(pending, timeout=GLOBAL_TIMEOUT):
            try:
                url = job.result()
            except NoPictureForDate as exc:
                if verbose:
                    print('*** {!r} ***'.format(exc))
                continue
            count += 1
            if verbose:
                print(format(count, '3d'), end=' ')
                print(url.split('/')[-1])
            else:
                print(url)
            urls.append(url)
    return urls
if __name__ == '__main__':
main(sys.argv[1:], get_picture_urls)

View File

@ -0,0 +1,4 @@
#!/bin/bash
# run tests skipping @pytest.mark.network
# "$@" forwards every extra argument unchanged: any number of them, and
# without splitting arguments that contain spaces
py.test test_daypicts.py -m 'not network' "$@"

View File

@ -0,0 +1,36 @@
"""
Wikipedia Picture of the Day (POTD) download example
Inspired by example at:
https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor-example
"""
from concurrent import futures
import potd
def save_month(year_month, verbose):
    """Download the pictures of a month ('YYYY-MM') using worker processes.

    Each day is saved by ``potd.save_one`` in a process pool; the outcome of
    each download is printed as it completes. Returns a tuple with the
    number of images saved and their total size.
    """
    year, month = (int(part) for part in year_month.split('-'))
    img_count = total_size = 0
    dates = potd.list_days_of_month(year, month)
    with futures.ProcessPoolExecutor(max_workers=100) as executor:
        # map each pending download to the date it is fetching
        downloads = {executor.submit(potd.save_one, date, verbose): date
                     for date in dates}
        for future in futures.as_completed(downloads):
            date = downloads[future]
            error = future.exception()
            if error is None:
                img_size = future.result()
                total_size += img_size
                img_count += 1
                print('%r OK: %r' % (date, img_size))
            else:
                print('%r generated an exception: %s' % (date, error))
    return img_count, total_size
if __name__ == '__main__':
potd.main(save_month=save_month)

View File

@ -0,0 +1,36 @@
"""
Wikipedia Picture of the Day (POTD) download example
Inspired by example at:
https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor-example
"""
from concurrent import futures
import potd
def save_month(year_month, verbose):
    """Download the pictures of a month ('YYYY-MM') using worker threads.

    Each day is saved by ``potd.save_one`` in a thread pool; the outcome of
    each download is printed as it completes. Returns a tuple with the
    number of images saved and their total size.
    """
    year, month = (int(part) for part in year_month.split('-'))
    img_count = total_size = 0
    dates = potd.list_days_of_month(year, month)
    with futures.ThreadPoolExecutor(max_workers=100) as executor:
        # map each pending download to the date it is fetching
        downloads = {executor.submit(potd.save_one, date, verbose): date
                     for date in dates}
        for future in futures.as_completed(downloads):
            date = downloads[future]
            error = future.exception()
            if error is None:
                img_size = future.result()
                total_size += img_size
                img_count += 1
                print('%r OK: %r' % (date, img_size))
            else:
                print('%r generated an exception: %s' % (date, error))
    return img_count, total_size
if __name__ == '__main__':
potd.main(save_month=save_month)

View File

@ -0,0 +1,100 @@
"""
Wikipedia Picture of the Day (POTD) download example
Baseline synchronous example for comparison: downloads metadata and
images in the simple but slow synchronous way i.e. one after the other.
"""
import calendar
import datetime
import re
import os
import io
import time
import requests
import argparse
SAVE_DIR = 'pictures/'
POTD_BASE_URL = 'http://en.wikipedia.org/wiki/Template:POTD/'
# Raised by save_one when the page of a day cannot be fetched.
# NOTE: the docstring is read at runtime: save_one formats it with
# day=<iso date> to build the exception message, so keep the {day} field.
class NoPictureForDate(Exception):
    '''No Picture of the Day found for {day}'''
def build_page_url(iso_date):
    """Return the URL of the POTD template page for ``iso_date``."""
    return ''.join((POTD_BASE_URL, iso_date))
def fetch(url):
    """GET ``url`` and return the ``requests`` response object."""
    return requests.get(url)
def extract_image_url(html):
    """Return the URL of the first image served from an "upload" host.

    Looks for the first ``src="//upload..."`` attribute in ``html`` and
    returns it with the ``http:`` scheme prepended.

    Raises NoPictureForDate when the page contains no such image, instead
    of failing with an AttributeError on the missing match.
    """
    re_image = r'src="(//upload\..*?)"'
    image_url = re.search(re_image, html)
    if image_url is None:
        raise NoPictureForDate('no picture URL found in page')
    return 'http:' + image_url.group(1)
def format_date(year, month, day):
    """Return the date as 'YYYY-MM-DD', zero-padding month and day."""
    return '{}-{:02d}-{:02d}'.format(year, month, day)
def list_days_of_month(year, month):
    """Return every day of the month as a list of 'YYYY-MM-DD' strings."""
    _, last_day = calendar.monthrange(year, month)
    days = []
    for day in range(1, last_day + 1):
        days.append(format_date(year, month, day))
    return days
def build_save_path(iso_date, url):
    """Return the local path for a picture: SAVE_DIR + date + '_' + name.

    The name is the last path component of ``url``.
    """
    filename = os.path.basename(url)
    return os.path.join(SAVE_DIR, iso_date+'_'+filename)
def save_one(iso_date, verbose):
    """Download and save the picture of ``iso_date``; return its size.

    The picture is written to the path given by build_save_path; the path
    is printed first when ``verbose``. The size is the number of bytes
    written.

    Raises NoPictureForDate when the page of the day does not come back
    with status 200.
    """
    page = fetch(build_page_url(iso_date))
    if page.status_code != 200:
        # the exception docstring is the message template
        raise NoPictureForDate(NoPictureForDate.__doc__.format(day=iso_date))
    img_url = extract_image_url(page.text)
    image = fetch(img_url)
    path = build_save_path(iso_date, img_url)
    if verbose:
        print('saving: '+path)
    with io.open(path, 'wb') as fp:
        fp.write(image.content)
    return len(image.content)
def save_month(year_month, verbose):
    """Download the pictures of a month ('YYYY-MM'), one after the other.

    Days without a picture are skipped. Returns a tuple with the number of
    images saved and their total size.
    """
    year, month = (int(part) for part in year_month.split('-'))
    sizes = []
    for date in list_days_of_month(year, month):
        try:
            sizes.append(save_one(date, verbose))
        except NoPictureForDate:
            pass  # no picture for this day: go on with the next one
    return len(sizes), sum(sizes)
def main(save_one=save_one, save_month=save_month):
    """Get "Picture of The Day" from English Wikipedia for a given date or month"""
    # NOTE: the docstring above doubles as the argparse description.
    # save_one and save_month are the download strategies; other scripts
    # pass their own implementations.
    parser = argparse.ArgumentParser(description=main.__doc__)
    parser.add_argument(
        'date', help='year, month and (optional) day in YYYY-MM-DD format')
    parser.add_argument('-q', '--max_qty', type=int,
                        help='maximum number of files to download')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='display progress information')
    args = parser.parse_args()

    started = time.time()
    is_single_day = len(args.date) == len('YYYY-MM-DD')
    if is_single_day:
        img_count = 1
        total_size = save_one(args.date, args.verbose)
    else:
        img_count, total_size = save_month(args.date, args.verbose)
    elapsed = time.time() - started
    print("images: %3d | total size: %6.1f Kbytes | elapsed time: %3ds" %
          (img_count, total_size/1024.0, elapsed))
if __name__ == '__main__':
main()

View File

@ -0,0 +1,96 @@
import unittest
import potd
# Tests for the potd module. The tests that call potd.fetch perform real
# HTTP requests.
class TestSequenceFunctions(unittest.TestCase):

    def setUp(self):
        # thumbnail URL expected from the HTML sample at the end of the file
        self.thumb_url = ("""http://upload.wikimedia.org/wikipedia/"""
                          """commons/thumb/f/fe/Orthographic_projection_SW.jpg/350px"""
                          """-Orthographic_projection_SW.jpg""")

    def test_buid_page_url(self):
        date = '2014-05-01'
        result = potd.build_page_url(date)
        self.assertEqual(result, 'http://en.wikipedia.org/wiki/Template:POTD/2014-05-01')

    # fetches a page over the network
    def test_fetch_status_code(self):
        date = '2014-05-02'
        url = potd.build_page_url(date)
        response = potd.fetch(url)
        self.assertEqual(response.status_code, 200)

    # fetches a page over the network; a date far in the future has no page
    def test_fetch_status_code_not_found(self):
        date = '2100-01-01'
        url = potd.build_page_url(date)
        response = potd.fetch(url)
        self.assertEqual(response.status_code, 404)

    def test_extract_image_url(self):
        image_url = potd.extract_image_url(HTML)
        self.assertEqual(image_url, self.thumb_url)

    # fetches an image over the network
    def test_fetch_image_jpeg(self):
        response = potd.fetch(self.thumb_url)
        self.assertEqual(response.headers['content-type'], 'image/jpeg')

    def test_list_days_of_month(self):
        year = 2014
        month = 5
        days = potd.list_days_of_month(year, month)
        self.assertEqual(len(days), 31)
        self.assertEqual('2014-05-01', days[0])
        self.assertEqual('2014-05-31', days[-1])

    def test_list_days_of_february(self):
        year = 2014
        month = 2
        days = potd.list_days_of_month(year, month)
        self.assertEqual(len(days), 28)
        self.assertEqual('2014-02-01', days[0])
        self.assertEqual('2014-02-28', days[-1])

    def test_format_date(self):
        year = 2014
        month = 2
        day = 1
        a_date = '2014-02-01'
        date = potd.format_date(year, month, day)
        self.assertEqual(a_date, date)
        self.assertEqual(potd.format_date(2010, 11, 12), '2010-11-12')

    def test_build_save_path(self):
        date = '2014-06-04'
        path = potd.SAVE_DIR + date + '_350px-Orthographic_projection_SW.jpg'
        self.assertEqual(path, potd.build_save_path(date, self.thumb_url))
HTML = (
'''<td><a href="/wiki/File:Orthographic_projection_SW.jpg" class="image"
title="Orthographic projection"><img alt="Orthographic projection"
src="//upload.wikimedia.org/wikipedia/commons/thumb/f/fe/O'''
'''rthographic_projection_SW.jpg/350px-Orthographic_projection_SW.jpg"
width="350" height="350" srcset="//upload.wikimedia.org/wikipedia/comm'''
'''ons/thumb/f/fe/Orthographic_projection_SW.jpg/525px-
Orthographic_projection_SW.jpg 1.5x, //upload.wikimedia.org/wikipedia/
commons/thumb/f/fe/Orthographic_projection_SW.jpg/700px-
Orthographic_projection_SW.jpg 2x" data-file-width="2058" data-file-
height="2058"></a></td>
''')
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,115 @@
"""
Wikipedia Picture of the Day (POTD) download example
Baseline synchronous example for comparison: downloads images and metadata
in the simple but slow synchronous way i.e. one after the other.
"""
from __future__ import print_function
import sys
import os
import io
import re
import argparse
import datetime
import urllib2
import contextlib
import time
POTD_BASE_URL = 'http://en.wikipedia.org/wiki/Template:POTD/'
THUMB_BASE_URL = 'http://upload.wikimedia.org/wikipedia/commons/thumb/'
THUMB_SRC_RE = re.compile(r'src=".*?/thumb/(.*?/\d+px-[^"]+)')
LOCAL_IMG_PATH = 'pictures/'
verbose = True
class ParsingException(ValueError):
"""Raised if unable to parse POTD MediaWiki source"""
def fetch_potd_url(iso_date):
    """Return the thumbnail URL of the picture for iso_date ('YYYY-MM-DD').

    Raises ParsingException when the page has no thumbnail source.
    """
    page_url = POTD_BASE_URL + iso_date
    with contextlib.closing(urllib2.urlopen(page_url)) as page:
        markup = page.read()
    match = THUMB_SRC_RE.search(markup)
    if match is None:
        raise ParsingException('cannot find thumbnail source for ' + page_url)
    return THUMB_BASE_URL+match.group(1)
def gen_month_days(year, month):
    """Yield each day of the given month as a ``datetime.date``."""
    current = datetime.date(year, month, 1)
    step = datetime.timedelta(days=1)
    while True:
        if current.month != month:
            return  # stepped into the next month
        yield current
        current = current + step
def get_img_names(iso_month):
    """Yield (iso_date, image URL) for each day of iso_month ('YYYY-MM').

    Stops at the first day whose page fails with an HTTP error.
    """
    year, month = [int(part) for part in iso_month.split('-')]
    for day in gen_month_days(year, month):
        iso_date = day.strftime('%Y-%m-%d')
        if verbose:
            print(iso_date)
        try:
            img_url = fetch_potd_url(iso_date)
        except urllib2.HTTPError:
            return
        yield (iso_date, img_url)
def fetch_image(iso_date, img_url):
    """Download img_url and save it under LOCAL_IMG_PATH; return its size.

    The file is named after the date and the last component of the URL.
    The size is the number of bytes downloaded.
    """
    if verbose:
        print('\t' + img_url)
    with contextlib.closing(urllib2.urlopen(img_url)) as response:
        payload = response.read()
    filename = '__'.join((iso_date, img_url.split('/')[-1]))
    if verbose:
        print('\t\twriting %0.1f Kbytes' % (len(payload)/1024.0))
    target = os.path.join(LOCAL_IMG_PATH, filename)
    with io.open(target, 'wb') as out:
        out.write(payload)
    return len(payload)
def get_images(iso_month, max_count=0):
    """Download up to max_count images of iso_month ('YYYY-MM').

    A max_count of 0 or None means no limit. Returns a tuple with the
    number of images downloaded and their total size.
    """
    # Test truthiness rather than identity: ``is 0`` relies on how the
    # interpreter caches small integers.
    if not max_count:
        max_count = sys.maxsize
    img_count = 0
    total_size = 0
    for iso_date, img_url in get_img_names(iso_month):
        total_size += fetch_image(iso_date, img_url)
        img_count += 1
        if img_count == max_count:
            break
    return (img_count, total_size)
def main():
    """Get "Pictures of The Day" from English Wikipedia for a given month"""
    # NOTE: the docstring above doubles as the argparse description.
    global verbose
    parser = argparse.ArgumentParser(description=main.__doc__)
    parser.add_argument(
        'year_month', help='year and month in YYYY-MM format')
    parser.add_argument(
        '-q', '--max_qty', type=int,
        help='maximum number of files to download')
    parser.add_argument(
        '-v', '--verbose', action='store_true',
        help='display progress information')
    options = parser.parse_args()
    verbose = options.verbose  # module-level flag read by the helpers
    started = time.time()
    img_count, total_size = get_images(options.year_month, options.max_qty)
    elapsed = time.time() - started
    print("images: %3d | total size: %6.1f Kbytes | elapsed time: %3ds" %
          (img_count, total_size/1024.0, elapsed))
if __name__ == '__main__':
main()

View File

@ -0,0 +1,118 @@
"""
Wikipedia Picture of the Day (POTD) download example
Baseline synchronous example for comparison: downloads images and metadata
in the simple but slow synchronous way i.e. one after the other.
"""
import sys
import os
import io
import re
import argparse
import datetime
import urllib.request
import urllib.error
import contextlib
import time
POTD_BASE_URL = 'http://en.wikipedia.org/wiki/Template:POTD/'
THUMB_BASE_URL = 'http://upload.wikimedia.org/wikipedia/commons/thumb/'
THUMB_SRC_RE = re.compile(r'src=".*?/thumb/(.*?/\d+px-[^"]+)')
LOCAL_IMG_PATH = 'pictures/'
verbose = True
class ParsingException(ValueError):
"""Raised if unable to parse POTD MediaWiki source"""
def gen_month_dates(year, month):
    """Yield each date of the given month as a 'YYYY-MM-DD' string."""
    current = datetime.date(year, month, 1)
    step = datetime.timedelta(days=1)
    while True:
        if current.month != month:
            return  # stepped into the next month
        yield current.strftime('%Y-%m-%d')
        current = current + step
def fetch_potd_url(iso_date):
    """Return the POTD thumbnail URL for iso_date ('YYYY-MM-DD' format).

    Returns None when the page request fails with an HTTP error, and raises
    ParsingException when the page is fetched but has no thumbnail source.
    """
    if verbose:
        print(iso_date)
    page_url = POTD_BASE_URL + iso_date
    try:
        with urllib.request.urlopen(page_url) as response:
            page = response.read().decode('utf-8')
    except urllib.error.HTTPError:
        return None
    found = THUMB_SRC_RE.search(page)
    if found is None:
        raise ParsingException('cannot find thumbnail source for ' + page_url)
    return THUMB_BASE_URL + found.group(1)
def gen_img_names(iso_month):
    """Yield (iso_date, thumbnail_url) pairs for the days of iso_month.

    iso_month is a 'YYYY-MM' string. Production stops at the first day for
    which fetch_potd_url returns None.
    """
    year, month = map(int, iso_month.split('-'))
    for day in gen_month_dates(year, month):
        url = fetch_potd_url(day)
        if url is None:
            return
        yield (day, url)
def fetch_image(iso_date, img_url):
    """Fetch and save image data for date and url.

    The file is written under LOCAL_IMG_PATH as '<iso_date>__<last URL
    segment>'. Returns the number of bytes downloaded.
    """
    if verbose:
        print('\t' + img_url)
    with contextlib.closing(urllib.request.urlopen(img_url)) as fp:
        img = fp.read()
    img_filename = iso_date + '__' + img_url.split('/')[-1]
    if verbose:
        print('\t\twriting %0.1f Kbytes' % (len(img)/1024.0))
    # Create the target directory on first use; otherwise open() below fails
    # after the download has already been paid for.
    os.makedirs(LOCAL_IMG_PATH, exist_ok=True)
    img_path = os.path.join(LOCAL_IMG_PATH, img_filename)
    with io.open(img_path, 'wb') as fp:
        fp.write(img)
    return len(img)
def get_images(iso_month, max_count=0):
    """Download up to max_count images for a given month.

    iso_month is a 'YYYY-MM' string handed to gen_img_names. A falsy
    max_count (0, or None when the command-line option is omitted) means
    "no limit". Returns an (images_downloaded, total_bytes) tuple.
    """
    # Truthiness instead of `is 0`: identity against an int literal depends
    # on interpreter caching and does not cover the None passed by argparse.
    if not max_count:
        max_count = sys.maxsize
    img_count = 0
    total_size = 0
    for iso_date, img_url in gen_img_names(iso_month):
        total_size += fetch_image(iso_date, img_url)
        img_count += 1
        if img_count == max_count:
            break
    return (img_count, total_size)
def main():
    """Get "Pictures of The Day" from English Wikipedia for a given month"""
    # The docstring above is also the argparse description (main.__doc__),
    # so it is user-facing text.
    global verbose
    parser = argparse.ArgumentParser(description=main.__doc__)
    parser.add_argument('year_month', help='year and month in YYYY-MM format')
    # default=0 is get_images' documented "no limit" value; without it
    # argparse would hand over None when -q is omitted.
    parser.add_argument('-q', '--max_qty', type=int, default=0,
                        help='maximum number of files to download')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='display progress information')
    args = parser.parse_args()
    verbose = args.verbose  # module-wide flag read by the fetch functions
    t0 = time.time()
    img_count, total_size = get_images(args.year_month, args.max_qty)
    elapsed = time.time() - t0
    print("images: %3d | total size: %6.1f Kbytes | elapsed time: %3ds" %
          (img_count, total_size/1024.0, elapsed))


if __name__ == '__main__':
    main()

View File

@ -0,0 +1,87 @@
"""
Wikipedia Picture of the Day (POTD) download example
"""
import pytest
from daypicts import *
# Tiny GIF89a payload (header declares a 1x1 image); the tests expect
# get_picture_type to report 'gif' for it.
GIF_MIN = (b'GIF89a\x01\x00\x01\x00\x00\xff\x00,\x00\x00'
           b'\x00\x00\x01\x00\x01\x00\x00\x02\x00;')
# Bare <svg> root element with no XML declaration.
SVG_MIN = b'<svg xmlns="http://www.w3.org/2000/svg"></svg>'
# The same SVG document preceded by an XML declaration.
SVG_XML_DECL = b'<?xml version="1.0" encoding="UTF-8"?>' + SVG_MIN
# 16 arbitrary bytes; the tests expect get_picture_type to return None for them.
NOISE = b'\xb0\x0bU\xbe]L\n\x92\xbe\xc6\xf65"\xcc\xa3\xe3'
@pytest.mark.network
def test_get_picture_url_existing():
    """A date with a published picture resolves to its thumbnail URL."""
    commons = 'http://upload.wikimedia.org/wikipedia/commons/'
    thumb_path = 'thumb/9/9d/MODIS_Map.jpg/550px-MODIS_Map.jpg'
    assert get_picture_url('2012-01-01') == commons + thumb_path
@pytest.mark.network
def test_get_picture_url_not_existing():
    """A date without a picture raises NoPictureForDate."""
    pytest.raises(NoPictureForDate, get_picture_url, '2013-09-12')
def test_get_picture_type_imghdr():
    """The minimal GIF sample is reported as 'gif'."""
    detected = get_picture_type(GIF_MIN)
    assert detected == 'gif'
def test_get_picture_type_svg():
    """SVG is recognized with and without a leading XML declaration."""
    for sample in (SVG_MIN, SVG_XML_DECL):
        assert get_picture_type(sample) == 'svg'
def test_get_picture_type_unknown():
    """Bytes of no known picture format yield None."""
    detected = get_picture_type(NOISE)
    assert detected is None
def test_validate_full_date():
    """A full date comes back with zero-padded month and day."""
    assert validate_date('2015-1-2') == '2015-01-02'
def test_validate_date_too_early():
    """The date 2006-12-31 is rejected with NoPictureTemplateBefore."""
    pytest.raises(NoPictureTemplateBefore, validate_date, '2006-12-31')
def test_validate_month():
    """A year-month input comes back with a zero-padded month."""
    assert validate_date('2015-1') == '2015-01'
def test_validate_year():
    """A bare year is returned unchanged."""
    assert validate_date('2015') == '2015'
def test_gen_month_dates():
    """February 2015 produces 28 dates, from the 1st to the 28th."""
    days = list(gen_month_dates('2015-02'))
    assert len(days) == 28
    first, last = days[0], days[-1]
    assert first == '2015-02-01'
    assert last == '2015-02-28'
def test_gen_month_dates_leap():
    """February of leap year 2012 produces 29 dates, ending on the 29th."""
    days = list(gen_month_dates('2012-02'))
    assert len(days) == 29
    last = days[-1]
    assert last == '2012-02-29'
def test_gen_year_dates():
    """2015 produces 365 dates, from January 1st to December 31st."""
    days = list(gen_year_dates('2015'))
    assert len(days) == 365
    first, last = days[0], days[-1]
    assert first == '2015-01-01'
    assert last == '2015-12-31'
def test_gen_year_dates_leap():
    """Leap year 2012 produces 366 dates, ending on December 31st."""
    days = list(gen_year_dates('2012'))
    assert len(days) == 366
    last = days[-1]
    assert last == '2012-12-31'

View File

@ -0,0 +1,87 @@
"""
>>> adder = adder_coro()
>>> next(adder)
0
>>> adder.send(10)
10
>>> adder.send(20)
30
>>> adder.send(30)
60
>>> try:
... next(adder)
... except StopIteration as exc:
... result = exc.value
...
>>> result
Result(sum=60, terms=3, average=20.0)
Closing a coroutine:
>>> adder = adder_coro()
>>> next(adder)
0
>>> adder.send(1)
1
>>> adder.send(10)
11
>>> adder.close()
>>> try:
... next(adder)
... except StopIteration as exc:
... exc.value is None
...
True
"""
import sys
import collections
# Final value of adder_coro: running sum, number of terms sent, their average.
Result = collections.namedtuple('Result', 'sum terms average')


def adder_coro(initial=0):
    """Coroutine that accumulates the numbers sent to it.

    Each send(term) adds term to the running total and yields the new total;
    the first next() yields `initial`. Sending None (or calling next()) ends
    the coroutine, which returns a Result through StopIteration.value. The
    average is None when no term was received.
    """
    total = initial
    num_terms = 0
    while True:
        try:
            term = yield total
        except GeneratorExit:
            # close() was called: leave the loop and finish normally
            break
        if term is None:
            break
        total += term
        num_terms += 1
    # Guard the division: with zero terms the original raised
    # ZeroDivisionError, even out of a plain close() call.
    average = total / num_terms if num_terms else None
    return Result(total, num_terms, average)
def prompt():
    """Yield floats typed at the '+ ' prompt until a non-number is entered."""
    reading = True
    while reading:
        try:
            value = float(input('+ '))
        except ValueError:
            reading = False
        else:
            yield value
def main(get_terms):
    """Feed every term from get_terms to an adder and print its Result."""
    summing = adder_coro()
    next(summing)  # prime: advance to the first yield
    for number in get_terms:
        summing.send(number)
    try:
        summing.send(None)  # same as next(): tells the adder to finish
    except StopIteration as finished:
        outcome = finished.value
    print(outcome)


if __name__ == '__main__':
    # terms come from the command line when given, otherwise interactively
    cli_args = sys.argv[1:]
    get_terms = (float(n) for n in cli_args) if cli_args else prompt()
    main(get_terms)

View File

@ -0,0 +1,96 @@
"""
>>> adder = adder_coro()
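>>> # @coroutine already primed the generator; next(adder) here would send None and end it
>>> adder  # doctest: +ELLIPSIS
<generator object adder_coro at 0x...>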
>>> adder.send(10)
10
>>> adder.send(20)
30
>>> adder.send(30)
60
>>> try:
... next(adder)
... except StopIteration as exc:
... result = exc.value
...
>>> result
Result(sum=60, terms=3, average=20.0)
Closing a coroutine:
>>> adder = adder_coro()
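>>> # @coroutine already primed the generator; next(adder) here would send None and end it
>>> adder  # doctest: +ELLIPSIS
<generator object adder_coro at 0x...>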
>>> adder.send(1)
1
>>> adder.send(10)
11
>>> adder.close()
>>> try:
... next(adder)
... except StopIteration as exc:
... exc.value is None
...
True
"""
import sys
import collections
def coroutine(func):
    """Decorator: return a version of generator function `func` whose
    generators are already advanced to their first yield ("primed")."""
    import functools  # local import: this file does not import functools

    @functools.wraps(func)  # keep func's __name__ and __doc__ on the wrapper
    def primed_coroutine(*args, **kwargs):
        coro = func(*args, **kwargs)
        next(coro)
        return coro
    return primed_coroutine
# Final value of adder_coro: running sum, number of terms sent, their average.
Result = collections.namedtuple('Result', 'sum terms average')


@coroutine
def adder_coro(initial=0):
    """Primed coroutine that accumulates the numbers sent to it.

    Each send(term) adds term to the running total and yields the new total.
    Sending None (or calling next()) ends the coroutine, which returns a
    Result through StopIteration.value. The average is None when no term
    was received.
    """
    total = initial
    num_terms = 0
    while True:
        try:
            term = yield total
        except GeneratorExit:
            # close() was called: leave the loop and finish normally
            break
        if term is None:
            break
        total += term
        num_terms += 1
    # Guard the division: with zero terms the original raised
    # ZeroDivisionError, even out of a plain close() call.
    average = total / num_terms if num_terms else None
    return Result(total, num_terms, average)
def prompt():
    """Yield floats typed at the '+ ' prompt until a non-number is entered."""
    reading = True
    while reading:
        try:
            value = float(input('+ '))
        except ValueError:
            reading = False
        else:
            yield value
def main(get_terms):
    """Feed every term from get_terms to a primed adder and print its Result."""
    summing = adder_coro()  # already primed by the decorator
    for number in get_terms:
        summing.send(number)
    try:
        next(summing)  # same as send(None): tells the adder to finish
    except StopIteration as finished:
        outcome = finished.value
    print(outcome)


if __name__ == '__main__':
    # terms come from the command line when given, otherwise interactively
    cli_args = sys.argv[1:]
    get_terms = (float(n) for n in cli_args) if cli_args else prompt()
    main(get_terms)

39
control/adder/soma.py Normal file
View File

@ -0,0 +1,39 @@
# Python 2 compatibility: bind input to raw_input when it exists. Probing the
# name directly avoids dir(__builtins__), which is a CPython detail and is a
# plain dict (without 'raw_input' in its dir()) inside imported modules.
try:
    input = raw_input
except NameError:
    pass  # Python 3: input() already returns the raw string
def ler_num():
    """Read one line at the '+: ' prompt and return it as a float.

    Returns 0 when the text typed is not a valid number.
    """
    texto = input('+: ')
    try:
        return float(texto)
    except ValueError:
        return 0
def somadora():
    """Coroutine that sums the terms sent to it, printing running totals.

    After each send() it prints the term count and total. When it is closed
    it prints a final line, which includes the average only if at least one
    term was received.
    """
    qt_parcelas = 0
    total = 0
    try:
        while True:
            parcela = yield
            qt_parcelas += 1
            total += parcela
            print('parcelas: %d total: %d' % (qt_parcelas, total))
    finally:
        if qt_parcelas:
            print('parcelas: %d total: %d media: %d' % (qt_parcelas, total, total/qt_parcelas))
        else:
            # No term arrived: the average is undefined. Dividing here used
            # to raise ZeroDivisionError out of close().
            print('parcelas: %d total: %d' % (qt_parcelas, total))
def main():
    """Send numbers read from the user to somadora until a falsy value
    (0 or invalid input) is read, then close the coroutine."""
    acumulador = somadora()
    next(acumulador)  # prime: advance to the first yield
    valor = ler_num()
    while valor:
        acumulador.send(valor)
        valor = ler_num()
    print('Fechando corotina...')
    acumulador.close()


if __name__=='__main__':
    main()

View File

@ -0,0 +1,47 @@
# Python 2 compatibility: bind input to raw_input when it exists. Probing the
# name directly avoids dir(__builtins__), which is a CPython detail and is a
# plain dict (without 'raw_input' in its dir()) inside imported modules.
try:
    input = raw_input
except NameError:
    pass  # Python 3: input() already returns the raw string
def ler_parcela():
    """Read one line at the '+: ' prompt and return it as a float.

    Returns 0 when the text typed is not a valid number.
    """
    texto = input('+: ')
    try:
        return float(texto)
    except ValueError:
        return 0
# decorator
def coro(func):
    """Return a version of generator function `func` whose generators are
    already advanced to their first yield ("primed")."""
    import functools  # local import: this file has no import block

    @functools.wraps(func)  # keep func's __name__ and __doc__ on the wrapper
    def start(*args, **kwargs):
        g = func(*args, **kwargs)
        next(g)
        return g
    return start
@coro
def somadora():
    """Primed coroutine that sums the terms sent to it, printing running totals.

    After each send() it prints the term count and total. When it is closed
    it prints a final line, which includes the average only if at least one
    term was received.
    """
    qt_parcelas = 0
    total = 0
    try:
        while True:
            parcela = yield
            qt_parcelas += 1
            total += parcela
            print('parcelas: %d total: %d' % (qt_parcelas, total))
    finally:
        if qt_parcelas:
            print('parcelas: %d total: %d media: %d' % (qt_parcelas, total, total/qt_parcelas))
        else:
            # No term arrived: the average is undefined. Dividing here used
            # to raise ZeroDivisionError out of close().
            print('parcelas: %d total: %d' % (qt_parcelas, total))
def main():
    """Send numbers read from the user to somadora until a falsy value
    (0 or invalid input) is read, then close the coroutine."""
    acumulador = somadora()  # already primed by the decorator
    valor = ler_parcela()
    while valor:
        acumulador.send(valor)
        valor = ler_parcela()
    print('Fechando corotina...')
    acumulador.close()


if __name__=='__main__':
    main()

13
control/coro_demo.rst Normal file
View File

@ -0,0 +1,13 @@
>>> def coroutine():
... print('coroutine started')
... x = yield
... print('coroutine received: {!r}'.format(x))
...
>>> coro = coroutine()
>>> next(coro)
coroutine started
>>> coro.send(42)
coroutine received: 42
Traceback (most recent call last):
...
StopIteration

View File

@ -0,0 +1,13 @@
>>> def coroutine():
... print('coroutine started')
... x = yield
... print('coroutine received: {!r}'.format(x))
...
>>> coro = coroutine()
>>> next(coro)
coroutine started
>>> coro.send(42)
coroutine received: 42
Traceback (most recent call last):
...
StopIteration

24
control/demo_coro.py Normal file
View File

@ -0,0 +1,24 @@
>>> def coro():
... print 'iniciando corotina...'
... while True:
... x = yield
... print 'recebido: ', x
... if x == -1: break
... print 'terminando corotina.'
...
>>> c = coro()
>>> next(c)
iniciando corotina...
>>> c.send(7)
recebido: 7
>>> c.send(3)
recebido: 3
>>> c.send(10)
recebido: 10
>>> c.send(-1)
recebido: -1
terminando corotina.
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
>>>

25
control/exemplo0.py Normal file
View File

@ -0,0 +1,25 @@
def corrotina():
    """Demo coroutine: receives two values through send() and prints each.

    It yields None twice; the send() that delivers the second value ends
    the generator, so the caller sees StopIteration.
    """
    print('\t(corrotina) inciciando...')
    x = yield  # suspends until the caller sends the first value
    print('\t(corrotina) recebeu x: %r' % x)
    y = yield  # suspends until the caller sends the second value
    print('\t(corrotina) recebeu y: %r' % y)
    print('\t(corrotina) terminando.')
def principal():
    """Drive corrotina step by step, printing what the caller does."""
    print('(principal) iniciando...')
    co = corrotina()
    print('(principal) invocando next(co)...')
    next(co)
    print('(principal) invocando co.send(88)...')
    co.send(88)
    print('(principal) invocando co.send(99)...')
    try:
        co.send(99)
    except StopIteration:
        print('(principal) a corotina nao tem mais valores a produzir')
    else:
        # never reached: the send above ends the coroutine
        print('(principal) invocado co.send(99)')


principal()

27
control/exemplo1.py Normal file
View File

@ -0,0 +1,27 @@
def corrotina():
    """Demo coroutine: yields 1, then 2, printing the value sent after each.

    The send() that delivers the second value ends the generator, so the
    caller sees StopIteration.
    """
    print('\t(corrotina) inciciando...')
    x = yield 1  # hands 1 to the caller, then waits for the first value
    print('\t(corrotina) recebeu x: %r' % x)
    y = yield 2  # hands 2 to the caller, then waits for the second value
    print('\t(corrotina) recebeu y: %r' % y)
    print('\t(corrotina) terminando.')
def principal():
    """Drive corrotina step by step, printing each value it produces."""
    print('(principal) iniciando...')
    co = corrotina()
    print('(principal) invocando next(co)...')
    res = next(co)
    print('(principal) produzido por next(co): %r' % res)
    print('(principal) invocando co.send(88)...')
    res2 = co.send(88)
    print('(principal) produzido por co.send(88): %r' % res2)
    print('(principal) invocando co.send(99)...')
    try:
        res3 = co.send(99)
    except StopIteration:
        print('(principal) a corotina nao tem mais valores a produzir')
    else:
        # never reached: the send above ends the coroutine
        print('(principal) produzido por co.send(99): %r' % res3)


principal()

26
control/guido/guido0.py Normal file
View File

@ -0,0 +1,26 @@
"""
Exemplo adaptado da mensagem do Guido van Rossum em:
https://groups.google.com/forum/#!msg/python-tulip/bmphRrryuFk/aB45sEJUomYJ
http://bit.ly/yieldfrom
>>> principal(ger1())
OK
42
Visualização no PythonTutor: http://goo.gl/FQWq2F
"""
def ger1():
    """Yield 'OK', print the value sent in reply, then yield once more."""
    recebido = yield 'OK'
    print(recebido)
    yield  # extra yield so the caller's send() does not end in StopIteration
def principal(g):
    """Print the first value produced by generator g, then send it 42."""
    primeiro = next(g)
    print(primeiro)
    g.send(42)
# self-test: run doctest.testmod() whenever this module is executed
import doctest
doctest.testmod()

29
control/guido/guido1.py Normal file
View File

@ -0,0 +1,29 @@
"""
Exemplo adaptado da mensagem do Guido van Rossum em:
https://groups.google.com/forum/#!msg/python-tulip/bmphRrryuFk/aB45sEJUomYJ
http://bit.ly/yieldfrom
>>> principal(ger2())
OK
42
Visualização no PythonTutor: http://goo.gl/pWrlkm
"""
def ger1():
    """Yield 'OK', print the value sent in reply, then yield once more."""
    recebido = yield 'OK'
    print(recebido)
    yield  # extra yield so the caller's send() does not end in StopIteration
def ger2():
    """Delegate to ger1 with `yield from`."""
    # yield from forwards values passed to send() into ger1, which is why
    # the module doctest prints 42
    yield from ger1()
def principal(g):
    """Print the first value produced by generator g, then send it 42."""
    primeiro = next(g)
    print(primeiro)
    g.send(42)
# self-test: run doctest.testmod() whenever this module is executed
import doctest
doctest.testmod()

30
control/guido/guido1b.py Normal file
View File

@ -0,0 +1,30 @@
"""
Exemplo adaptado da mensagem do Guido van Rossum em:
https://groups.google.com/forum/#!msg/python-tulip/bmphRrryuFk/aB45sEJUomYJ
http://bit.ly/yieldfrom
>>> principal(ger2())
OK
None
Visualização no PythonTutor: http://goo.gl/61CUcA
"""
def ger1():
    """Yield 'OK', print the value sent in reply, then yield once more."""
    recebido = yield 'OK'
    print(recebido)
    yield  # extra yield so the caller's send() does not end in StopIteration
def ger2():
    """Re-yield every value produced by ger1 using a plain for loop.

    Values sent to this generator are not forwarded to ger1: the loop
    advances it with next(), so ger1 receives None (see the module doctest).
    """
    for item in ger1():
        yield item
def principal(g):
    """Print the first value produced by generator g, then send it 42."""
    primeiro = next(g)
    print(primeiro)
    g.send(42)
# self-test: run doctest.testmod() whenever this module is executed
import doctest
doctest.testmod()

31
control/guido/guido2.py Normal file
View File

@ -0,0 +1,31 @@
"""
Exemplo adaptado da mensagem do Guido van Rossum em:
https://groups.google.com/forum/#!msg/python-tulip/bmphRrryuFk/aB45sEJUomYJ
http://bit.ly/yieldfrom
>>> principal_susto(ger1())
OK
Bu!
Visualização no PythonTutor: http://goo.gl/m6p2Bc
"""
def ger1():
    """Yield 'OK', then print either the value sent in reply or the
    RuntimeError thrown in, and yield once more."""
    try:
        val = yield 'OK'
    except RuntimeError as exc:
        saida = exc
    else:
        saida = val
    print(saida)
    yield  # extra yield so the caller's send()/throw() does not end in StopIteration
def principal_susto(g):
    """Print the first value produced by generator g, then throw
    RuntimeError('Bu!') into it."""
    primeiro = next(g)
    print(primeiro)
    g.throw(RuntimeError('Bu!'))
# self-test: run doctest.testmod() whenever this module is executed
import doctest
doctest.testmod()

35
control/guido/guido3.py Normal file
View File

@ -0,0 +1,35 @@
"""
Exemplo adaptado da mensagem do Guido van Rossum em:
https://groups.google.com/forum/#!msg/python-tulip/bmphRrryuFk/aB45sEJUomYJ
http://bit.ly/yieldfrom
>>> principal_susto(ger2())
OK
Bu!
Visualização no PythonTutor: http://goo.gl/QXzQHS
"""
def ger1():
    """Yield 'OK', then print either the value sent in reply or the
    RuntimeError thrown in, and yield once more."""
    try:
        val = yield 'OK'
    except RuntimeError as exc:
        saida = exc
    else:
        saida = val
    print(saida)
    yield  # extra yield so the caller's send()/throw() does not end in StopIteration
def ger2():
    """Delegate to ger1 with `yield from`."""
    # yield from forwards exceptions passed to throw() into ger1, which is
    # why the module doctest prints 'Bu!'
    yield from ger1()
def principal_susto(g):
    """Print the first value produced by generator g, then throw
    RuntimeError('Bu!') into it."""
    primeiro = next(g)
    print(primeiro)
    g.throw(RuntimeError('Bu!'))
# self-test: run doctest.testmod() whenever this module is executed
import doctest
doctest.testmod()

20
control/http_cli0.py Normal file
View File

@ -0,0 +1,20 @@
# adaptado de:
# https://github.com/feihong/tulip-talk/blob/master/examples/2-tulip-download.py
import asyncio
import aiohttp
@asyncio.coroutine
def download(url):
    """Request `url` with aiohttp, print the response items and body size.

    NOTE(review): this is a generator-based coroutine (@asyncio.coroutine
    with `yield from`); confirm the asyncio and aiohttp versions in use
    still support this style before running it.
    """
    response = yield from aiohttp.request('GET', url)
    # presumably header name/value pairs -- confirm against the aiohttp
    # version in use; each value is cut to its first 80 characters
    for k, v in response.items():
        print('{}: {}'.format(k, v[:80]))
    data = yield from response.read()
    print('\nReceived {} bytes.\n'.format(len(data)))
if __name__ == '__main__':
    # run a single download to completion on the default event loop
    url = 'https://www.cia.gov/library/publications/the-world-factbook/geos/br.html'
    asyncio.get_event_loop().run_until_complete(download(url))

90
control/mirror.py Normal file
View File

@ -0,0 +1,90 @@
"""
A "mirroring" ``stdout`` context.
While active, the context manager reverses text output to
``stdout``::
# BEGIN MIRROR_DEMO_1
>>> from mirror import LookingGlass
>>> with LookingGlass() as what: # <1>
... print('Alice, Kitty and Snowdrop') # <2>
... print(what)
...
pordwonS dna yttiK ,ecilA # <3>
YKCOWREBBAJ
>>> what # <4>
'JABBERWOCKY'
# END MIRROR_DEMO_1
This exposes the context manager operation::
# BEGIN MIRROR_DEMO_2
>>> from mirror import LookingGlass
>>> manager = LookingGlass() # <1>
>>> manager
<mirror.LookingGlass object at 0x2a578ac>
>>> monster = manager.__enter__() # <2>
>>> monster == 'JABBERWOCKY' # <3>
eurT
>>> monster
'YKCOWREBBAJ'
>>> manager
>ca875a2x0 ta tcejbo ssalGgnikooL.rorrim<
>>> manager.__exit__(None, None, None) # <4>
>>> monster
'JABBERWOCKY'
# END MIRROR_DEMO_2
The context manager can handle and "swallow" exceptions.
# BEGIN MIRROR_DEMO_3
>>> from mirror import LookingGlass
>>> with LookingGlass():
... print('Humpty Dumpty')
... x = 1/0 # <1>
... print('END') # <2>
...
ytpmuD ytpmuH
Please DO NOT divide by zero!
>>> with LookingGlass():
... print('Humpty Dumpty')
... x = no_such_name # <1>
... print('END') # <2>
...
Traceback (most recent call last):
...
NameError: name 'no_such_name' is not defined
# END MIRROR_DEMO_3
"""
# BEGIN MIRROR_EX
class LookingGlass:
    """Context manager that reverses text written to sys.stdout while active."""

    def __enter__(self):  # <1>
        """Replace sys.stdout.write with reverse_write and return 'JABBERWOCKY'."""
        import sys
        self.original_write = sys.stdout.write  # <2>
        sys.stdout.write = self.reverse_write  # <3>
        return 'JABBERWOCKY'  # <4>

    def reverse_write(self, text):  # <5>
        """Write `text` reversed, using the saved original write method."""
        self.original_write(text[::-1])

    def __exit__(self, exc_type, exc_value, traceback):  # <6>
        """Restore sys.stdout.write; swallow ZeroDivisionError after printing
        a message. Any other exception propagates (implicit None return)."""
        import sys  # <7>
        sys.stdout.write = self.original_write  # <8>
        if exc_type is ZeroDivisionError:  # <9>
            print('Please DO NOT divide by zero!')
            return True  # <10>
        # <11>
# END MIRROR_EX

64
control/mirror_gen.py Normal file
View File

@ -0,0 +1,64 @@
"""
A "mirroring" ``stdout`` context manager.
While active, the context manager reverses text output to
``stdout``::
# BEGIN MIRROR_GEN_DEMO_1
>>> from mirror_gen import looking_glass
>>> with looking_glass() as what: # <1>
... print('Alice, Kitty and Snowdrop')
... print(what)
...
pordwonS dna yttiK ,ecilA
YKCOWREBBAJ
>>> what
'JABBERWOCKY'
# END MIRROR_GEN_DEMO_1
This exposes the context manager operation::
# BEGIN MIRROR_GEN_DEMO_2
>>> from mirror_gen import looking_glass
>>> manager = looking_glass() # <1>
>>> manager # doctest: +ELLIPSIS
<contextlib._GeneratorContextManager object at 0x...>
>>> monster = manager.__enter__() # <2>
>>> monster == 'JABBERWOCKY' # <3>
eurT
>>> monster
'YKCOWREBBAJ'
>>> manager # doctest: +ELLIPSIS
>...x0 ta tcejbo reganaMtxetnoCrotareneG_.biltxetnoc<
>>> manager.__exit__(None, None, None) # <4>
>>> monster
'JABBERWOCKY'
# END MIRROR_GEN_DEMO_2
"""
# BEGIN MIRROR_GEN_EX
import contextlib
@contextlib.contextmanager  # <1>
def looking_glass():
    """Context manager that reverses text written to sys.stdout while active.

    Yields the string 'JABBERWOCKY' as the `as` target. The original write
    method is restored on exit even when the with-body raises; the exception
    itself still propagates.
    """
    import sys
    original_write = sys.stdout.write  # <2>

    def reverse_write(text):  # <3>
        original_write(text[::-1])

    sys.stdout.write = reverse_write  # <4>
    try:
        yield 'JABBERWOCKY'  # <5>
    finally:
        # Without try/finally an exception in the with-body skipped this
        # line and left stdout mirrored for the rest of the program.
        sys.stdout.write = original_write  # <6>
# END MIRROR_GEN_EX

96
control/mirror_gen_exc.py Normal file
View File

@ -0,0 +1,96 @@
"""
A "mirroring" ``stdout`` context manager.
While active, the context manager reverses text output to
``stdout``::
# BEGIN MIRROR_GEN_DEMO_1
>>> from mirror_gen_exc import looking_glass
>>> with looking_glass() as what: # <1>
... print('Alice, Kitty and Snowdrop')
... print(what)
...
pordwonS dna yttiK ,ecilA
YKCOWREBBAJ
>>> what
'JABBERWOCKY'
# END MIRROR_GEN_DEMO_1
This exposes the context manager operation::
# BEGIN MIRROR_GEN_DEMO_2
>>> from mirror_gen_exc import looking_glass
>>> manager = looking_glass() # <1>
>>> manager # doctest: +ELLIPSIS
<contextlib._GeneratorContextManager object at 0x...>
>>> monster = manager.__enter__() # <2>
>>> monster == 'JABBERWOCKY' # <3>
eurT
>>> monster
'YKCOWREBBAJ'
>>> manager # doctest: +ELLIPSIS
>...x0 ta tcejbo reganaMtxetnoCrotareneG_.biltxetnoc<
>>> manager.__exit__(None, None, None) # <4>
>>> monster
'JABBERWOCKY'
# END MIRROR_GEN_DEMO_2
The context manager can handle and "swallow" exceptions.
# BEGIN MIRROR_GEN_DEMO_3
>>> from mirror_gen_exc import looking_glass
>>> with looking_glass():
... print('Humpty Dumpty')
... x = 1/0 # <1>
... print('END') # <2>
...
ytpmuD ytpmuH
Please DO NOT divide by zero!
>>> with looking_glass():
... print('Humpty Dumpty')
... x = no_such_name # <1>
... print('END') # <2>
...
Traceback (most recent call last):
...
NameError: name 'no_such_name' is not defined
# END MIRROR_GEN_DEMO_3
"""
# BEGIN MIRROR_GEN_EX
import contextlib
@contextlib.contextmanager
def looking_glass():
    """Context manager that reverses text written to sys.stdout while active.

    Yields the string 'JABBERWOCKY' as the `as` target. A ZeroDivisionError
    raised in the with-body is swallowed and a message is printed once
    output is back to normal; any other exception propagates. The original
    write method is always restored.
    """
    import sys
    stdout_write = sys.stdout.write

    def mirrored(text):
        stdout_write(text[::-1])

    sys.stdout.write = mirrored
    complaint = ''
    try:
        yield 'JABBERWOCKY'
    except ZeroDivisionError:
        complaint = 'Please DO NOT divide by zero!'
    finally:
        # restore first so the message below is not printed reversed
        sys.stdout.write = stdout_write
        if complaint:
            print(complaint)
# END MIRROR_GEN_EX

12
control/referencias.txt Normal file
View File

@ -0,0 +1,12 @@
What's New in Python 2.5 - PEP 342: New Generator Features
http://docs.python.org/release/2.5/whatsnew/pep-342.html
PEP 342 -- Coroutines via Enhanced Generators
http://www.python.org/dev/peps/pep-0342/
PEP 380 -- Syntax for Delegating to a Subgenerator
http://www.python.org/dev/peps/pep-0380/
Coroutines For the Working Python Developer
http://sdiehl.github.io/coroutine-tutorial/