renamed 22-async/

This commit is contained in:
Luciano Ramalho
2021-03-11 16:11:39 -03:00
parent 7c88948b22
commit e1cd63aa04
15 changed files with 8 additions and 8 deletions

View File

@@ -0,0 +1,29 @@
#!/usr/bin/env python3
import asyncio
import socket
from keyword import kwlist
MAX_KEYWORD_LEN = 4 # <1>
async def probe(domain: str) -> tuple[str, bool]: # <2>
loop = asyncio.get_running_loop() # <3>
try:
await loop.getaddrinfo(domain, None) # <4>
except socket.gaierror:
return (domain, False)
return (domain, True)
async def main() -> None: # <5>
names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN) # <6>
domains = (f'{name}.dev'.lower() for name in names) # <7>
coros = [probe(domain) for domain in domains] # <8>
for coro in asyncio.as_completed(coros): # <9>
domain, found = await coro # <10>
mark = '+' if found else ' '
print(f'{mark} {domain}')
if __name__ == '__main__':
asyncio.run(main()) # <11>

View File

@@ -0,0 +1,24 @@
#!/usr/bin/env python3
import asyncio
import sys
from keyword import kwlist
from domainlib import multi_probe
async def main(tld: str) -> None:
tld = tld.strip('.')
names = (kw for kw in kwlist if len(kw) <= 4) # <1>
domains = (f'{name}.{tld}'.lower() for name in names) # <2>
print('FOUND\t\tNOT FOUND') # <3>
print('=====\t\t=========')
async for domain, found in multi_probe(domains): # <4>
indent = '' if found else '\t\t' # <5>
print(f'{indent}{domain}')
if __name__ == '__main__':
if len(sys.argv) == 2:
asyncio.run(main(sys.argv[1])) # <6>
else:
print('Please provide a TLD.', f'Example: {sys.argv[0]} COM.BR')

View File

@@ -0,0 +1,30 @@
import asyncio
import socket
from collections.abc import Iterable, AsyncIterator
from typing import NamedTuple, Optional
class Result(NamedTuple): # <1>
domain: str
found: bool
OptionalLoop = Optional[asyncio.AbstractEventLoop] # <2>
async def probe(domain: str, loop: OptionalLoop = None) -> Result: # <3>
if loop is None:
loop = asyncio.get_running_loop()
try:
await loop.getaddrinfo(domain, None)
except socket.gaierror:
return Result(domain, False)
return Result(domain, True)
async def multi_probe(domains: Iterable[str]) -> AsyncIterator[Result]: # <4>
loop = asyncio.get_running_loop()
coros = [probe(domain, loop) for domain in domains] # <5>
for coro in asyncio.as_completed(coros): # <6>
result = await coro # <7>
yield result # <8>

View File

@@ -0,0 +1,30 @@
#!/usr/bin/env python3
from curio import run, TaskGroup
from curio.socket import getaddrinfo, gaierror
from keyword import kwlist
MAX_KEYWORD_LEN = 4 # <1>
async def probe(domain: str) -> tuple[str, bool]: # <2>
try:
await getaddrinfo(domain, None) # <4>
except gaierror:
return (domain, False)
return (domain, True)
async def main() -> None: # <5>
names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN) # <6>
domains = (f'{name}.dev'.lower() for name in names) # <7>
async with TaskGroup() as group:
for domain in domains:
await group.spawn(probe, domain)
async for task in group: # <9>
domain, found = task.result # <10>
mark = '+' if found else ' '
print(f'{mark} {domain}')
if __name__ == '__main__':
run(main()) # <11>

View File

@@ -0,0 +1,24 @@
#!/usr/bin/env python3
import curio
import sys
from keyword import kwlist
from domainlib import multi_probe
async def main(tld: str) -> None:
tld = tld.strip('.')
names = (kw for kw in kwlist if len(kw) <= 4)
domains = (f'{name}.{tld}'.lower() for name in names)
print('FOUND\t\tNOT FOUND')
print('=====\t\t=========')
async for domain, found in multi_probe(domains):
indent = '' if found else '\t\t'
print(f'{indent}{domain}')
if __name__ == '__main__':
if len(sys.argv) == 2:
curio.run(main(sys.argv[1]))
else:
print('Please provide a TLD.', f'Example: {sys.argv[0]} COM.BR')

View File

@@ -0,0 +1,25 @@
from curio import TaskGroup
from curio.socket import getaddrinfo, gaierror
from collections.abc import Iterable, AsyncIterator
from typing import NamedTuple
class Result(NamedTuple):
domain: str
found: bool
async def probe(domain: str) -> Result:
try:
await getaddrinfo(domain, None)
except gaierror:
return Result(domain, False)
return Result(domain, True)
async def multi_probe(domains: Iterable[str]) -> AsyncIterator[Result]:
async with TaskGroup() as group:
for domain in domains:
await group.spawn(probe, domain)
async for task in group:
yield task.result

View File

@@ -0,0 +1 @@
curio==1.5

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,89 @@
#!/usr/bin/env python
"""
Class ``InvertedIndex`` builds an inverted index mapping each word to
the set of Unicode characters which contain that word in their names.
Optional arguments to the constructor are ``first`` and ``last+1`` character
codes to index, to make testing easier.
In the example below, only the ASCII range was indexed::
>>> idx = InvertedIndex(32, 128)
>>> sorted(idx.entries['SIGN'])
['#', '$', '%', '+', '<', '=', '>']
>>> sorted(idx.entries['DIGIT'])
['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
>>> idx.entries['DIGIT'] & idx.entries['EIGHT']
{'8'}
>>> idx.search('digit')
['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
>>> idx.search('eight digit')
['8']
>>> idx.search('a letter')
['A', 'a']
>>> idx.search('a letter capital')
['A']
>>> idx.search('borogove')
[]
"""
import sys
import unicodedata
from collections import defaultdict
from collections.abc import Iterator
STOP_CODE: int = sys.maxunicode + 1
Char = str
Index = defaultdict[str, set[Char]]
def tokenize(text: str) -> Iterator[str]:
"""return iterator of uppercased words"""
for word in text.upper().replace('-', ' ').split():
yield word
class InvertedIndex:
entries: Index
def __init__(self, start: int = 32, stop: int = STOP_CODE):
entries: Index = defaultdict(set)
for char in (chr(i) for i in range(start, stop)):
name = unicodedata.name(char, '')
if name:
for word in tokenize(name):
entries[word].add(char)
self.entries = entries
def search(self, query: str) -> list[Char]:
if words := list(tokenize(query)):
first = self.entries[words[0]]
result = first.intersection(*(self.entries[w] for w in words[1:]))
return sorted(result)
else:
return []
def format_results(chars: list[Char]) -> Iterator[str]:
for char in chars:
name = unicodedata.name(char)
code = ord(char)
yield f'U+{code:04X}\t{char}\t{name}'
def main(words: list[str]) -> None:
if not words:
print('Please give one or more words to search.')
sys.exit()
index = InvertedIndex()
chars = index.search(' '.join(words))
for line in format_results(chars):
print(line)
print('' * 66, f'{len(chars)} found')
if __name__ == '__main__':
main(sys.argv[1:])

View File

@@ -0,0 +1,6 @@
click==7.1.2
fastapi==0.63.0
h11==0.12.0
pydantic==1.7.3
starlette==0.13.6
uvicorn==0.13.4

View File

@@ -0,0 +1,83 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Mojifinder</title>
<style>
body {font-family: "Lucida Sans Unicode", "Lucida Grande", sans-serif;}
table {font-family: "Lucida Console", "Monaco", monospace;
text-align: left; min-width: 300px}
td.code {min-width: 40px; text-align: right;}
td.char {min-width: 50px; text-align: center;}
caption {background: lightgray; }
</style>
<script>
"use strict";
function appendCell(row, text, class_) {
let cell = document.createElement('td');
cell.appendChild(document.createTextNode(text));
if (class_ !== undefined) {
cell.setAttribute('class', class_);
}
row.appendChild(cell);
}
function fillTable(results) {
const table = document.querySelector('table');
while (table.lastElementChild.tagName === 'TR') {
table.removeChild(table.lastElementChild);
}
let count = 0;
results.forEach((item) => {
let row = document.createElement('tr');
let code = item.char.codePointAt(0);
let uCode = 'U+' + code.toString(16).toUpperCase().padStart(4, '0');
appendCell(row, uCode, 'code');
appendCell(row, item.char, 'char');
appendCell(row, item.name);
table.appendChild(row);
count++;
});
let plural = "s";
if (count===1) plural = "";
let msg = `${count} character${plural} found`;
document.querySelector('caption').textContent = msg;
}
async function fetchResults(query) {
let url = location.href.replace(location.search, '');
const response = await fetch(`${url}search?q=${query}`);
if (response.ok) {
return response.json();
} else {
throw new Error(`HTTP error! status: ${response.status}`);
}
}
function updateTable(event) {
const input = document.getElementById('query');
fetchResults(input.value)
.then(fillTable)
.catch(error => console.log(error));
}
window.addEventListener('DOMContentLoaded', (event) => {
const input = document.getElementById('query');
input.addEventListener('change', updateTable);
});
</script>
</head>
<body>
<div>
<input id="query" type="search" name="q" value="">
<button onClick="updateTable()">Search</button>
</div>
<table>
<caption></caption>
</table>
</body>
</html>

View File

@@ -0,0 +1,72 @@
#!/usr/bin/env python3
# tag::TCP_MOJIFINDER_TOP[]
import sys
import asyncio
import functools
from charindex import InvertedIndex, format_results # <1>
CRLF = b'\r\n'
PROMPT = b'?> '
async def finder(index: InvertedIndex, # <2>
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
client = writer.get_extra_info('peername') # <3>
while True: # <4>
writer.write(PROMPT) # can't await! # <5>
await writer.drain() # must await! # <6>
data = await reader.readline() # <7>
try:
query = data.decode().strip() # <8>
except UnicodeDecodeError: # <9>
query = '\x00'
print(f' From {client}: {query!r}') # <10>
if query:
if ord(query[:1]) < 32: # <11>
break
results = await search(query, index, writer) # <12>
print(f' To {client}: {results} results.') # <13>
writer.close() # <14>
await writer.wait_closed() # <15>
print(f'Close {client}.') # <16>
# end::TCP_MOJIFINDER_TOP[]
# tag::TCP_MOJIFINDER_SEARCH[]
async def search(query: str, # <1>
index: InvertedIndex,
writer: asyncio.StreamWriter) -> int:
chars = index.search(query) # <2>
lines = (line.encode() + CRLF for line # <3>
in format_results(chars))
writer.writelines(lines) # <4>
await writer.drain() # <5>
status_line = f'{"" * 66} {len(chars)} found' # <6>
writer.write(status_line.encode() + CRLF)
await writer.drain()
return len(chars)
# end::TCP_MOJIFINDER_SEARCH[]
# tag::TCP_MOJIFINDER_MAIN[]
async def supervisor(index: InvertedIndex, host: str, port: int):
server = await asyncio.start_server( # <1>
functools.partial(finder, index), # <2>
host, port) # <3>
addr = server.sockets[0].getsockname() # type: ignore # <4>
print(f'Serving on {addr}. Hit CTRL-C to stop.')
await server.serve_forever() # <5>
def main(host: str = '127.0.0.1', port_arg: str = '2323'):
port = int(port_arg)
print('Building index.')
index = InvertedIndex() # <6>
try:
asyncio.run(supervisor(index, host, port)) # <7>
except KeyboardInterrupt: # <8>
print('\nServer shut down.')
if __name__ == '__main__':
main(*sys.argv[1:])
# end::TCP_MOJIFINDER_MAIN[]

View File

@@ -0,0 +1,36 @@
import pathlib
from unicodedata import name
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from charindex import InvertedIndex
app = FastAPI( # <1>
title='Mojifinder Web',
description='Search for Unicode characters by name.',
)
class CharName(BaseModel): # <2>
char: str
name: str
def init(app): # <3>
app.state.index = InvertedIndex()
static = pathlib.Path(__file__).parent.absolute() / 'static'
with open(static / 'form.html') as fp:
app.state.form = fp.read()
init(app) # <4>
@app.get('/search', response_model=list[CharName]) # <5>
async def search(q: str): # <6>
chars = app.state.index.search(q)
return ({'char': c, 'name': name(c)} for c in chars) # <7>
@app.get('/', # <8>
response_class=HTMLResponse,
include_in_schema=False)
def form():
return app.state.form

View File

@@ -0,0 +1,37 @@
#!/usr/bin/env python3
import json
import unicodedata
from bottle import route, request, run, static_file
from charindex import InvertedIndex
index = {}
@route('/')
def form():
return static_file('form.html', root = 'static/')
@route('/search')
def search():
query = request.query['q']
chars = index.search(query)
results = []
for char in chars:
name = unicodedata.name(char)
results.append({'char': char, 'name': name})
return json.dumps(results).encode('UTF-8')
def main(port):
global index
index = InvertedIndex()
host = 'localhost'
run(host='localhost', port=port, debug=True)
if __name__ == '__main__':
main(8000)