Initial commit

This commit is contained in:
David Beazley
2023-07-16 20:21:00 -05:00
parent 82e815fab2
commit 7d4b30154a
259 changed files with 600233 additions and 2 deletions

15
Solutions/1_1/art.py Normal file
View File

@@ -0,0 +1,15 @@
# art.py
import sys
import random
chars = '\|/'
def draw(rows, columns):
for r in range(rows):
print(''.join(random.choice(chars) for _ in range(columns)))
if __name__ == '__main__':
if len(sys.argv) != 3:
raise SystemExit("Usage: art.py rows columns")
draw(int(sys.argv[1]), int(sys.argv[2]))

12
Solutions/1_3/pcost.py Normal file
View File

@@ -0,0 +1,12 @@
# pcost.py
total_cost = 0.0
with open('../../Data/portfolio.dat', 'r') as f:
for line in f:
fields = line.split()
nshares = int(fields[1])
price = float(fields[2])
total_cost = total_cost + nshares * price
print(total_cost)

20
Solutions/1_4/pcost.py Normal file
View File

@@ -0,0 +1,20 @@
# pcost.py
def portfolio_cost(filename):
total_cost = 0.0
with open(filename) as f:
for line in f:
fields = line.split()
try:
nshares = int(fields[1])
price = float(fields[2])
total_cost = total_cost + nshares*price
# This catches errors in int() and float() conversions above
except ValueError as e:
print("Couldn't parse:", repr(line))
print("Reason:", e)
return total_cost
print(portfolio_cost('../../Data/portfolio3.dat'))

9
Solutions/1_5/stock.py Normal file
View File

@@ -0,0 +1,9 @@
# stock.py
class Stock:
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
def cost(self):
return self.shares * self.price

22
Solutions/1_6/pcost.py Normal file
View File

@@ -0,0 +1,22 @@
# pcost.py
def portfolio_cost(filename):
total_cost = 0.0
with open(filename) as f:
for line in f:
fields = line.split()
try:
nshares = int(fields[1])
price = float(fields[2])
total_cost = total_cost + nshares * price
# This catches errors in int() and float() conversions above
except ValueError as e:
print("Couldn't parse:", line)
print("Reason:", e)
return total_cost
if __name__ == '__main__':
print(portfolio_cost('../../Data/portfolio.dat'))

View File

@@ -0,0 +1,80 @@
# readrides.py
import csv
def read_rides_as_tuples(filename):
'''
Read the bus ride data as a list of tuples
'''
records = []
with open(filename) as f:
rows = csv.reader(f)
headings = next(rows) # Skip headers
for row in rows:
route = row[0]
date = row[1]
daytype = row[2]
rides = int(row[3])
record = (route, date, daytype, rides)
records.append(record)
return records
def read_rides_as_dicts(filename):
'''
Read the bus ride data as a list of dicts
'''
records = []
with open(filename) as f:
rows = csv.reader(f)
headings = next(rows) # Skip headers
for row in rows:
route = row[0]
date = row[1]
daytype = row[2]
rides = int(row[3])
record = {
'route': route,
'date': date,
'daytype': daytype,
'rides' : rides
}
records.append(record)
return records
class Row:
# Uncomment to see effect of slots
# __slots__ = ('route', 'date', 'daytype', 'rides')
def __init__(self, route, date, daytype, rides):
self.route = route
self.date = date
self.daytype = daytype
self.rides = rides
# Uncomment to use a namedtuple instead
#from collections import namedtuple
#Row = namedtuple('Row',('route','date','daytype','rides'))
def read_rides_as_instances(filename):
'''
Read the bus ride data as a list of instances
'''
records = []
with open(filename) as f:
rows = csv.reader(f)
headings = next(rows) # Skip headers
for row in rows:
route = row[0]
date = row[1]
daytype = row[2]
rides = int(row[3])
record = Row(route, date, daytype, rides)
records.append(record)
return records
if __name__ == '__main__':
import tracemalloc
tracemalloc.start()
read_rides = read_rides_as_tuples # Change to as_dicts, as_instances, etc.
rides = read_rides("../../Data/ctabus.csv")
print('Memory Use: Current %d, Peak %d' % tracemalloc.get_traced_memory())

55
Solutions/2_2/cta.py Normal file
View File

@@ -0,0 +1,55 @@
# cta.py
from collections import defaultdict, Counter
import tracemalloc
import readrides
tracemalloc.start()
rows = readrides.read_rides_as_dicts('../../Data/ctabus.csv')
# --------------------------------------------------
# Question 1: How many bus routes are in Chicago?
# Solution: Use a set to get unique values.
routes = set()
for row in rows:
routes.add(row['route'])
print(len(routes), 'routes')
# --------------------------------------------------
# Question 2: How many people rode route 22 on February 2, 2011?
# Solution: Make dictionary with composite keys
by_route_date = { }
for row in rows:
by_route_date[row['route'], row['date']] = row['rides']
print('Rides on Route 22, February 2, 2011:', by_route_date['22','02/02/2011'])
# --------------------------------------------------
# Question 3: Total number of rides per route
# Solution: Use a counter to tabulate things
rides_per_route = Counter()
for row in rows:
rides_per_route[row['route']] += row['rides']
# Make a table showing the routes and a count ranked by popularity
for route, count in rides_per_route.most_common():
print('%5s %10d' % (route, count))
# --------------------------------------------------
# Question 4: Routes with greatest increase in ridership 2001 - 2011
# Solution: Counters embedded inside a defaultdict
rides_by_year = defaultdict(Counter)
for row in rows:
year = row['date'].split('/')[2]
rides_by_year[year][row['route']] += row['rides']
diffs = rides_by_year['2011'] - rides_by_year['2001']
for route, diff in diffs.most_common(5):
print(route, diff)
# ---- Memory use
print('Memory Use: Current %d, Peak %d' % tracemalloc.get_traced_memory())

18
Solutions/2_2/readport.py Normal file
View File

@@ -0,0 +1,18 @@
# readport.py
import csv
# A function that reads a file into a list of dicts
def read_portfolio(filename):
portfolio = []
with open(filename) as f:
rows = csv.reader(f)
headers = next(rows)
for row in rows:
record = {
'name' : row[0],
'shares' : int(row[1]),
'price' : float(row[2])
}
portfolio.append(record)
return portfolio

View File

@@ -0,0 +1,80 @@
# readrides.py
import csv
def read_rides_as_tuples(filename):
'''
Read the bus ride data as a list of tuples
'''
records = []
with open(filename) as f:
rows = csv.reader(f)
headings = next(rows) # Skip headers
for row in rows:
route = row[0]
date = row[1]
daytype = row[2]
rides = int(row[3])
record = (route, date, daytype, rides)
records.append(record)
return records
def read_rides_as_dicts(filename):
'''
Read the bus ride data as a list of dicts
'''
records = []
with open(filename) as f:
rows = csv.reader(f)
headings = next(rows) # Skip headers
for row in rows:
route = row[0]
date = row[1]
daytype = row[2]
rides = int(row[3])
record = {
'route': route,
'date': date,
'daytype': daytype,
'rides' : rides
}
records.append(record)
return records
class Row:
# Uncomment to see effect of slots
# __slots__ = ('route', 'date', 'daytype', 'rides')
def __init__(self, route, date, daytype, rides):
self.route = route
self.date = date
self.daytype = daytype
self.rides = rides
# Uncomment to use a namedtuple instead
#from collections import namedtuple
#Row = namedtuple('Row',('route','date','daytype','rides'))
def read_rides_as_instances(filename):
'''
Read the bus ride data as a list of instances
'''
records = []
with open(filename) as f:
rows = csv.reader(f)
headings = next(rows) # Skip headers
for row in rows:
route = row[0]
date = row[1]
daytype = row[2]
rides = int(row[3])
record = Row(route, date, daytype, rides)
records.append(record)
return records
if __name__ == '__main__':
import tracemalloc
tracemalloc.start()
read_rides = read_rides_as_tuples # Change to as_dicts, as_instances, etc.
rides = read_rides("../../Data/ctabus.csv")
print('Memory Use: Current %d, Peak %d' % tracemalloc.get_traced_memory())

74
Solutions/2_4/mutint.py Normal file
View File

@@ -0,0 +1,74 @@
# mutint.py
#
# Mutable integers
from functools import total_ordering
@total_ordering
class MutInt:
__slots__ = ['value']
def __init__(self, value):
self.value = value
def __str__(self):
return str(self.value)
def __repr__(self):
return f'MutInt({self.value!r})'
def __format__(self, fmt):
return format(self.value, fmt)
# Implement the "+" operator. Forward operands (MutInt + other)
def __add__(self, other):
if isinstance(other, MutInt):
return MutInt(self.value + other.value)
elif isinstance(other, int):
return MutInt(self.value + other)
else:
return NotImplemented
# Support for reversed operands (other + MutInt)
__radd__ = __add__
# Support for in-place update (MutInt += other)
def __iadd__(self, other):
if isinstance(other, MutInt):
self.value += other.value
return self
elif isinstance(other, int):
self.value += other
return self
else:
return NotImplemented
# Support for equality testing
def __eq__(self, other):
if isinstance(other, MutInt):
return self.value == other.value
elif isinstance(other, int):
return self.value == other
else:
return NotImplemented
# One relation is needed for @total_ordering decorator. It fills in others
def __lt__(self, other):
if isinstance(other, MutInt):
return self.value < other.value
elif isinstance(other, int):
return self.value < other
else:
return NotImplemented
# Conversions to int() and float()
def __int__(self):
return int(self.value)
def __float__(self):
return float(self.value)
# Support for indexing s[MutInt]
__index__ = __int__

55
Solutions/2_5/cta.py Normal file
View File

@@ -0,0 +1,55 @@
# cta.py
from collections import defaultdict, Counter
import tracemalloc
import readrides
tracemalloc.start()
rows = readrides.read_rides_as_dicts('../../Data/ctabus.csv')
# --------------------------------------------------
# Question 1: How many bus routes are in Chicago?
# Solution: Use a set to get unique values.
routes = set()
for row in rows:
routes.add(row['route'])
print(len(routes), 'routes')
# --------------------------------------------------
# Question 2: How many people rode route 22 on February 2, 2011?
# Solution: Make dictionary with composite keys
by_route_date = { }
for row in rows:
by_route_date[row['route'], row['date']] = row['rides']
print('Rides on Route 22, February 2, 2011:', by_route_date['22','02/02/2011'])
# --------------------------------------------------
# Question 3: Total number of rides per route
# Solution: Use a counter to tabulate things
rides_per_route = Counter()
for row in rows:
rides_per_route[row['route']] += row['rides']
# Make a table showing the routes and a count ranked by popularity
for route, count in rides_per_route.most_common():
print('%5s %10d' % (route, count))
# --------------------------------------------------
# Question 4: Routes with greatest increase in ridership 2001 - 2011
# Solution: Counters embedded inside a defaultdict
rides_by_year = defaultdict(Counter)
for row in rows:
year = row['date'].split('/')[2]
rides_by_year[year][row['route']] += row['rides']
diffs = rides_by_year['2011'] - rides_by_year['2001']
for route, diff in diffs.most_common(5):
print(route, diff)
# ---- Memory use
print('Memory Use: Current %d, Peak %d' % tracemalloc.get_traced_memory())

144
Solutions/2_5/readrides.py Normal file
View File

@@ -0,0 +1,144 @@
# readrides.py
import csv
def read_rides_as_tuples(filename):
'''
Read the bus ride data as a list of tuples
'''
records = []
with open(filename) as f:
rows = csv.reader(f)
headings = next(rows) # Skip headers
for row in rows:
route = row[0]
date = row[1]
daytype = row[2]
rides = int(row[3])
record = (route, date, daytype, rides)
records.append(record)
return records
def read_rides_as_dicts(filename):
'''
Read the bus ride data as a list of dicts
'''
records = []
with open(filename) as f:
rows = csv.reader(f)
headings = next(rows) # Skip headers
for row in rows:
route = row[0]
date = row[1]
daytype = row[2]
rides = int(row[3])
record = {
'route': route,
'date': date,
'daytype': daytype,
'rides' : rides
}
records.append(record)
return records
class Row:
__slots__ = ('route', 'date', 'daytype', 'rides')
def __init__(self, route, date, daytype, rides):
self.route = route
self.date = date
self.daytype = daytype
self.rides = rides
def read_rides_as_instances(filename):
'''
Read the bus ride data as a list of instances
'''
records = []
with open(filename) as f:
rows = csv.reader(f)
headings = next(rows) # Skip headers
for row in rows:
route = row[0]
date = row[1]
daytype = row[2]
rides = int(row[3])
record = Row(route, date, daytype, rides)
records.append(record)
return records
# Read as columns
def read_rides_as_columns(filename):
'''
Read the bus ride data into 4 lists, representing columns
'''
routes = []
dates = []
daytypes = []
numrides = []
with open(filename) as f:
rows = csv.reader(f)
headings = next(rows) # Skip headers
for row in rows:
routes.append(row[0])
dates.append(row[1])
daytypes.append(row[2])
numrides.append(int(row[3]))
return dict(routes=routes, dates=dates, daytypes=daytypes, numrides=numrides)
# The great "fake"
import collections
class RideData(collections.Sequence):
def __init__(self):
# Each value is a list with all of the values (a column)
self.routes = []
self.dates = []
self.daytypes = []
self.numrides = []
def __len__(self):
# All lists assumed to have the same length
return len(self.routes)
def append(self, d):
self.routes.append(d['route'])
self.dates.append(d['date'])
self.daytypes.append(d['daytype'])
self.numrides.append(d['rides'])
def __getitem__(self, index):
return { 'route': self.routes[index],
'date': self.dates[index],
'daytype': self.daytypes[index],
'rides': self.numrides[index] }
# Modified version using RideData
def read_rides_as_dicts(filename):
'''
Read the bus ride data as a list of dicts
'''
records = RideData()
with open(filename) as f:
rows = csv.reader(f)
headings = next(rows) # Skip headers
for row in rows:
route = row[0]
date = row[1]
daytype = row[2]
rides = int(row[3])
record = {
'route': route,
'date': date,
'daytype': daytype,
'rides' : rides
}
records.append(record)
return records
if __name__ == '__main__':
import tracemalloc
tracemalloc.start()
read_rides = read_rides_as_dicts # Change to as_dicts, as_instances, etc.
rides = read_rides("../../Data/ctabus.csv")
print('Memory Use: Current %d, Peak %d' % tracemalloc.get_traced_memory())

View File

@@ -0,0 +1,37 @@
# colreader.py
import collections
import csv
class DataCollection(collections.Sequence):
def __init__(self, columns):
self.column_names = list(columns)
self.column_data = list(columns.values())
def __len__(self):
return len(self.column_data[0])
def __getitem__(self, index):
return dict(zip(self.column_names,
(col[index] for col in self.column_data)))
def read_csv_as_columns(filename, types):
columns = collections.defaultdict(list)
with open(filename) as f:
rows = csv.reader(f)
headers = next(rows)
for row in rows:
for name, func, val in zip(headers, types, row):
columns[name].append(func(val))
return DataCollection(columns)
if __name__ == '__main__':
import tracemalloc
from sys import intern
tracemalloc.start()
data = read_csv_as_columns('../../Data/ctabus.csv', [intern, intern, intern, int])
print(tracemalloc.get_traced_memory())

55
Solutions/2_6/cta.py Normal file
View File

@@ -0,0 +1,55 @@
# cta.py
from collections import defaultdict, Counter
import tracemalloc
import csv
import sys
tracemalloc.start()
if True:
# Part (b)
import reader
rows = reader.read_csv_as_dicts('../../Data/ctabus.csv',
[sys.intern, sys.intern, sys.intern, int])
else:
# Part (d) - Challenge
import colreader
rows = colreader.read_csv_as_columns('../../Data/ctabus.csv',
[sys.intern, sys.intern, sys.intern, int])
# --------------------------------------------------
# Question 1: How many bus routes are in Chicago?
# Solution: Use a set to get unique values.
routes = set()
for row in rows:
routes.add(row['route'])
print(len(routes), 'routes')
# --------------------------------------------------
# Question 2: Total number of rides per route
# Solution: Use a counter to tabulate things
rides_per_route = Counter()
for row in rows:
rides_per_route[row['route']] += row['rides']
# Make a table showing the routes and a count ranked by popularity
for route, count in rides_per_route.most_common():
print('%5s %10d' % (route, count))
# --------------------------------------------------
# Question 3: Routes with greatest increase in ridership 2001 - 2011
# Solution: Counters embedded inside a defaultdict
rides_by_year = defaultdict(Counter)
for row in rows:
year = row['date'].split('/')[2]
rides_by_year[year][row['route']] += row['rides']
diffs = rides_by_year['2011'] - rides_by_year['2001']
for route, diff in diffs.most_common(5):
print(route, diff)
# ---- Memory use
print('Memory Use: Current %d, Peak %d' % tracemalloc.get_traced_memory())

22
Solutions/2_6/reader.py Normal file
View File

@@ -0,0 +1,22 @@
# reader.py
import csv
def read_csv_as_dicts(filename, types):
'''
Read a CSV file into a list of dicts with column type conversion
'''
records = []
with open(filename) as f:
rows = csv.reader(f)
headers = next(rows)
for row in rows:
record = { name: func(val) for name, func, val in zip(headers, types, row) }
records.append(record)
return records

41
Solutions/3_1/stock.py Normal file
View File

@@ -0,0 +1,41 @@
# stock.py
class Stock:
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
def cost(self):
return self.shares * self.price
def sell(self, nshares):
self.shares -= nshares
def read_portfolio(filename):
'''
Read a CSV file of stock data into a list of Stocks
'''
import csv
portfolio = []
with open(filename) as f:
rows = csv.reader(f)
headers = next(rows)
for row in rows:
record = Stock(row[0], int(row[1]), float(row[2]))
portfolio.append(record)
return portfolio
def print_portfolio(portfolio):
'''
Make a nicely formatted table showing stock data
'''
print('%10s %10s %10s' % ('name', 'shares', 'price'))
print(('-'*10 + ' ')*3)
for s in portfolio:
print('%10s %10d %10.2f' % (s.name, s.shares, s.price))
if __name__ == '__main__':
portfolio = read_portfolio('../../Data/portfolio.csv')
print_portfolio(portfolio)

33
Solutions/3_2/stock.py Normal file
View File

@@ -0,0 +1,33 @@
# stock.py
class Stock:
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
def cost(self):
return self.shares * self.price
def sell(self, nshares):
self.shares -= nshares
def read_portfolio(filename):
'''
Read a CSV file of stock data into a list of Stocks
'''
import csv
portfolio = []
with open(filename) as f:
rows = csv.reader(f)
headers = next(rows)
for row in rows:
record = Stock(row[0], int(row[1]), float(row[2]))
portfolio.append(record)
return portfolio
if __name__ == '__main__':
import tableformat
portfolio = read_portfolio('../../Data/portfolio.csv')
tableformat.print_table(portfolio, ['name','shares','price'])

View File

@@ -0,0 +1,8 @@
# tableformat.py
# Print a table
def print_table(records, fields):
print(' '.join('%10s' % fieldname for fieldname in fields))
print(('-'*10 + ' ')*len(fields))
for record in records:
print(' '.join('%10s' % getattr(record, fieldname) for fieldname in fields))

28
Solutions/3_3/reader.py Normal file
View File

@@ -0,0 +1,28 @@
# reader.py
import csv
def read_csv_as_dicts(filename, types):
'''
Read a CSV file into a list of dicts with column type conversion
'''
records = []
with open(filename) as f:
rows = csv.reader(f)
headers = next(rows)
for row in rows:
record = { name: func(val) for name, func, val in zip(headers, types, row) }
records.append(record)
return records
def read_csv_as_instances(filename, cls):
'''
Read a CSV file into a list of instances
'''
records = []
with open(filename) as f:
rows = csv.reader(f)
headers = next(rows)
for row in rows:
records.append(cls.from_row(row))
return records

40
Solutions/3_3/stock.py Normal file
View File

@@ -0,0 +1,40 @@
# stock.py
class Stock:
types = (str, int, float)
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
@classmethod
def from_row(cls, row):
values = [func(val) for func, val in zip(cls.types, row)]
return cls(*values)
def cost(self):
return self.shares * self.price
def sell(self, nshares):
self.shares -= nshares
def read_portfolio(filename):
'''
Read a CSV file of stock data into a list of Stocks
'''
import csv
portfolio = []
with open(filename) as f:
rows = csv.reader(f)
headers = next(rows)
for row in rows:
record = Stock.from_row(row)
portfolio.append(record)
return portfolio
if __name__ == '__main__':
import tableformat
import reader
# portfolio = read_portfolio('../../Data/portfolio.csv')
portfolio = reader.read_csv_as_instances('../../Data/portfolio.csv', Stock)
tableformat.print_table(portfolio, ['name', 'shares', 'price'])

View File

@@ -0,0 +1,17 @@
# tableformat.py
# Print a table
def print_table(records, fields):
# Print the table headers in a 10-character wide field
for fieldname in fields:
print('%10s' % fieldname, end=' ')
print()
# Print the separator bars
print(('-'*10 + ' ')*len(fields))
# Output the table contents
for r in records:
for fieldname in fields:
print('%10s' % getattr(r, fieldname), end=' ')
print()

43
Solutions/3_4/stock.py Normal file
View File

@@ -0,0 +1,43 @@
# stock.py
class Stock:
__slots__ = ('name','_shares','_price')
_types = (str, int, float)
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
@classmethod
def from_row(cls, row):
values = [func(val) for func, val in zip(cls._types, row)]
return cls(*values)
@property
def shares(self):
return self._shares
@shares.setter
def shares(self, value):
if not isinstance(value, self._types[1]):
raise TypeError(f'Expected {self._types[1].__name__}')
if value < 0:
raise ValueError('shares must be >= 0')
self._shares = value
@property
def price(self):
return self._price
@price.setter
def price(self, value):
if not isinstance(value, self._types[2]):
raise TypeError(f'Expected {self._types[2].__name__}')
if value < 0:
raise ValueError('price must be >= 0')
self._price = value
@property
def cost(self):
return self.shares * self.price
def sell(self, nshares):
self.shares -= nshares

28
Solutions/3_5/reader.py Normal file
View File

@@ -0,0 +1,28 @@
# reader.py
import csv
def read_csv_as_dicts(filename, types):
'''
Read a CSV file into a list of dicts with column type conversion
'''
records = []
with open(filename) as f:
rows = csv.reader(f)
headers = next(rows)
for row in rows:
record = { name: func(val) for name, func, val in zip(headers, types, row) }
records.append(record)
return records
def read_csv_as_instances(filename, cls):
'''
Read a CSV file into a list of instances
'''
records = []
with open(filename) as f:
rows = csv.reader(f)
headers = next(rows)
for row in rows:
records.append(cls.from_row(row))
return records

52
Solutions/3_5/stock.py Normal file
View File

@@ -0,0 +1,52 @@
# stock.py
class Stock:
__slots__ = ('name','_shares','_price')
_types = (str, int, float)
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
@classmethod
def from_row(cls, row):
values = [func(val) for func, val in zip(cls._types, row)]
return cls(*values)
@property
def shares(self):
return self._shares
@shares.setter
def shares(self, value):
if not isinstance(value, self._types[1]):
raise TypeError(f'Expected {self._types[1].__name__}')
if value < 0:
raise ValueError('shares must be >= 0')
self._shares = value
@property
def price(self):
return self._price
@price.setter
def price(self, value):
if not isinstance(value, self._types[2]):
raise TypeError(f'Expected {self._types[2].__name__}')
if value < 0:
raise ValueError('price must be >= 0')
self._price = value
@property
def cost(self):
return self.shares * self.price
def sell(self, nshares):
self.shares -= nshares
# Sample
if __name__ == '__main__':
import reader
from tableformat import create_formatter, print_table
portfolio = reader.read_csv_as_instances('../../Data/portfolio.csv', Stock)
formatter = create_formatter('text')
print_table(portfolio,['name','shares','price'], formatter)

View File

@@ -0,0 +1,57 @@
# tableformat.py
def print_table(records, fields, formatter):
formatter.headings(fields)
for r in records:
rowdata = [getattr(r, fieldname) for fieldname in fields]
formatter.row(rowdata)
class TableFormatter:
def headings(self, headers):
raise NotImplementedError()
def row(self, rowdata):
raise NotImplementedError()
class TextTableFormatter(TableFormatter):
def headings(self, headers):
print(' '.join('%10s' % h for h in headers))
print(('-'*10 + ' ')*len(headers))
def row(self, rowdata):
print(' '.join('%10s' % d for d in rowdata))
class CSVTableFormatter(TableFormatter):
def headings(self, headers):
print(','.join(headers))
def row(self, rowdata):
print(','.join(str(d) for d in rowdata))
class HTMLTableFormatter(TableFormatter):
def headings(self, headers):
print('<tr>', end=' ')
for h in headers:
print('<th>%s</th>' % h, end=' ')
print('</tr>')
def row(self, rowdata):
print('<tr>', end=' ')
for d in rowdata:
print('<td>%s</td>' % d, end=' ')
print('</tr>')
def create_formatter(name):
if name == 'text':
formatter_cls = TextTableFormatter
elif name == 'csv':
formatter_cls = CSVTableFormatter
elif name == 'html':
formatter_cls = HTMLTableFormatter
else:
raise RuntimeError('Unknown format %s' % name)
return formatter_cls()

28
Solutions/3_6/reader.py Normal file
View File

@@ -0,0 +1,28 @@
# reader.py
import csv
def read_csv_as_dicts(filename, types):
'''
Read a CSV file into a list of dicts with column type conversion
'''
records = []
with open(filename) as f:
rows = csv.reader(f)
headers = next(rows)
for row in rows:
record = { name: func(val) for name, func, val in zip(headers, types, row) }
records.append(record)
return records
def read_csv_as_instances(filename, cls):
'''
Read a CSV file into a list of instances
'''
records = []
with open(filename) as f:
rows = csv.reader(f)
headers = next(rows)
for row in rows:
records.append(cls.from_row(row))
return records

60
Solutions/3_6/stock.py Normal file
View File

@@ -0,0 +1,60 @@
# stock.py
class Stock:
__slots__ = ('name','_shares','_price')
_types = (str, int, float)
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
def __repr__(self):
# Note: The !r format code produces the repr() string
return f'{type(self).__name__}({self.name!r}, {self.shares!r}, {self.price!r})'
def __eq__(self, other):
return isinstance(other, Stock) and ((self.name, self.shares, self.price) ==
(other.name, other.shares, other.price))
@classmethod
def from_row(cls, row):
values = [func(val) for func, val in zip(cls._types, row)]
return cls(*values)
@property
def shares(self):
return self._shares
@shares.setter
def shares(self, value):
if not isinstance(value, self._types[1]):
raise TypeError(f'Expected {self._types[1].__name__}')
if value < 0:
raise ValueError('shares must be >= 0')
self._shares = value
@property
def price(self):
return self._price
@price.setter
def price(self, value):
if not isinstance(value, self._types[2]):
raise TypeError(f'Expected {self._types[2].__name__}')
if value < 0:
raise ValueError('price must be >= 0')
self._price = value
@property
def cost(self):
return self.shares * self.price
def sell(self, nshares):
self.shares -= nshares
# Sample
if __name__ == '__main__':
import reader
from tableformat import create_formatter, print_table
portfolio = reader.read_csv_as_instances('../../Data/portfolio.csv', Stock)
formatter = create_formatter('text')
print_table(portfolio,['name','shares','price'], formatter)

View File

@@ -0,0 +1,57 @@
# tableformat.py
def print_table(records, fields, formatter):
formatter.headings(fields)
for r in records:
rowdata = [getattr(r, fieldname) for fieldname in fields]
formatter.row(rowdata)
class TableFormatter:
def headings(self, headers):
raise NotImplementedError()
def row(self, rowdata):
raise NotImplementedError()
class TextTableFormatter(TableFormatter):
def headings(self, headers):
print(' '.join('%10s' % h for h in headers))
print(('-'*10 + ' ')*len(headers))
def row(self, rowdata):
print(' '.join('%10s' % d for d in rowdata))
class CSVTableFormatter(TableFormatter):
def headings(self, headers):
print(','.join(headers))
def row(self, rowdata):
print(','.join(str(d) for d in rowdata))
class HTMLTableFormatter(TableFormatter):
def headings(self, headers):
print('<tr>', end=' ')
for h in headers:
print('<th>%s</th>' % h, end=' ')
print('</tr>')
def row(self, rowdata):
print('<tr>', end=' ')
for d in rowdata:
print('<td>%s</td>' % d, end=' ')
print('</tr>')
def create_formatter(name):
if name == 'text':
formatter_cls = TextTableFormatter
elif name == 'csv':
formatter_cls = CSVTableFormatter
elif name == 'html':
formatter_cls = HTMLTableFormatter
else:
raise RuntimeError('Unknown format %s' % name)
return formatter_cls()

42
Solutions/3_7/reader.py Normal file
View File

@@ -0,0 +1,42 @@
# reader.py
import csv
from abc import ABC, abstractmethod
class CSVParser(ABC):
def parse(self, filename):
records = []
with open(filename) as f:
rows = csv.reader(f)
headers = next(rows)
for row in rows:
record = self.make_record(headers, row)
records.append(record)
return records
@abstractmethod
def make_record(self, headers, row):
pass
class DictCSVParser(CSVParser):
def __init__(self, types):
self.types = types
def make_record(self, headers, row):
return { name: func(val) for name, func, val in zip(headers, self.types, row) }
class InstanceCSVParser(CSVParser):
def __init__(self, cls):
self.cls = cls
def make_record(self, headers, row):
return self.cls.from_row(row)
def read_csv_as_dicts(filename, types):
parser = DictCSVParser(types)
return parser.parse(filename)
def read_csv_as_instances(filename, cls):
parser = InstanceCSVParser(cls)
return parser.parse(filename)

60
Solutions/3_7/stock.py Normal file
View File

@@ -0,0 +1,60 @@
# stock.py
class Stock:
__slots__ = ('name','_shares','_price')
_types = (str, int, float)
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
def __repr__(self):
# Note: The !r format code produces the repr() string
return f'{type(self).__name__}({self.name!r}, {self.shares!r}, {self.price!r})'
def __eq__(self, other):
return isinstance(other, Stock) and ((self.name, self.shares, self.price) ==
(other.name, other.shares, other.price))
@classmethod
def from_row(cls, row):
values = [func(val) for func, val in zip(cls._types, row)]
return cls(*values)
@property
def shares(self):
return self._shares
@shares.setter
def shares(self, value):
if not isinstance(value, self._types[1]):
raise TypeError(f'Expected {self._types[1].__name__}')
if value < 0:
raise ValueError('shares must be >= 0')
self._shares = value
@property
def price(self):
return self._price
@price.setter
def price(self, value):
if not isinstance(value, self._types[2]):
raise TypeError(f'Expected {self._types[2].__name__}')
if value < 0:
raise ValueError('price must be >= 0')
self._price = value
@property
def cost(self):
return self.shares * self.price
def sell(self, nshares):
self.shares -= nshares
# Sample
if __name__ == '__main__':
import reader
from tableformat import create_formatter, print_table
portfolio = reader.read_csv_as_instances('../../Data/portfolio.csv', Stock)
formatter = create_formatter('text')
print_table(portfolio,['name','shares','price'], formatter)

View File

@@ -0,0 +1,62 @@
# tableformat.py
from abc import ABC, abstractmethod
def print_table(records, fields, formatter):
if not isinstance(formatter, TableFormatter):
raise TypeError('Expected a TableFormatter')
formatter.headings(fields)
for r in records:
rowdata = [getattr(r, fieldname) for fieldname in fields]
formatter.row(rowdata)
class TableFormatter(ABC):
@abstractmethod
def headings(self, headers):
pass
@abstractmethod
def row(self, rowdata):
pass
class TextTableFormatter(TableFormatter):
def headings(self, headers):
print(' '.join('%10s' % h for h in headers))
print(('-'*10 + ' ')*len(headers))
def row(self, rowdata):
print(' '.join('%10s' % d for d in rowdata))
class CSVTableFormatter(TableFormatter):
def headings(self, headers):
print(','.join(headers))
def row(self, rowdata):
print(','.join(str(d) for d in rowdata))
class HTMLTableFormatter(TableFormatter):
def headings(self, headers):
print('<tr>', end=' ')
for h in headers:
print('<th>%s</th>' % h, end=' ')
print('</tr>')
def row(self, rowdata):
print('<tr>', end=' ')
for d in rowdata:
print('<td>%s</td>' % d, end=' ')
print('</tr>')
def create_formatter(name):
if name == 'text':
formatter = TextTableFormatter
elif name == 'csv':
formatter = CSVTableFormatter
elif name == 'html':
formatter = HTMLTableFormatter
else:
raise RuntimeError('Unknown format %s' % name)
return formatter()

42
Solutions/3_8/reader.py Normal file
View File

@@ -0,0 +1,42 @@
# reader.py
import csv
from abc import ABC, abstractmethod
class CSVParser(ABC):
def parse(self, filename):
records = []
with open(filename) as f:
rows = csv.reader(f)
headers = next(rows)
for row in rows:
record = self.make_record(headers, row)
records.append(record)
return records
@abstractmethod
def make_record(self, headers, row):
pass
class DictCSVParser(CSVParser):
def __init__(self, types):
self.types = types
def make_record(self, headers, row):
return { name: func(val) for name, func, val in zip(headers, self.types, row) }
class InstanceCSVParser(CSVParser):
def __init__(self, cls):
self.cls = cls
def make_record(self, headers, row):
return self.cls.from_row(row)
def read_csv_as_dicts(filename, types):
parser = DictCSVParser(types)
return parser.parse(filename)
def read_csv_as_instances(filename, cls):
parser = InstanceCSVParser(cls)
return parser.parse(filename)

84
Solutions/3_8/stock.py Normal file
View File

@@ -0,0 +1,84 @@
# stock.py
class Stock:
__slots__ = ('name','_shares','_price')
_types = (str, int, float)
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
def __repr__(self):
# Note: The !r format code produces the repr() string
return f'{type(self).__name__}({self.name!r}, {self.shares!r}, {self.price!r})'
def __eq__(self, other):
return isinstance(other, Stock) and ((self.name, self.shares, self.price) ==
(other.name, other.shares, other.price))
@classmethod
def from_row(cls, row):
values = [func(val) for func, val in zip(cls._types, row)]
return cls(*values)
@property
def shares(self):
return self._shares
@shares.setter
def shares(self, value):
if not isinstance(value, self._types[1]):
raise TypeError(f'Expected {self._types[1].__name__}')
if value < 0:
raise ValueError('shares must be >= 0')
self._shares = value
@property
def price(self):
return self._price
@price.setter
def price(self, value):
if not isinstance(value, self._types[2]):
raise TypeError(f'Expected {self._types[2].__name__}')
if value < 0:
raise ValueError('price must be >= 0')
self._price = value
@property
def cost(self):
return self.shares * self.price
def sell(self, nshares):
self.shares -= nshares
# Sample
if __name__ == '__main__':
import tableformat
import reader
from tableformat import (
print_table,
create_formatter,
TextTableFormatter,
ColumnFormatMixin,
UpperHeadersMixin
)
portfolio = reader.read_csv_as_instances('../../Data/portfolio.csv', Stock)
class PortfolioFormatter(ColumnFormatMixin, TextTableFormatter):
formats = ['%s','%d','%0.2f']
formatter = PortfolioFormatter()
print_table(portfolio,['name','shares','price'], formatter)
class PortfolioFormatter(UpperHeadersMixin, TextTableFormatter):
pass
formatter = PortfolioFormatter()
print_table(portfolio, ['name','shares','price'], formatter)
# Factory function version
formatter = create_formatter('text', column_formats=['%s','%d','%0.2f'])
print_table(portfolio, ['name','shares','price'], formatter)
formatter = create_formatter('text', upper_headers=True)
print_table(portfolio, ['name','shares','price'], formatter)

View File

@@ -0,0 +1,83 @@
# tableformat.py
from abc import ABC, abstractmethod
def print_table(records, fields, formatter):
if not isinstance(formatter, TableFormatter):
raise TypeError('Expected a TableFormatter')
formatter.headings(fields)
for r in records:
rowdata = [getattr(r, fieldname) for fieldname in fields]
formatter.row(rowdata)
class TableFormatter(ABC):
@abstractmethod
def headings(self, headers):
pass
@abstractmethod
def row(self, rowdata):
pass
class TextTableFormatter(TableFormatter):
def headings(self, headers):
print(' '.join('%10s' % h for h in headers))
print(('-'*10 + ' ')*len(headers))
def row(self, rowdata):
print(' '.join('%10s' % d for d in rowdata))
class CSVTableFormatter(TableFormatter):
def headings(self, headers):
print(','.join(headers))
def row(self, rowdata):
print(','.join(str(d) for d in rowdata))
class HTMLTableFormatter(TableFormatter):
def headings(self, headers):
print('<tr>', end=' ')
for h in headers:
print('<th>%s</th>' % h, end=' ')
print('</tr>')
def row(self, rowdata):
print('<tr>', end=' ')
for d in rowdata:
print('<td>%s</td>' % d, end=' ')
print('</tr>')
class ColumnFormatMixin:
formats = []
def row(self, rowdata):
rowdata = [ (fmt % item) for fmt, item in zip(self.formats, rowdata)]
super().row(rowdata)
class UpperHeadersMixin:
def headings(self, headers):
super().headings([h.upper() for h in headers])
def create_formatter(name, column_formats=None, upper_headers=False):
if name == 'text':
formatter_cls = TextTableFormatter
elif name == 'csv':
formatter_cls = CSVTableFormatter
elif name == 'html':
formatter_cls = HTMLTableFormatter
else:
raise RuntimeError('Unknown format %s' % name)
if column_formats:
class formatter_cls(ColumnFormatMixin, formatter_cls):
formats = column_formats
if upper_headers:
class formatter_cls(UpperHeadersMixin, formatter_cls):
pass
return formatter_cls()

82
Solutions/4_2/validate.py Normal file
View File

@@ -0,0 +1,82 @@
class Validator:
@classmethod
def check(cls, value):
return value
class Typed(Validator):
expected_type = object
@classmethod
def check(cls, value):
if not isinstance(value, cls.expected_type):
raise TypeError(f'Expected {cls.expected_type}')
return super().check(value)
class Integer(Typed):
expected_type = int
class Float(Typed):
expected_type = float
class String(Typed):
expected_type = str
class Positive(Validator):
@classmethod
def check(cls, value):
if value < 0:
raise ValueError('Must be >= 0')
return super().check(value)
class NonEmpty(Validator):
@classmethod
def check(cls, value):
if len(value) == 0:
raise ValueError('Must be non-empty')
return super().check(value)
class PositiveInteger(Integer, Positive):
pass
class PositiveFloat(Float, Positive):
pass
class NonEmptyString(String, NonEmpty):
pass
# Examples
if __name__ == '__main__':
def add(x, y):
Integer.check(x)
Integer.check(y)
return x + y
class Stock:
__slots__ = ('name','_shares','_price')
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
def __repr__(self):
return f'Stock({self.name!r}, {self.shares!r}, {self.price!r})'
@property
def shares(self):
return self._shares
@shares.setter
def shares(self, value):
self._shares = PositiveInteger.check(value)
@property
def price(self):
return self._price
@price.setter
def price(self, value):
self._price = PositiveFloat.check(value)
@property
def cost(self):
return self.shares * self.price
def sell(self, nshares):
self.shares -= nshares

84
Solutions/4_3/validate.py Normal file
View File

@@ -0,0 +1,84 @@
class Validator:
def __init__(self, name=None):
self.name = name
def __set_name__(self, cls, name):
self.name = name
@classmethod
def check(cls, value):
return value
def __set__(self, instance, value):
instance.__dict__[self.name] = self.check(value)
class Typed(Validator):
expected_type = object
@classmethod
def check(cls, value):
if not isinstance(value, cls.expected_type):
raise TypeError(f'expected {cls.expected_type}')
return super().check(value)
class Integer(Typed):
expected_type = int
class Float(Typed):
expected_type = float
class String(Typed):
expected_type = str
class Positive(Validator):
@classmethod
def check(cls, value):
if value < 0:
raise ValueError('must be >= 0')
return super().check(value)
class NonEmpty(Validator):
@classmethod
def check(cls, value):
if len(value) == 0:
raise ValueError('must be non-empty')
return super().check(value)
class PositiveInteger(Integer, Positive):
pass
class PositiveFloat(Float, Positive):
pass
class NonEmptyString(String, NonEmpty):
pass
# Examples
if __name__ == '__main__':
def add(x, y):
Integer.check(x)
Integer.check(y)
return x + y
class Stock:
name = NonEmptyString()
shares = PositiveInteger()
price = PositiveFloat()
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
def __repr__(self):
return f'Stock({self.name!r}, {self.shares!r}, {self.price!r})'
@property
def cost(self):
return self.shares * self.price
def sell(self, nshares):
self.shares -= nshares

49
Solutions/5_2/reader.py Normal file
View File

@@ -0,0 +1,49 @@
# reader.py
from abc import ABC, abstractmethod
import csv
import logging
log = logging.getLogger(__name__)
class CSVParser(ABC):
def parse(self, filename):
records = []
with open(filename) as f:
rows = csv.reader(f)
headers = next(rows)
for rowno, row in enumerate(rows, start=1):
try:
record = self.make_record(headers, row)
records.append(record)
except ValueError as e:
log.warning('Row %d: Bad row: %s', rowno, row)
log.debug('Row %d: Reason: %s', rowno, e)
return records
@abstractmethod
def make_record(self, headers, row):
raise NotImplementedError()
class DictCSVParser(CSVParser):
def __init__(self, types):
self.types = types
def make_record(self, headers, row):
return { name: func(val) for name, func, val in zip(headers, self.types, row) }
class InstanceCSVParser(CSVParser):
def __init__(self, cls):
self.cls = cls
def make_record(self, headers, row):
return self.cls.from_row(row)
def read_csv_as_dicts(filename, types):
parser = DictCSVParser(types)
return parser.parse(filename)
def read_csv_as_instances(filename, cls):
parser = InstanceCSVParser(cls)
return parser.parse(filename)

51
Solutions/5_2/stock.py Normal file
View File

@@ -0,0 +1,51 @@
# stock.py
class Stock:
__slots__ = ('name','_shares','_price')
_types = (str, int, float)
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
def __repr__(self):
# Note: The !r format code produces the repr() string
return f'{type(self).__name__}({self.name!r}, {self.shares!r}, {self.price!r})'
def __eq__(self, other):
return isinstance(other, Stock) and ((self.name, self.shares, self.price) ==
(other.name, other.shares, other.price))
@classmethod
def from_row(cls, row):
values = [func(val) for func, val in zip(cls._types, row)]
return cls(*values)
@property
def shares(self):
return self._shares
@shares.setter
def shares(self, value):
if not isinstance(value, self._types[1]):
raise TypeError(f'Expected {self._types[1].__name__}')
if value < 0:
raise ValueError('shares must be >= 0')
self._shares = value
@property
def price(self):
return self._price
@price.setter
def price(self, value):
if not isinstance(value, self._types[2]):
raise TypeError(f'Expected {self._types[2].__name__}')
if value < 0:
raise ValueError('price must be >= 0')
self._price = value
@property
def cost(self):
return self.shares * self.price
def sell(self, nshares):
self.shares -= nshares

32
Solutions/5_3/reader.py Normal file
View File

@@ -0,0 +1,32 @@
# reader.py
import csv
def convert_csv(lines, converter, *, headers=None):
rows = csv.reader(lines)
if headers is None:
headers = next(rows)
return list(map(lambda row: converter(headers, row), rows))
def csv_as_dicts(lines, types, *, headers=None):
return convert_csv(lines,
lambda headers, row: { name: func(val) for name, func, val in zip(headers, types, row) })
def csv_as_instances(lines, cls, *, headers=None):
return convert_csv(lines,
lambda headers, row: cls.from_row(row))
def read_csv_as_dicts(filename, types, *, headers=None):
'''
Read CSV data into a list of dictionaries with optional type conversion
'''
with open(filename) as file:
return csv_as_dicts(file, types, headers=headers)
def read_csv_as_instances(filename, cls, *, headers=None):
'''
Read CSV data into a list of instances
'''
with open(filename) as file:
return csv_as_instances(file, cls, headers=headers)

51
Solutions/5_3/stock.py Normal file
View File

@@ -0,0 +1,51 @@
# stock.py
class Stock:
__slots__ = ('name','_shares','_price')
_types = (str, int, float)
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
def __repr__(self):
# Note: The !r format code produces the repr() string
return f'{type(self).__name__}({self.name!r}, {self.shares!r}, {self.price!r})'
def __eq__(self, other):
return isinstance(other, Stock) and ((self.name, self.shares, self.price) ==
(other.name, other.shares, other.price))
@classmethod
def from_row(cls, row):
values = [func(val) for func, val in zip(cls._types, row)]
return cls(*values)
@property
def shares(self):
return self._shares
@shares.setter
def shares(self, value):
if not isinstance(value, self._types[1]):
raise TypeError(f'Expected {self._types[1].__name__}')
if value < 0:
raise ValueError('shares must be >= 0')
self._shares = value
@property
def price(self):
return self._price
@price.setter
def price(self, value):
if not isinstance(value, self._types[2]):
raise TypeError(f'Expected {self._types[2].__name__}')
if value < 0:
raise ValueError('price must be >= 0')
self._price = value
@property
def cost(self):
return self.shares * self.price
def sell(self, nshares):
self.shares -= nshares

View File

@@ -0,0 +1,33 @@
# typedproperty.py
def typedproperty(name, expected_type):
private_name = '_' + name
@property
def value(self):
return getattr(self, private_name)
@value.setter
def value(self, val):
if not isinstance(val, expected_type):
raise TypeError(f'Expected {expected_type}')
setattr(self, private_name, val)
return value
String = lambda name: typedproperty(name, str)
Integer = lambda name: typedproperty(name, int)
Float = lambda name: typedproperty(name, float)
# Example
if __name__ == '__main__':
class Stock:
name = String('name')
shares = Integer('shares')
price = Float('price')
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price

43
Solutions/5_5/reader.py Normal file
View File

@@ -0,0 +1,43 @@
# reader.py
import csv
import logging
log = logging.getLogger(__name__)
def convert_csv(lines, converter, *, headers=None):
rows = csv.reader(lines)
if headers is None:
headers = next(rows)
records = []
for rowno, row in enumerate(rows, start=1):
try:
records.append(converter(headers, row))
except ValueError as e:
log.warning('Row %s: Bad row: %s', rowno, row)
log.debug('Row %s: Reason: %s', rowno, row)
return records
def csv_as_dicts(lines, types, *, headers=None):
return convert_csv(lines,
lambda headers, row: { name: func(val) for name, func, val in zip(headers, types, row) })
def csv_as_instances(lines, cls, *, headers=None):
return convert_csv(lines,
lambda headers, row: cls.from_row(row))
def read_csv_as_dicts(filename, types, *, headers=None):
'''
Read CSV data into a list of dictionaries with optional type conversion
'''
with open(filename) as file:
return csv_as_dicts(file, types, headers=headers)
def read_csv_as_instances(filename, cls, *, headers=None):
'''
Read CSV data into a list of instances
'''
with open(filename) as file:
return csv_as_instances(file, cls, headers=headers)

51
Solutions/5_6/stock.py Normal file
View File

@@ -0,0 +1,51 @@
# stock.py
class Stock:
__slots__ = ('name','_shares','_price')
_types = (str, int, float)
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
def __repr__(self):
# Note: The !r format code produces the repr() string
return f'{type(self).__name__}({self.name!r}, {self.shares!r}, {self.price!r})'
def __eq__(self, other):
return isinstance(other, Stock) and ((self.name, self.shares, self.price) ==
(other.name, other.shares, other.price))
@classmethod
def from_row(cls, row):
values = [func(val) for func, val in zip(cls._types, row)]
return cls(*values)
@property
def shares(self):
return self._shares
@shares.setter
def shares(self, value):
if not isinstance(value, self._types[1]):
raise TypeError(f'Expected {self._types[1].__name__}')
if value < 0:
raise ValueError('shares must be >= 0')
self._shares = value
@property
def price(self):
return self._price
@price.setter
def price(self, value):
if not isinstance(value, self._types[2]):
raise TypeError(f'Expected {self._types[2].__name__}')
if value < 0:
raise ValueError('price must be >= 0')
self._price = value
@property
def cost(self):
return self.shares * self.price
def sell(self, nshares):
self.shares -= nshares

View File

@@ -0,0 +1,70 @@
# teststock.py
import stock
import unittest
class TestStock(unittest.TestCase):
def test_create(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_create_keyword(self):
s = stock.Stock(name='GOOG', shares=100, price=490.1)
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_cost(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(s.cost, 49010.0)
def test_sell(self):
s = stock.Stock('GOOG', 100, 490.1)
s.sell(25)
self.assertEqual(s.shares, 75)
def test_from_row(self):
s = stock.Stock.from_row(['GOOG','100','490.1'])
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_repr(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(repr(s), "Stock('GOOG', 100, 490.1)")
def test_eq(self):
a = stock.Stock('GOOG', 100, 490.1)
b = stock.Stock('GOOG', 100, 490.1)
self.assertTrue(a==b)
# Tests for failure conditions
def test_shares_badtype(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(TypeError):
s.shares = '50'
def test_shares_badvalue(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(ValueError):
s.shares = -50
def test_price_badtype(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(TypeError):
s.price = '45.23'
def test_price_badvalue(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(ValueError):
s.price = -45.23
def test_bad_attribute(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(AttributeError):
s.share = 100
if __name__ == '__main__':
unittest.main()

14
Solutions/6_1/stock.py Normal file
View File

@@ -0,0 +1,14 @@
# stock.py
from structure import Structure
class Stock(Structure):
_fields = ('name', 'shares', 'price')
@property
def cost(self):
return self.shares * self.price
def sell(self, nshares):
self.shares -= nshares

View File

@@ -0,0 +1,19 @@
# structure.py
class Structure:
_fields = ()
def __init__(self, *args):
if len(args) != len(self._fields):
raise TypeError('Expected %d arguments' % len(self._fields))
for name, val in zip(self._fields, args):
setattr(self, name, val)
def __setattr__(self, name, value):
if name.startswith('_') or name in self._fields:
super().__setattr__(name, value)
else:
raise AttributeError('No attribute %s' % name)
def __repr__(self):
return '%s(%s)' % (type(self).__name__,
', '.join(repr(getattr(self, name)) for name in self._fields))

View File

@@ -0,0 +1,70 @@
# teststock.py
import stock
import unittest
class TestStock(unittest.TestCase):
def test_create(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_create_keyword(self):
s = stock.Stock(name='GOOG', shares=100, price=490.1)
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_cost(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(s.cost, 49010.0)
def test_sell(self):
s = stock.Stock('GOOG', 100, 490.1)
s.sell(25)
self.assertEqual(s.shares, 75)
def test_from_row(self):
s = stock.Stock.from_row(['GOOG','100','490.1'])
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_repr(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(repr(s), "Stock('GOOG', 100, 490.1)")
def test_eq(self):
a = stock.Stock('GOOG', 100, 490.1)
b = stock.Stock('GOOG', 100, 490.1)
self.assertTrue(a==b)
# Tests for failure conditions
def test_shares_badtype(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(TypeError):
s.shares = '50'
def test_shares_badvalue(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(ValueError):
s.shares = -50
def test_price_badtype(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(TypeError):
s.price = '45.23'
def test_price_badvalue(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(ValueError):
s.price = -45.23
def test_bad_attribute(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(AttributeError):
s.share = 100
if __name__ == '__main__':
unittest.main()

16
Solutions/6_2/stock.py Normal file
View File

@@ -0,0 +1,16 @@
# stock.py
from structure import Structure
class Stock(Structure):
_fields = ('name', 'shares', 'price')
def __init__(self, name, shares, price):
self._init()
@property
def cost(self):
return self.shares * self.price
def sell(self, nshares):
self.shares -= nshares

View File

@@ -0,0 +1,23 @@
# structure.py
import sys
class Structure:
_fields = ()
@staticmethod
def _init():
locs = sys._getframe(1).f_locals
self = locs.pop('self')
for name, val in locs.items():
setattr(self, name, val)
def __setattr__(self, name, value):
if name.startswith('_') or name in self._fields:
super().__setattr__(name, value)
else:
raise AttributeError('No attribute %s' % name)
def __repr__(self):
return '%s(%s)' % (type(self).__name__,
', '.join(repr(getattr(self, name)) for name in self._fields))

View File

@@ -0,0 +1,70 @@
# teststock.py
import stock
import unittest
class TestStock(unittest.TestCase):
def test_create(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_create_keyword(self):
s = stock.Stock(name='GOOG', shares=100, price=490.1)
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_cost(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(s.cost, 49010.0)
def test_sell(self):
s = stock.Stock('GOOG', 100, 490.1)
s.sell(25)
self.assertEqual(s.shares, 75)
def test_from_row(self):
s = stock.Stock.from_row(['GOOG','100','490.1'])
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_repr(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(repr(s), "Stock('GOOG', 100, 490.1)")
def test_eq(self):
a = stock.Stock('GOOG', 100, 490.1)
b = stock.Stock('GOOG', 100, 490.1)
self.assertTrue(a==b)
# Tests for failure conditions
def test_shares_badtype(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(TypeError):
s.shares = '50'
def test_shares_badvalue(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(ValueError):
s.shares = -50
def test_price_badtype(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(TypeError):
s.price = '45.23'
def test_price_badvalue(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(ValueError):
s.price = -45.23
def test_bad_attribute(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(AttributeError):
s.share = 100
if __name__ == '__main__':
unittest.main()

17
Solutions/6_3/stock.py Normal file
View File

@@ -0,0 +1,17 @@
# stock.py
from structure import Structure
class Stock(Structure):
def __init__(self, name, shares, price):
self._init()
@property
def cost(self):
return self.shares * self.price
def sell(self, nshares):
self.shares -= nshares
Stock.set_fields()

View File

@@ -0,0 +1,30 @@
# structure.py
import sys
import inspect
class Structure:
_fields = ()
@staticmethod
def _init():
locs = sys._getframe(1).f_locals
self = locs.pop('self')
for name, val in locs.items():
setattr(self, name, val)
def __setattr__(self, name, value):
if name.startswith('_') or name in self._fields:
super().__setattr__(name, value)
else:
raise AttributeError('No attribute %s' % name)
def __repr__(self):
return '%s(%s)' % (type(self).__name__,
', '.join(repr(getattr(self, name)) for name in self._fields))
@classmethod
def set_fields(cls):
sig = inspect.signature(cls)
cls._fields = tuple(sig.parameters)

View File

@@ -0,0 +1,70 @@
# teststock.py
import stock
import unittest
class TestStock(unittest.TestCase):
def test_create(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_create_keyword(self):
s = stock.Stock(name='GOOG', shares=100, price=490.1)
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_cost(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(s.cost, 49010.0)
def test_sell(self):
s = stock.Stock('GOOG', 100, 490.1)
s.sell(25)
self.assertEqual(s.shares, 75)
def test_from_row(self):
s = stock.Stock.from_row(['GOOG','100','490.1'])
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_repr(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(repr(s), "Stock('GOOG', 100, 490.1)")
def test_eq(self):
a = stock.Stock('GOOG', 100, 490.1)
b = stock.Stock('GOOG', 100, 490.1)
self.assertTrue(a==b)
# Tests for failure conditions
def test_shares_badtype(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(TypeError):
s.shares = '50'
def test_shares_badvalue(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(ValueError):
s.shares = -50
def test_price_badtype(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(TypeError):
s.price = '45.23'
def test_price_badvalue(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(ValueError):
s.price = -45.23
def test_bad_attribute(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(AttributeError):
s.share = 100
if __name__ == '__main__':
unittest.main()

16
Solutions/6_4/stock.py Normal file
View File

@@ -0,0 +1,16 @@
# stock.py
from structure import Structure
class Stock(Structure):
_fields = ('name', 'shares', 'price')
@property
def cost(self):
return self.shares * self.price
def sell(self, nshares):
self.shares -= nshares
Stock.create_init()

View File

@@ -0,0 +1,27 @@
# structure.py
class Structure:
_fields = ()
def __setattr__(self, name, value):
if name.startswith('_') or name in self._fields:
super().__setattr__(name, value)
else:
raise AttributeError('No attribute %s' % name)
def __repr__(self):
return '%s(%s)' % (type(self).__name__,
', '.join(repr(getattr(self, name)) for name in self._fields))
@classmethod
def create_init(cls):
'''
Create an __init__ method from _fields
'''
args = ','.join(cls._fields)
code = f'def __init__(self, {args}):\n'
for name in cls._fields:
code += f' self.{name} = {name}\n'
locs = { }
exec(code, locs)
cls.__init__ = locs['__init__']

View File

@@ -0,0 +1,70 @@
# teststock.py
import stock
import unittest
class TestStock(unittest.TestCase):
def test_create(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_create_keyword(self):
s = stock.Stock(name='GOOG', shares=100, price=490.1)
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_cost(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(s.cost, 49010.0)
def test_sell(self):
s = stock.Stock('GOOG', 100, 490.1)
s.sell(25)
self.assertEqual(s.shares, 75)
def test_from_row(self):
s = stock.Stock.from_row(['GOOG','100','490.1'])
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_repr(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(repr(s), "Stock('GOOG', 100, 490.1)")
def test_eq(self):
a = stock.Stock('GOOG', 100, 490.1)
b = stock.Stock('GOOG', 100, 490.1)
self.assertTrue(a==b)
# Tests for failure conditions
def test_shares_badtype(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(TypeError):
s.shares = '50'
def test_shares_badvalue(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(ValueError):
s.shares = -50
def test_price_badtype(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(TypeError):
s.price = '45.23'
def test_price_badvalue(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(ValueError):
s.price = -45.23
def test_bad_attribute(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(AttributeError):
s.share = 100
if __name__ == '__main__':
unittest.main()

109
Solutions/6_5/validate.py Normal file
View File

@@ -0,0 +1,109 @@
class Validator:
def __init__(self, name=None):
self.name = name
def __set_name__(self, cls, name):
self.name = name
@classmethod
def check(cls, value):
return value
def __set__(self, instance, value):
instance.__dict__[self.name] = self.check(value)
class Typed(Validator):
expected_type = object
@classmethod
def check(cls, value):
if not isinstance(value, cls.expected_type):
raise TypeError(f'expected {cls.expected_type}')
return super().check(value)
class Integer(Typed):
expected_type = int
class Float(Typed):
expected_type = float
class String(Typed):
expected_type = str
class Positive(Validator):
@classmethod
def check(cls, value):
if value < 0:
raise ValueError('must be >= 0')
return super().check(value)
class NonEmpty(Validator):
@classmethod
def check(cls, value):
if len(value) == 0:
raise ValueError('must be non-empty')
return super().check(value)
class PositiveInteger(Integer, Positive):
pass
class PositiveFloat(Float, Positive):
pass
class NonEmptyString(String, NonEmpty):
pass
from inspect import signature
class ValidatedFunction:
def __init__(self, func):
self.func = func
self.signature = signature(func)
self.annotations = dict(func.__annotations__)
self.retcheck = self.annotations.pop('return', None)
def __call__(self, *args, **kwargs):
bound = self.signature.bind(*args, **kwargs)
for name, val in self.annotations.items():
val.check(bound.arguments[name])
result = self.func(*args, **kwargs)
if self.retcheck:
self.retcheck.check(result)
return result
# Examples
if __name__ == '__main__':
def add(x:Integer, y:Integer) -> Integer:
return x + y
add = ValidatedFunction(add)
class Stock:
name = NonEmptyString()
shares = PositiveInteger()
price = PositiveFloat()
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
def __repr__(self):
return f'Stock({self.name!r}, {self.shares!r}, {self.price!r})'
@property
def cost(self):
return self.shares * self.price
def sell(self, nshares):
self.shares -= nshares
sell = ValidatedFunction(sell) # Broken

10
Solutions/7_1/logcall.py Normal file
View File

@@ -0,0 +1,10 @@
# logcall.py
def logged(func):
print('Adding logging to', func.__name__)
def wrapper(*args,**kwargs):
print('Calling', func.__name__)
return func(*args,**kwargs)
return wrapper

11
Solutions/7_1/sample.py Normal file
View File

@@ -0,0 +1,11 @@
# sample.py
from logcall import logged
@logged
def add(x,y):
return x+y
@logged
def sub(x,y):
return x-y

131
Solutions/7_1/validate.py Normal file
View File

@@ -0,0 +1,131 @@
# validate.py
class Validator:
def __init__(self, name=None):
self.name = name
def __set_name__(self, cls, name):
self.name = name
@classmethod
def check(cls, value):
return value
def __set__(self, instance, value):
instance.__dict__[self.name] = self.check(value)
class Typed(Validator):
expected_type = object
@classmethod
def check(cls, value):
if not isinstance(value, cls.expected_type):
raise TypeError(f'expected {cls.expected_type}')
return super().check(value)
class Integer(Typed):
expected_type = int
class Float(Typed):
expected_type = float
class String(Typed):
expected_type = str
class Positive(Validator):
@classmethod
def check(cls, value):
if value < 0:
raise ValueError('must be >= 0')
return super().check(value)
class NonEmpty(Validator):
@classmethod
def check(cls, value):
if len(value) == 0:
raise ValueError('must be non-empty')
return super().check(value)
class PositiveInteger(Integer, Positive):
pass
class PositiveFloat(Float, Positive):
pass
class NonEmptyString(String, NonEmpty):
pass
from inspect import signature
def isvalidator(item):
return isinstance(item, type) and issubclass(item, Validator)
def validated(func):
sig = signature(func)
# Gather the function annotations
annotations = { name:val for name, val in func.__annotations__.items()
if isvalidator(val) }
# Get the return annotation (if any)
retcheck = annotations.pop('return', None)
def wrapper(*args, **kwargs):
bound = sig.bind(*args, **kwargs)
errors = []
# Enforce argument checks
for name, validator in annotations.items():
try:
validator.check(bound.arguments[name])
except Exception as e:
errors.append(f' {name}: {e}')
if errors:
raise TypeError('Bad Arguments\n' + '\n'.join(errors))
result = func(*args, **kwargs)
# Enforce return check (if any)
if retcheck:
try:
retcheck.check(result)
except Exception as e:
raise TypeError(f'Bad return: {e}') from None
return result
return wrapper
# Examples
if __name__ == '__main__':
@validated
def add(x:Integer, y:Integer) -> Integer:
return x + y
@validated
def div(x:Integer, y:Integer) -> Integer:
return x / y
class Stock:
name = NonEmptyString()
shares = PositiveInteger()
price = PositiveFloat()
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
def __repr__(self):
return f'Stock({self.name!r}, {self.shares!r}, {self.price!r})'
@property
def cost(self):
return self.shares * self.price
@validated
def sell(self, nshares:PositiveInteger):
self.shares -= nshares

21
Solutions/7_2/logcall.py Normal file
View File

@@ -0,0 +1,21 @@
# logcall.py
from functools import wraps
def logformat(fmt):
def logged(func):
print('Adding logging to', func.__name__)
@wraps(func)
def wrapper(*args,**kwargs):
print(fmt.format(func=func))
return func(*args, **kwargs)
return wrapper
return logged
# Original no-argument @logged decorator defined in terms of the more
# general @logformat decorator
logged = logformat('Calling {func.__name__}')

15
Solutions/7_2/sample.py Normal file
View File

@@ -0,0 +1,15 @@
# sample.py
from logcall import logged, logformat
@logged
def add(x,y):
return x+y
@logged
def sub(x,y):
return x-y
@logformat('{func.__code__.co_filename}:{func.__name__}')
def mul(x,y):
return x*y

21
Solutions/7_2/spam.py Normal file
View File

@@ -0,0 +1,21 @@
from logcall import logged
class Spam:
@logged
def instance_method(self):
pass
@classmethod
@logged
def class_method(cls):
pass
@staticmethod
@logged
def static_method():
pass
@property
@logged
def property_method(self):
pass

169
Solutions/7_2/validate.py Normal file
View File

@@ -0,0 +1,169 @@
# validate.py
class Validator:
def __init__(self, name=None):
self.name = name
def __set_name__(self, cls, name):
self.name = name
@classmethod
def check(cls, value):
return value
def __set__(self, instance, value):
instance.__dict__[self.name] = self.check(value)
class Typed(Validator):
expected_type = object
@classmethod
def check(cls, value):
if not isinstance(value, cls.expected_type):
raise TypeError(f'expected {cls.expected_type}')
return super().check(value)
class Integer(Typed):
expected_type = int
class Float(Typed):
expected_type = float
class String(Typed):
expected_type = str
class Positive(Validator):
@classmethod
def check(cls, value):
if value < 0:
raise ValueError('must be >= 0')
return super().check(value)
class NonEmpty(Validator):
@classmethod
def check(cls, value):
if len(value) == 0:
raise ValueError('must be non-empty')
return super().check(value)
class PositiveInteger(Integer, Positive):
pass
class PositiveFloat(Float, Positive):
pass
class NonEmptyString(String, NonEmpty):
pass
from inspect import signature
from functools import wraps
def isvalidator(item):
return isinstance(item, type) and issubclass(item, Validator)
def validated(func):
sig = signature(func)
# Gather the function annotations
annotations = { name:val for name, val in func.__annotations__.items()
if isvalidator(val) }
# Get the return annotation (if any)
retcheck = annotations.pop('return', None)
@wraps(func)
def wrapper(*args, **kwargs):
bound = sig.bind(*args, **kwargs)
errors = []
# Enforce argument checks
for name, validator in annotations.items():
try:
validator.check(bound.arguments[name])
except Exception as e:
errors.append(f' {name}: {e}')
if errors:
raise TypeError('Bad Arguments\n' + '\n'.join(errors))
result = func(*args, **kwargs)
# Enforce return check (if any)
if retcheck:
try:
retcheck.check(result)
except Exception as e:
raise TypeError(f'Bad return: {e}') from None
return result
return wrapper
def enforce(**annotations):
retcheck = annotations.pop('return_', None)
def decorate(func):
sig = signature(func)
@wraps(func)
def wrapper(*args, **kwargs):
bound = sig.bind(*args, **kwargs)
errors = []
# Enforce argument checks
for name, validator in annotations.items():
try:
validator.check(bound.arguments[name])
except Exception as e:
errors.append(f' {name}: {e}')
if errors:
raise TypeError('Bad Arguments\n' + '\n'.join(errors))
result = func(*args, **kwargs)
if retcheck:
try:
retcheck.check(result)
except Exception as e:
raise TypeError(f'Bad return: {e}') from None
return result
return wrapper
return decorate
# Examples
if __name__ == '__main__':
@validated
def add(x:Integer, y:Integer) -> Integer:
return x + y
@validated
def div(x:Integer, y:Integer) -> Integer:
return x / y
@enforce(x=Integer, y=Integer)
def sub(x, y):
return x - y
class Stock:
name = NonEmptyString()
shares = PositiveInteger()
price = PositiveFloat()
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
def __repr__(self):
return f'Stock({self.name!r}, {self.shares!r}, {self.price!r})'
@property
def cost(self):
return self.shares * self.price
@validated
def sell(self, nshares:PositiveInteger):
self.shares -= nshares

43
Solutions/7_3/reader.py Normal file
View File

@@ -0,0 +1,43 @@
# reader.py
import csv
import logging
log = logging.getLogger(__name__)
def convert_csv(lines, converter, *, headers=None):
rows = csv.reader(lines)
if headers is None:
headers = next(rows)
records = []
for rowno, row in enumerate(rows, start=1):
try:
records.append(converter(headers, row))
except ValueError as e:
log.warning('Row %s: Bad row: %s', rowno, row)
log.debug('Row %s: Reason: %s', rowno, row)
return records
def csv_as_dicts(lines, types, *, headers=None):
return convert_csv(lines,
lambda headers, row: { name: func(val) for name, func, val in zip(headers, types, row) })
def csv_as_instances(lines, cls, *, headers=None):
return convert_csv(lines,
lambda headers, row: cls.from_row(row))
def read_csv_as_dicts(filename, types, *, headers=None):
'''
Read CSV data into a list of dictionaries with optional type conversion
'''
with open(filename) as file:
return csv_as_dicts(file, types, headers=headers)
def read_csv_as_instances(filename, cls, *, headers=None):
'''
Read CSV data into a list of instances
'''
with open(filename) as file:
return csv_as_instances(file, cls, headers=headers)

16
Solutions/7_3/stock.py Normal file
View File

@@ -0,0 +1,16 @@
# stock.py
from structure import Structure
from validate import String, PositiveInteger, PositiveFloat
class Stock(Structure):
name = String()
shares = PositiveInteger()
price = PositiveFloat()
@property
def cost(self):
return self.shares * self.price
def sell(self, nshares: PositiveInteger):
self.shares -= nshares

View File

@@ -0,0 +1,70 @@
# structure.py
from validate import Validator, validated
class Structure:
_fields = ()
_types = ()
def __setattr__(self, name, value):
if name.startswith('_') or name in self._fields:
super().__setattr__(name, value)
else:
raise AttributeError('No attribute %s' % name)
def __repr__(self):
return '%s(%s)' % (type(self).__name__,
', '.join(repr(getattr(self, name)) for name in self._fields))
@classmethod
def from_row(cls, row):
rowdata = [ func(val) for func, val in zip(cls._types, row) ]
return cls(*rowdata)
@classmethod
def create_init(cls):
'''
Create an __init__ method from _fields
'''
args = ','.join(cls._fields)
code = f'def __init__(self, {args}):\n'
for name in cls._fields:
code += f' self.{name} = {name}\n'
locs = { }
exec(code, locs)
cls.__init__ = locs['__init__']
@classmethod
def __init_subclass__(cls):
# Apply the validated decorator to subclasses
validate_attributes(cls)
def validate_attributes(cls):
'''
Class decorator that scans a class definition for Validators
and builds a _fields variable that captures their definition order.
'''
validators = []
for name, val in vars(cls).items():
if isinstance(val, Validator):
validators.append(val)
# Apply validated decorator to any callable with annotations
elif callable(val) and val.__annotations__:
setattr(cls, name, validated(val))
# Collect all of the field names
cls._fields = tuple([v.name for v in validators])
# Collect type conversions. The lambda x:x is an identity
# function that's used in case no expected_type is found.
cls._types = tuple([ getattr(v, 'expected_type', lambda x: x)
for v in validators ])
# Create the __init__ method
if cls._fields:
cls.create_init()
return cls

View File

@@ -0,0 +1,70 @@
# teststock.py
import stock
import unittest
class TestStock(unittest.TestCase):
def test_create(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_create_keyword(self):
s = stock.Stock(name='GOOG', shares=100, price=490.1)
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_cost(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(s.cost, 49010.0)
def test_sell(self):
s = stock.Stock('GOOG', 100, 490.1)
s.sell(25)
self.assertEqual(s.shares, 75)
def test_from_row(self):
s = stock.Stock.from_row(['GOOG','100','490.1'])
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_repr(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(repr(s), "Stock('GOOG', 100, 490.1)")
def test_eq(self):
a = stock.Stock('GOOG', 100, 490.1)
b = stock.Stock('GOOG', 100, 490.1)
self.assertTrue(a==b)
# Tests for failure conditions
def test_shares_badtype(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(TypeError):
s.shares = '50'
def test_shares_badvalue(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(ValueError):
s.shares = -50
def test_price_badtype(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(TypeError):
s.price = '45.23'
def test_price_badvalue(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(ValueError):
s.price = -45.23
def test_bad_attribute(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(AttributeError):
s.share = 100
if __name__ == '__main__':
unittest.main()

168
Solutions/7_3/validate.py Normal file
View File

@@ -0,0 +1,168 @@
# validate.py
class Validator:
def __init__(self, name=None):
self.name = name
def __set_name__(self, cls, name):
self.name = name
@classmethod
def check(cls, value):
return value
def __set__(self, instance, value):
instance.__dict__[self.name] = self.check(value)
class Typed(Validator):
expected_type = object
@classmethod
def check(cls, value):
if not isinstance(value, cls.expected_type):
raise TypeError(f'expected {cls.expected_type}')
return super().check(value)
class Integer(Typed):
expected_type = int
class Float(Typed):
expected_type = float
class String(Typed):
expected_type = str
class Positive(Validator):
@classmethod
def check(cls, value):
if value < 0:
raise ValueError('must be >= 0')
return super().check(value)
class NonEmpty(Validator):
@classmethod
def check(cls, value):
if len(value) == 0:
raise ValueError('must be non-empty')
return super().check(value)
class PositiveInteger(Integer, Positive):
pass
class PositiveFloat(Float, Positive):
pass
class NonEmptyString(String, NonEmpty):
pass
from inspect import signature
from functools import wraps
def isvalidator(item):
return isinstance(item, type) and issubclass(item, Validator)
def validated(func):
sig = signature(func)
# Gather the function annotations
annotations = { name:val for name, val in func.__annotations__.items()
if isvalidator(val) }
# Get the return annotation (if any)
retcheck = annotations.pop('return', None)
@wraps(func)
def wrapper(*args, **kwargs):
bound = sig.bind(*args, **kwargs)
errors = []
# Enforce argument checks
for name, validator in annotations.items():
try:
validator.check(bound.arguments[name])
except Exception as e:
errors.append(f' {name}: {e}')
if errors:
raise TypeError('Bad Arguments\n' + '\n'.join(errors))
result = func(*args, **kwargs)
# Enforce return check (if any)
if retcheck:
try:
retcheck.check(result)
except Exception as e:
raise TypeError(f'Bad return: {e}') from None
return result
return wrapper
def enforce(**annotations):
retcheck = annotations.pop('return_', None)
def decorate(func):
sig = signature(func)
@wraps(func)
def wrapper(*args, **kwargs):
bound = sig.bind(*args, **kwargs)
errors = []
# Enforce argument checks
for name, validator in annotations.items():
try:
validator.check(bound.arguments[name])
except Exception as e:
errors.append(f' {name}: {e}')
if errors:
raise TypeError('Bad Arguments\n' + '\n'.join(errors))
result = func(*args, **kwargs)
if retcheck:
try:
retcheck.check(result)
except Exception as e:
raise TypeError(f'Bad return: {e}') from None
return result
return wrapper
return decorate
# Examples
if __name__ == '__main__':
@validated
def add(x:Integer, y:Integer) -> Integer:
return x + y
@validated
def div(x:Integer, y:Integer) -> Integer:
return x / y
@enforce(x=Integer, y=Integer)
def sub(x, y):
return x - y
class Stock:
name = NonEmptyString()
shares = PositiveInteger()
price = PositiveFloat()
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
def __repr__(self):
return f'Stock({self.name!r}, {self.shares!r}, {self.price!r})'
@property
def cost(self):
return self.shares * self.price
@validated
def sell(self, nshares:PositiveInteger):
self.shares -= nshares

16
Solutions/7_4/stock.py Normal file
View File

@@ -0,0 +1,16 @@
# stock.py
from structure import Structure
from validate import String, PositiveInteger, PositiveFloat
class Stock(Structure):
name = String()
shares = PositiveInteger()
price = PositiveFloat()
@property
def cost(self):
return self.shares * self.price
def sell(self, nshares: PositiveInteger):
self.shares -= nshares

View File

@@ -0,0 +1,73 @@
# structure.py
from validate import Validator, validated
class Structure:
_fields = ()
_types = ()
def __setattr__(self, name, value):
if name.startswith('_') or name in self._fields:
super().__setattr__(name, value)
else:
raise AttributeError('No attribute %s' % name)
def __repr__(self):
return '%s(%s)' % (type(self).__name__,
', '.join(repr(getattr(self, name)) for name in self._fields))
@classmethod
def from_row(cls, row):
rowdata = [ func(val) for func, val in zip(cls._types, row) ]
return cls(*rowdata)
@classmethod
def create_init(cls):
'''
Create an __init__ method from _fields
'''
args = ','.join(cls._fields)
code = f'def __init__(self, {args}):\n'
for name in cls._fields:
code += f' self.{name} = {name}\n'
locs = { }
exec(code, locs)
cls.__init__ = locs['__init__']
@classmethod
def __init_subclass__(cls):
# Apply the validated decorator to subclasses
validate_attributes(cls)
def validate_attributes(cls):
'''
Class decorator that scans a class definition for Validators
and builds a _fields variable that captures their definition order.
'''
validators = []
for name, val in vars(cls).items():
if isinstance(val, Validator):
validators.append(val)
# Apply validated decorator to any callable with annotations
elif callable(val) and val.__annotations__:
setattr(cls, name, validated(val))
# Collect all of the field names
cls._fields = tuple([v.name for v in validators])
# Collect type conversions. The lambda x:x is an identity
# function that's used in case no expected_type is found.
cls._types = tuple([ getattr(v, 'expected_type', lambda x: x)
for v in validators ])
# Create the __init__ method
if cls._fields:
cls.create_init()
return cls
def typed_structure(clsname, **validators):
cls = type(clsname, (Structure,), validators)
return cls

View File

@@ -0,0 +1,70 @@
# teststock.py
import stock
import unittest
class TestStock(unittest.TestCase):
def test_create(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_create_keyword(self):
s = stock.Stock(name='GOOG', shares=100, price=490.1)
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_cost(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(s.cost, 49010.0)
def test_sell(self):
s = stock.Stock('GOOG', 100, 490.1)
s.sell(25)
self.assertEqual(s.shares, 75)
def test_from_row(self):
s = stock.Stock.from_row(['GOOG','100','490.1'])
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_repr(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(repr(s), "Stock('GOOG', 100, 490.1)")
def test_eq(self):
a = stock.Stock('GOOG', 100, 490.1)
b = stock.Stock('GOOG', 100, 490.1)
self.assertTrue(a==b)
# Tests for failure conditions
def test_shares_badtype(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(TypeError):
s.shares = '50'
def test_shares_badvalue(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(ValueError):
s.shares = -50
def test_price_badtype(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(TypeError):
s.price = '45.23'
def test_price_badvalue(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(ValueError):
s.price = -45.23
def test_bad_attribute(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(AttributeError):
s.share = 100
if __name__ == '__main__':
unittest.main()

167
Solutions/7_4/validate.py Normal file
View File

@@ -0,0 +1,167 @@
# validate.py
class Validator:
def __init__(self, name=None):
self.name = name
def __set_name__(self, cls, name):
self.name = name
@classmethod
def check(cls, value):
return value
def __set__(self, instance, value):
instance.__dict__[self.name] = self.check(value)
class Typed(Validator):
expected_type = object
@classmethod
def check(cls, value):
if not isinstance(value, cls.expected_type):
raise TypeError(f'expected {cls.expected_type}')
return super().check(value)
_typed_classes = [
('Integer', int),
('Float', float),
('String', str) ]
globals().update((name, type(name, (Typed,), {'expected_type':ty}))
for name, ty in _typed_classes)
class Positive(Validator):
@classmethod
def check(cls, value):
if value < 0:
raise ValueError('must be >= 0')
return super().check(value)
class NonEmpty(Validator):
@classmethod
def check(cls, value):
if len(value) == 0:
raise ValueError('must be non-empty')
return super().check(value)
class PositiveInteger(Integer, Positive):
pass
class PositiveFloat(Float, Positive):
pass
class NonEmptyString(String, NonEmpty):
pass
from inspect import signature
from functools import wraps
def isvalidator(item):
return isinstance(item, type) and issubclass(item, Validator)
def validated(func):
sig = signature(func)
# Gather the function annotations
annotations = { name:val for name, val in func.__annotations__.items()
if isvalidator(val) }
# Get the return annotation (if any)
retcheck = annotations.pop('return', None)
@wraps(func)
def wrapper(*args, **kwargs):
bound = sig.bind(*args, **kwargs)
errors = []
# Enforce argument checks
for name, validator in annotations.items():
try:
validator.check(bound.arguments[name])
except Exception as e:
errors.append(f' {name}: {e}')
if errors:
raise TypeError('Bad Arguments\n' + '\n'.join(errors))
result = func(*args, **kwargs)
# Enforce return check (if any)
if retcheck:
try:
retcheck.check(result)
except Exception as e:
raise TypeError(f'Bad return: {e}') from None
return result
return wrapper
def enforce(**annotations):
retcheck = annotations.pop('return_', None)
def decorate(func):
sig = signature(func)
@wraps(func)
def wrapper(*args, **kwargs):
bound = sig.bind(*args, **kwargs)
errors = []
# Enforce argument checks
for name, validator in annotations.items():
try:
validator.check(bound.arguments[name])
except Exception as e:
errors.append(f' {name}: {e}')
if errors:
raise TypeError('Bad Arguments\n' + '\n'.join(errors))
result = func(*args, **kwargs)
if retcheck:
try:
retcheck.check(result)
except Exception as e:
raise TypeError(f'Bad return: {e}') from None
return result
return wrapper
return decorate
# Examples
if __name__ == '__main__':
@validated
def add(x:Integer, y:Integer) -> Integer:
return x + y
@validated
def div(x:Integer, y:Integer) -> Integer:
return x / y
@enforce(x=Integer, y=Integer)
def sub(x, y):
return x - y
class Stock:
name = NonEmptyString()
shares = PositiveInteger()
price = PositiveFloat()
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
def __repr__(self):
return f'Stock({self.name!r}, {self.shares!r}, {self.price!r})'
@property
def cost(self):
return self.shares * self.price
@validated
def sell(self, nshares:PositiveInteger):
self.shares -= nshares

25
Solutions/7_5/mymeta.py Normal file
View File

@@ -0,0 +1,25 @@
# mymeta.py
class mytype(type):
@staticmethod
def __new__(meta, name, bases, __dict__):
print("Creating class :", name)
print("Base classes :", bases)
print("Attributes :", list(__dict__.keys()))
return super().__new__(meta, name, bases, __dict__)
class myobject(metaclass=mytype):
pass
class Stock(myobject):
def __init__(self,name,shares,price):
self.name = name
self.shares = shares
self.price = price
def cost(self):
return self.shares*self.price
def sell(self,nshares):
self.shares -= nshares
class MyStock(Stock):
pass

43
Solutions/7_6/reader.py Normal file
View File

@@ -0,0 +1,43 @@
# reader.py
import csv
import logging
log = logging.getLogger(__name__)
def convert_csv(lines, converter, *, headers=None):
rows = csv.reader(lines)
if headers is None:
headers = next(rows)
records = []
for rowno, row in enumerate(rows, start=1):
try:
records.append(converter(headers, row))
except ValueError as e:
log.warning('Row %s: Bad row: %s', rowno, row)
log.debug('Row %s: Reason: %s', rowno, row)
return records
def csv_as_dicts(lines, types, *, headers=None):
return convert_csv(lines,
lambda headers, row: { name: func(val) for name, func, val in zip(headers, types, row) })
def csv_as_instances(lines, cls, *, headers=None):
return convert_csv(lines,
lambda headers, row: cls.from_row(row))
def read_csv_as_dicts(filename, types, *, headers=None):
'''
Read CSV data into a list of dictionaries with optional type conversion
'''
with open(filename) as file:
return csv_as_dicts(file, types, headers=headers)
def read_csv_as_instances(filename, cls, *, headers=None):
'''
Read CSV data into a list of instances
'''
with open(filename) as file:
return csv_as_instances(file, cls, headers=headers)

23
Solutions/7_6/stock.py Normal file
View File

@@ -0,0 +1,23 @@
# stock.py
from structure import Structure
class Stock(Structure):
name = String()
shares = PositiveInteger()
price = PositiveFloat()
@property
def cost(self):
return self.shares * self.price
def sell(self, nshares: PositiveInteger):
self.shares -= nshares
if __name__ == '__main__':
from reader import read_csv_as_instances
from tableformat import create_formatter, print_table
portfolio = read_csv_as_instances('../../Data/portfolio.csv', Stock)
formatter = create_formatter('text')
print_table(portfolio, ['name','shares','price'], formatter)

View File

@@ -0,0 +1,85 @@
# structure.py
from validate import Validator, validated
from collections import ChainMap
class StructureMeta(type):
@classmethod
def __prepare__(meta, clsname, bases):
return ChainMap({}, Validator.validators)
@staticmethod
def __new__(meta, name, bases, methods):
methods = methods.maps[0]
return super().__new__(meta, name, bases, methods)
class Structure(metaclass=StructureMeta):
_fields = ()
_types = ()
def __setattr__(self, name, value):
if name.startswith('_') or name in self._fields:
super().__setattr__(name, value)
else:
raise AttributeError('No attribute %s' % name)
def __repr__(self):
return '%s(%s)' % (type(self).__name__,
', '.join(repr(getattr(self, name)) for name in self._fields))
@classmethod
def from_row(cls, row):
rowdata = [ func(val) for func, val in zip(cls._types, row) ]
return cls(*rowdata)
@classmethod
def create_init(cls):
'''
Create an __init__ method from _fields
'''
args = ','.join(cls._fields)
code = f'def __init__(self, {args}):\n'
for name in cls._fields:
code += f' self.{name} = {name}\n'
locs = { }
exec(code, locs)
cls.__init__ = locs['__init__']
@classmethod
def __init_subclass__(cls):
# Apply the validated decorator to subclasses
validate_attributes(cls)
def validate_attributes(cls):
'''
Class decorator that scans a class definition for Validators
and builds a _fields variable that captures their definition order.
'''
validators = []
for name, val in vars(cls).items():
if isinstance(val, Validator):
validators.append(val)
# Apply validated decorator to any callable with annotations
elif callable(val) and val.__annotations__:
setattr(cls, name, validated(val))
# Collect all of the field names
cls._fields = tuple([v.name for v in validators])
# Collect type conversions. The lambda x:x is an identity
# function that's used in case no expected_type is found.
cls._types = tuple([ getattr(v, 'expected_type', lambda x: x)
for v in validators ])
# Create the __init__ method
if cls._fields:
cls.create_init()
return cls
def typed_structure(clsname, **validators):
cls = type(clsname, (Structure,), validators)
return cls

View File

@@ -0,0 +1,82 @@
# tableformat.py
from abc import ABC, abstractmethod
def print_table(records, fields, formatter):
if not isinstance(formatter, TableFormatter):
raise RuntimeError('Expected a TableFormatter')
formatter.headings(fields)
for r in records:
rowdata = [getattr(r, fieldname) for fieldname in fields]
formatter.row(rowdata)
class TableFormatter(ABC):
@abstractmethod
def headings(self, headers):
pass
@abstractmethod
def row(self, rowdata):
pass
class TextTableFormatter(TableFormatter):
def headings(self, headers):
print(' '.join('%10s' % h for h in headers))
print(('-'*10 + ' ')*len(headers))
def row(self, rowdata):
print(' '.join('%10s' % d for d in rowdata))
class CSVTableFormatter(TableFormatter):
def headings(self, headers):
print(','.join(headers))
def row(self, rowdata):
print(','.join(str(d) for d in rowdata))
class HTMLTableFormatter(TableFormatter):
def headings(self, headers):
print('<tr>', end=' ')
for h in headers:
print('<th>%s</th>' % h, end=' ')
print('</tr>')
def row(self, rowdata):
print('<tr>', end=' ')
for d in rowdata:
print('<td>%s</td>' % d, end=' ')
print('</tr>')
class ColumnFormatMixin:
formats = []
def row(self, rowdata):
rowdata = [ (fmt % item) for fmt, item in zip(self.formats, rowdata)]
super().row(rowdata)
class UpperHeadersMixin:
def headings(self, headers):
super().headings([h.upper() for h in headers])
def create_formatter(name, column_formats=None, upper_headers=False):
if name == 'text':
formatter_cls = TextTableFormatter
elif name == 'csv':
formatter_cls = CSVTableFormatter
elif name == 'html':
formatter_cls = HTMLTableFormatter
else:
raise RuntimeError('Unknown format %s' % name)
if column_formats:
class formatter_cls(ColumnFormatMixin, formatter_cls):
formats = column_formats
if upper_headers:
class formatter_cls(UpperHeadersMixin, formatter_cls):
pass
return formatter_cls()

View File

@@ -0,0 +1,70 @@
# teststock.py
import stock
import unittest
class TestStock(unittest.TestCase):
def test_create(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_create_keyword(self):
s = stock.Stock(name='GOOG', shares=100, price=490.1)
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_cost(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(s.cost, 49010.0)
def test_sell(self):
s = stock.Stock('GOOG', 100, 490.1)
s.sell(25)
self.assertEqual(s.shares, 75)
def test_from_row(self):
s = stock.Stock.from_row(['GOOG','100','490.1'])
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_repr(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(repr(s), "Stock('GOOG', 100, 490.1)")
def test_eq(self):
a = stock.Stock('GOOG', 100, 490.1)
b = stock.Stock('GOOG', 100, 490.1)
self.assertTrue(a==b)
# Tests for failure conditions
def test_shares_badtype(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(TypeError):
s.shares = '50'
def test_shares_badvalue(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(ValueError):
s.shares = -50
def test_price_badtype(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(TypeError):
s.price = '45.23'
def test_price_badvalue(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(ValueError):
s.price = -45.23
def test_bad_attribute(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(AttributeError):
s.share = 100
if __name__ == '__main__':
unittest.main()

173
Solutions/7_6/validate.py Normal file
View File

@@ -0,0 +1,173 @@
# validate.py
class Validator:
def __init__(self, name=None):
self.name = name
def __set_name__(self, cls, name):
self.name = name
@classmethod
def check(cls, value):
return value
def __set__(self, instance, value):
instance.__dict__[self.name] = self.check(value)
# Collect all derived classes into a dict
validators = { }
@classmethod
def __init_subclass__(cls):
cls.validators[cls.__name__] = cls
class Typed(Validator):
expected_type = object
@classmethod
def check(cls, value):
if not isinstance(value, cls.expected_type):
raise TypeError(f'expected {cls.expected_type}')
return super().check(value)
_typed_classes = [
('Integer', int),
('Float', float),
('String', str) ]
globals().update((name, type(name, (Typed,), {'expected_type':ty}))
for name, ty in _typed_classes)
class Positive(Validator):
@classmethod
def check(cls, value):
if value < 0:
raise ValueError('must be >= 0')
return super().check(value)
class NonEmpty(Validator):
@classmethod
def check(cls, value):
if len(value) == 0:
raise ValueError('must be non-empty')
return super().check(value)
class PositiveInteger(Integer, Positive):
pass
class PositiveFloat(Float, Positive):
pass
class NonEmptyString(String, NonEmpty):
pass
from inspect import signature
from functools import wraps
def isvalidator(item):
return isinstance(item, type) and issubclass(item, Validator)
def validated(func):
sig = signature(func)
# Gather the function annotations
annotations = { name:val for name, val in func.__annotations__.items()
if isvalidator(val) }
# Get the return annotation (if any)
retcheck = annotations.pop('return', None)
@wraps(func)
def wrapper(*args, **kwargs):
bound = sig.bind(*args, **kwargs)
errors = []
# Enforce argument checks
for name, validator in annotations.items():
try:
validator.check(bound.arguments[name])
except Exception as e:
errors.append(f' {name}: {e}')
if errors:
raise TypeError('Bad Arguments\n' + '\n'.join(errors))
result = func(*args, **kwargs)
# Enforce return check (if any)
if retcheck:
try:
retcheck.check(result)
except Exception as e:
raise TypeError(f'Bad return: {e}') from None
return result
return wrapper
def enforce(**annotations):
retcheck = annotations.pop('return_', None)
def decorate(func):
sig = signature(func)
@wraps(func)
def wrapper(*args, **kwargs):
bound = sig.bind(*args, **kwargs)
errors = []
# Enforce argument checks
for name, validator in annotations.items():
try:
validator.check(bound.arguments[name])
except Exception as e:
errors.append(f' {name}: {e}')
if errors:
raise TypeError('Bad Arguments\n' + '\n'.join(errors))
result = func(*args, **kwargs)
if retcheck:
try:
retcheck.check(result)
except Exception as e:
raise TypeError(f'Bad return: {e}') from None
return result
return wrapper
return decorate
# Examples
if __name__ == '__main__':
@validated
def add(x:Integer, y:Integer) -> Integer:
return x + y
@validated
def div(x:Integer, y:Integer) -> Integer:
return x / y
@enforce(x=Integer, y=Integer)
def sub(x, y):
return x - y
class Stock:
name = NonEmptyString()
shares = PositiveInteger()
price = PositiveFloat()
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
def __repr__(self):
return f'Stock({self.name!r}, {self.shares!r}, {self.price!r})'
@property
def cost(self):
return self.shares * self.price
@validated
def sell(self, nshares:PositiveInteger):
self.shares -= nshares

27
Solutions/8_1/follow.py Normal file
View File

@@ -0,0 +1,27 @@
# follow.py
import os
import time
def follow(filename):
'''
Generator that produces a sequence of lines being written at the end of a file.
'''
with open(filename,'r') as f:
f.seek(0,os.SEEK_END)
while True:
line = f.readline()
if line == '':
time.sleep(0.1) # Sleep briefly to avoid busy wait
continue
yield line
# Example use
if __name__ == '__main__':
for line in follow('../../Data/stocklog.csv'):
fields = line.split(',')
name = fields[0].strip('"')
price = float(fields[1])
change = float(fields[4])
if change < 0:
print('%10s %10.2f %10.2f' % (name, price, change))

43
Solutions/8_1/reader.py Normal file
View File

@@ -0,0 +1,43 @@
# reader.py
import csv
import logging
log = logging.getLogger(__name__)
def convert_csv(lines, converter, *, headers=None):
rows = csv.reader(lines)
if headers is None:
headers = next(rows)
records = []
for rowno, row in enumerate(rows, start=1):
try:
records.append(converter(headers, row))
except ValueError as e:
log.warning('Row %s: Bad row: %s', rowno, row)
log.debug('Row %s: Reason: %s', rowno, row)
return records
def csv_as_dicts(lines, types, *, headers=None):
return convert_csv(lines,
lambda headers, row: { name: func(val) for name, func, val in zip(headers, types, row) })
def csv_as_instances(lines, cls, *, headers=None):
return convert_csv(lines,
lambda headers, row: cls.from_row(row))
def read_csv_as_dicts(filename, types, *, headers=None):
'''
Read CSV data into a list of dictionaries with optional type conversion
'''
with open(filename) as file:
return csv_as_dicts(file, types, headers=headers)
def read_csv_as_instances(filename, cls, *, headers=None):
'''
Read CSV data into a list of instances
'''
with open(filename) as file:
return csv_as_instances(file, cls, headers=headers)

24
Solutions/8_1/stock.py Normal file
View File

@@ -0,0 +1,24 @@
# stock.py
from structure import Structure
from validate import String, PositiveInteger, PositiveFloat
class Stock(Structure):
name = String('name')
shares = PositiveInteger('shares')
price = PositiveFloat('price')
@property
def cost(self):
return self.shares * self.price
def sell(self, nshares):
self.shares -= nshares
if __name__ == '__main__':
from reader import read_csv_as_instances
from tableformat import create_formatter, print_table
portfolio = read_csv_as_instances('../../Data/portfolio.csv', Stock)
formatter = create_formatter('text')
print_table(portfolio, ['name','shares','price'], formatter)

View File

@@ -0,0 +1,92 @@
# structure.py
from validate import Validator, validated
from collections import ChainMap
class StructureMeta(type):
@classmethod
def __prepare__(meta, clsname, bases):
return ChainMap({}, Validator.validators)
@staticmethod
def __new__(meta, name, bases, methods):
methods = methods.maps[0]
return super().__new__(meta, name, bases, methods)
class Structure(metaclass=StructureMeta):
_fields = ()
_types = ()
def __setattr__(self, name, value):
if name.startswith('_') or name in self._fields:
super().__setattr__(name, value)
else:
raise AttributeError('No attribute %s' % name)
def __repr__(self):
return '%s(%s)' % (type(self).__name__,
', '.join(repr(getattr(self, name)) for name in self._fields))
def __iter__(self):
for name in self._fields:
yield getattr(self, name)
def __eq__(self, other):
return isinstance(other, type(self)) and tuple(self) == tuple(other)
@classmethod
def from_row(cls, row):
rowdata = [ func(val) for func, val in zip(cls._types, row) ]
return cls(*rowdata)
@classmethod
def create_init(cls):
'''
Create an __init__ method from _fields
'''
args = ','.join(cls._fields)
code = f'def __init__(self, {args}):\n'
for name in cls._fields:
code += f' self.{name} = {name}\n'
locs = { }
exec(code, locs)
cls.__init__ = locs['__init__']
@classmethod
def __init_subclass__(cls):
# Apply the validated decorator to subclasses
validate_attributes(cls)
def validate_attributes(cls):
'''
Class decorator that scans a class definition for Validators
and builds a _fields variable that captures their definition order.
'''
validators = []
for name, val in vars(cls).items():
if isinstance(val, Validator):
validators.append(val)
# Apply validated decorator to any callable with annotations
elif callable(val) and val.__annotations__:
setattr(cls, name, validated(val))
# Collect all of the field names
cls._fields = tuple([v.name for v in validators])
# Collect type conversions. The lambda x:x is an identity
# function that's used in case no expected_type is found.
cls._types = tuple([ getattr(v, 'expected_type', lambda x: x)
for v in validators ])
# Create the __init__ method
if cls._fields:
cls.create_init()
return cls
def typed_structure(clsname, **validators):
cls = type(clsname, (Structure,), validators)
return cls

View File

@@ -0,0 +1,70 @@
# teststock.py
import stock
import unittest
class TestStock(unittest.TestCase):
def test_create(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_create_keyword(self):
s = stock.Stock(name='GOOG', shares=100, price=490.1)
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_cost(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(s.cost, 49010.0)
def test_sell(self):
s = stock.Stock('GOOG', 100, 490.1)
s.sell(25)
self.assertEqual(s.shares, 75)
def test_from_row(self):
s = stock.Stock.from_row(['GOOG','100','490.1'])
self.assertEqual(s.name, 'GOOG')
self.assertEqual(s.shares, 100)
self.assertEqual(s.price, 490.1)
def test_repr(self):
s = stock.Stock('GOOG', 100, 490.1)
self.assertEqual(repr(s), "Stock('GOOG', 100, 490.1)")
def test_eq(self):
a = stock.Stock('GOOG', 100, 490.1)
b = stock.Stock('GOOG', 100, 490.1)
self.assertTrue(a==b)
# Tests for failure conditions
def test_shares_badtype(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(TypeError):
s.shares = '50'
def test_shares_badvalue(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(ValueError):
s.shares = -50
def test_price_badtype(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(TypeError):
s.price = '45.23'
def test_price_badvalue(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(ValueError):
s.price = -45.23
def test_bad_attribute(self):
s = stock.Stock('GOOG', 100, 490.1)
with self.assertRaises(AttributeError):
s.share = 100
if __name__ == '__main__':
unittest.main()

173
Solutions/8_1/validate.py Normal file
View File

@@ -0,0 +1,173 @@
# validate.py
class Validator:
def __init__(self, name=None):
self.name = name
def __set_name__(self, cls, name):
self.name = name
@classmethod
def check(cls, value):
return value
def __set__(self, instance, value):
instance.__dict__[self.name] = self.check(value)
# Collect all derived classes into a dict
validators = { }
@classmethod
def __init_subclass__(cls):
cls.validators[cls.__name__] = cls
class Typed(Validator):
expected_type = object
@classmethod
def check(cls, value):
if not isinstance(value, cls.expected_type):
raise TypeError(f'expected {cls.expected_type}')
return super().check(value)
_typed_classes = [
('Integer', int),
('Float', float),
('String', str) ]
globals().update((name, type(name, (Typed,), {'expected_type':ty}))
for name, ty in _typed_classes)
class Positive(Validator):
@classmethod
def check(cls, value):
if value < 0:
raise ValueError('must be >= 0')
return super().check(value)
class NonEmpty(Validator):
@classmethod
def check(cls, value):
if len(value) == 0:
raise ValueError('must be non-empty')
return super().check(value)
class PositiveInteger(Integer, Positive):
pass
class PositiveFloat(Float, Positive):
pass
class NonEmptyString(String, NonEmpty):
pass
from inspect import signature
from functools import wraps
def isvalidator(item):
return isinstance(item, type) and issubclass(item, Validator)
def validated(func):
sig = signature(func)
# Gather the function annotations
annotations = { name:val for name, val in func.__annotations__.items()
if isvalidator(val) }
# Get the return annotation (if any)
retcheck = annotations.pop('return', None)
@wraps(func)
def wrapper(*args, **kwargs):
bound = sig.bind(*args, **kwargs)
errors = []
# Enforce argument checks
for name, validator in annotations.items():
try:
validator.check(bound.arguments[name])
except Exception as e:
errors.append(f' {name}: {e}')
if errors:
raise TypeError('Bad Arguments\n' + '\n'.join(errors))
result = func(*args, **kwargs)
# Enforce return check (if any)
if retcheck:
try:
retcheck.check(result)
except Exception as e:
raise TypeError(f'Bad return: {e}') from None
return result
return wrapper
def enforce(**annotations):
retcheck = annotations.pop('return_', None)
def decorate(func):
sig = signature(func)
@wraps(func)
def wrapper(*args, **kwargs):
bound = sig.bind(*args, **kwargs)
errors = []
# Enforce argument checks
for name, validator in annotations.items():
try:
validator.check(bound.arguments[name])
except Exception as e:
errors.append(f' {name}: {e}')
if errors:
raise TypeError('Bad Arguments\n' + '\n'.join(errors))
result = func(*args, **kwargs)
if retcheck:
try:
retcheck.check(result)
except Exception as e:
raise TypeError(f'Bad return: {e}') from None
return result
return wrapper
return decorate
# Examples
if __name__ == '__main__':
@validated
def add(x:Integer, y:Integer) -> Integer:
return x + y
@validated
def div(x:Integer, y:Integer) -> Integer:
return x / y
@enforce(x=Integer, y=Integer)
def sub(x, y):
return x - y
class Stock:
name = NonEmptyString()
shares = PositiveInteger()
price = PositiveFloat()
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
def __repr__(self):
return f'Stock({self.name!r}, {self.shares!r}, {self.price!r})'
@property
def cost(self):
return self.shares * self.price
@validated
def sell(self, nshares:PositiveInteger):
self.shares -= nshares

17
Solutions/8_2/follow.py Normal file
View File

@@ -0,0 +1,17 @@
# follow.py
import os
import time
import csv
def follow(filename):
'''
Generator that produces a sequence of lines being written at the end of a file.
'''
with open(filename,'r') as f:
f.seek(0,os.SEEK_END)
while True:
line = f.readline()
if line == '':
time.sleep(0.1) # Sleep briefly to avoid busy wait
continue
yield line

View File

@@ -0,0 +1,91 @@
# structure.py
from validate import Validator, validated
from collections import ChainMap
class StructureMeta(type):
@classmethod
def __prepare__(meta, clsname, bases):
return ChainMap({}, Validator.validators)
@staticmethod
def __new__(meta, name, bases, methods):
methods = methods.maps[0]
return super().__new__(meta, name, bases, methods)
class Structure(metaclass=StructureMeta):
_fields = ()
_types = ()
def __setattr__(self, name, value):
if name.startswith('_') or name in self._fields:
super().__setattr__(name, value)
else:
raise AttributeError('No attribute %s' % name)
def __repr__(self):
return '%s(%s)' % (type(self).__name__,
', '.join(repr(getattr(self, name)) for name in self._fields))
def __iter__(self):
for name in self._fields:
yield getattr(self, name)
def __eq__(self, other):
return isinstance(other, type(self)) and tuple(self) == tuple(other)
@classmethod
def from_row(cls, row):
rowdata = [ func(val) for func, val in zip(cls._types, row) ]
return cls(*rowdata)
@classmethod
def create_init(cls):
'''
Create an __init__ method from _fields
'''
args = ','.join(cls._fields)
code = f'def __init__(self, {args}):\n'
for name in cls._fields:
code += f' self.{name} = {name}\n'
locs = { }
exec(code, locs)
cls.__init__ = locs['__init__']
@classmethod
def __init_subclass__(cls):
# Apply the validated decorator to subclasses
validate_attributes(cls)
def validate_attributes(cls):
'''
Class decorator that scans a class definition for Validators
and builds a _fields variable that captures their definition order.
'''
validators = []
for name, val in vars(cls).items():
if isinstance(val, Validator):
validators.append(val)
# Apply validated decorator to any callable with annotations
elif callable(val) and val.__annotations__:
setattr(cls, name, validated(val))
# Collect all of the field names
cls._fields = tuple([v.name for v in validators])
# Collect type conversions. The lambda x:x is an identity
# function that's used in case no expected_type is found.
cls._types = tuple([ getattr(v, 'expected_type', lambda x: x)
for v in validators ])
# Create the __init__ method
if cls._fields:
cls.create_init()
return cls
def typed_structure(clsname, **validators):
cls = type(clsname, (Structure,), validators)
return cls

View File

@@ -0,0 +1,82 @@
# tableformat.py
from abc import ABC, abstractmethod
def print_table(records, fields, formatter):
if not isinstance(formatter, TableFormatter):
raise RuntimeError('Expected a TableFormatter')
formatter.headings(fields)
for r in records:
rowdata = [getattr(r, fieldname) for fieldname in fields]
formatter.row(rowdata)
class TableFormatter(ABC):
@abstractmethod
def headings(self, headers):
pass
@abstractmethod
def row(self, rowdata):
pass
class TextTableFormatter(TableFormatter):
def headings(self, headers):
print(' '.join('%10s' % h for h in headers))
print(('-'*10 + ' ')*len(headers))
def row(self, rowdata):
print(' '.join('%10s' % d for d in rowdata))
class CSVTableFormatter(TableFormatter):
def headings(self, headers):
print(','.join(headers))
def row(self, rowdata):
print(','.join(str(d) for d in rowdata))
class HTMLTableFormatter(TableFormatter):
def headings(self, headers):
print('<tr>', end=' ')
for h in headers:
print('<th>%s</th>' % h, end=' ')
print('</tr>')
def row(self, rowdata):
print('<tr>', end=' ')
for d in rowdata:
print('<td>%s</td>' % d, end=' ')
print('</tr>')
class ColumnFormatMixin:
formats = []
def row(self, rowdata):
rowdata = [ (fmt % item) for fmt, item in zip(self.formats, rowdata)]
super().row(rowdata)
class UpperHeadersMixin:
def headings(self, headers):
super().headings([h.upper() for h in headers])
def create_formatter(name, column_formats=None, upper_headers=False):
if name == 'text':
formatter_cls = TextTableFormatter
elif name == 'csv':
formatter_cls = CSVTableFormatter
elif name == 'html':
formatter_cls = HTMLTableFormatter
else:
raise RuntimeError('Unknown format %s' % name)
if column_formats:
class formatter_cls(ColumnFormatMixin, formatter_cls):
formats = column_formats
if upper_headers:
class formatter_cls(UpperHeadersMixin, formatter_cls):
pass
return formatter_cls()

26
Solutions/8_2/ticker.py Normal file
View File

@@ -0,0 +1,26 @@
# ticker.py
from structure import Structure
class Ticker(Structure):
name = String()
price = Float()
date = String()
time = String()
change = Float()
open = Float()
high = Float()
low = Float()
volume = Integer()
if __name__ == '__main__':
from follow import follow
import csv
from tableformat import create_formatter, print_table
formatter = create_formatter('text')
lines = follow('../../Data/stocklog.csv')
rows = csv.reader(lines)
records = (Ticker.from_row(row) for row in rows)
negative = (rec for rec in records if rec.change < 0)
print_table(negative, ['name','price','change'], formatter)

173
Solutions/8_2/validate.py Normal file
View File

@@ -0,0 +1,173 @@
# validate.py
class Validator:
def __init__(self, name=None):
self.name = name
def __set_name__(self, cls, name):
self.name = name
@classmethod
def check(cls, value):
return value
def __set__(self, instance, value):
instance.__dict__[self.name] = self.check(value)
# Collect all derived classes into a dict
validators = { }
@classmethod
def __init_subclass__(cls):
cls.validators[cls.__name__] = cls
class Typed(Validator):
expected_type = object
@classmethod
def check(cls, value):
if not isinstance(value, cls.expected_type):
raise TypeError(f'expected {cls.expected_type}')
return super().check(value)
_typed_classes = [
('Integer', int),
('Float', float),
('String', str) ]
globals().update((name, type(name, (Typed,), {'expected_type':ty}))
for name, ty in _typed_classes)
class Positive(Validator):
@classmethod
def check(cls, value):
if value < 0:
raise ValueError('must be >= 0')
return super().check(value)
class NonEmpty(Validator):
@classmethod
def check(cls, value):
if len(value) == 0:
raise ValueError('must be non-empty')
return super().check(value)
class PositiveInteger(Integer, Positive):
pass
class PositiveFloat(Float, Positive):
pass
class NonEmptyString(String, NonEmpty):
pass
from inspect import signature
from functools import wraps
def isvalidator(item):
return isinstance(item, type) and issubclass(item, Validator)
def validated(func):
sig = signature(func)
# Gather the function annotations
annotations = { name:val for name, val in func.__annotations__.items()
if isvalidator(val) }
# Get the return annotation (if any)
retcheck = annotations.pop('return', None)
@wraps(func)
def wrapper(*args, **kwargs):
bound = sig.bind(*args, **kwargs)
errors = []
# Enforce argument checks
for name, validator in annotations.items():
try:
validator.check(bound.arguments[name])
except Exception as e:
errors.append(f' {name}: {e}')
if errors:
raise TypeError('Bad Arguments\n' + '\n'.join(errors))
result = func(*args, **kwargs)
# Enforce return check (if any)
if retcheck:
try:
retcheck.check(result)
except Exception as e:
raise TypeError(f'Bad return: {e}') from None
return result
return wrapper
def enforce(**annotations):
retcheck = annotations.pop('return_', None)
def decorate(func):
sig = signature(func)
@wraps(func)
def wrapper(*args, **kwargs):
bound = sig.bind(*args, **kwargs)
errors = []
# Enforce argument checks
for name, validator in annotations.items():
try:
validator.check(bound.arguments[name])
except Exception as e:
errors.append(f' {name}: {e}')
if errors:
raise TypeError('Bad Arguments\n' + '\n'.join(errors))
result = func(*args, **kwargs)
if retcheck:
try:
retcheck.check(result)
except Exception as e:
raise TypeError(f'Bad return: {e}') from None
return result
return wrapper
return decorate
# Examples
if __name__ == '__main__':
@validated
def add(x:Integer, y:Integer) -> Integer:
return x + y
@validated
def div(x:Integer, y:Integer) -> Integer:
return x / y
@enforce(x=Integer, y=Integer)
def sub(x, y):
return x - y
class Stock:
name = NonEmptyString()
shares = PositiveInteger()
price = PositiveFloat()
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
def __repr__(self):
return f'Stock({self.name!r}, {self.shares!r}, {self.price!r})'
@property
def cost(self):
return self.shares * self.price
@validated
def sell(self, nshares:PositiveInteger):
self.shares -= nshares

36
Solutions/8_3/cofollow.py Normal file
View File

@@ -0,0 +1,36 @@
# cofollow.py
import os
import time
def follow(filename, target):
with open(filename, 'r') as f:
f.seek(0,os.SEEK_END)
while True:
line = f.readline()
if line != '':
target.send(line)
else:
time.sleep(0.1)
# Decorator for coroutines
from functools import wraps
def consumer(func):
@wraps(func)
def start(*args,**kwargs):
f = func(*args,**kwargs)
f.send(None)
return f
return start
# Sample coroutine
@consumer
def printer():
while True:
item = yield
print(item)
# Example use.
if __name__ == '__main__':
follow('../../Data/stocklog.csv', printer())

57
Solutions/8_3/coticker.py Normal file
View File

@@ -0,0 +1,57 @@
# coticker.py
from structure import Structure
class Ticker(Structure):
name = String()
price = Float()
date = String()
time = String()
change = Float()
open = Float()
high = Float()
low = Float()
volume = Integer()
from cofollow import consumer, follow
from tableformat import create_formatter
import csv
@consumer
def to_csv(target):
def producer():
while True:
yield line
reader = csv.reader(producer())
while True:
line = yield
target.send(next(reader))
@consumer
def create_ticker(target):
while True:
row = yield
target.send(Ticker.from_row(row))
@consumer
def negchange(target):
while True:
record = yield
if record.change < 0:
target.send(record)
@consumer
def ticker(fmt, fields):
formatter = create_formatter(fmt)
formatter.headings(fields)
while True:
rec = yield
row = [getattr(rec, name) for name in fields]
formatter.row(row)
if __name__ == '__main__':
follow('../../Data/stocklog.csv',
to_csv(
create_ticker(
negchange(
ticker('text', ['name','price','change'])))))

View File

@@ -0,0 +1,91 @@
# structure.py
from validate import Validator, validated
from collections import ChainMap
class StructureMeta(type):
@classmethod
def __prepare__(meta, clsname, bases):
return ChainMap({}, Validator.validators)
@staticmethod
def __new__(meta, name, bases, methods):
methods = methods.maps[0]
return super().__new__(meta, name, bases, methods)
class Structure(metaclass=StructureMeta):
_fields = ()
_types = ()
def __setattr__(self, name, value):
if name.startswith('_') or name in self._fields:
super().__setattr__(name, value)
else:
raise AttributeError('No attribute %s' % name)
def __repr__(self):
return '%s(%s)' % (type(self).__name__,
', '.join(repr(getattr(self, name)) for name in self._fields))
def __iter__(self):
for name in self._fields:
yield getattr(self, name)
def __eq__(self, other):
return isinstance(other, type(self)) and tuple(self) == tuple(other)
@classmethod
def from_row(cls, row):
rowdata = [ func(val) for func, val in zip(cls._types, row) ]
return cls(*rowdata)
@classmethod
def create_init(cls):
'''
Create an __init__ method from _fields
'''
args = ','.join(cls._fields)
code = f'def __init__(self, {args}):\n'
for name in cls._fields:
code += f' self.{name} = {name}\n'
locs = { }
exec(code, locs)
cls.__init__ = locs['__init__']
@classmethod
def __init_subclass__(cls):
# Apply the validated decorator to subclasses
validate_attributes(cls)
def validate_attributes(cls):
'''
Class decorator that scans a class definition for Validators
and builds a _fields variable that captures their definition order.
'''
validators = []
for name, val in vars(cls).items():
if isinstance(val, Validator):
validators.append(val)
# Apply validated decorator to any callable with annotations
elif callable(val) and val.__annotations__:
setattr(cls, name, validated(val))
# Collect all of the field names
cls._fields = tuple([v.name for v in validators])
# Collect type conversions. The lambda x:x is an identity
# function that's used in case no expected_type is found.
cls._types = tuple([ getattr(v, 'expected_type', lambda x: x)
for v in validators ])
# Create the __init__ method
if cls._fields:
cls.create_init()
return cls
def typed_structure(clsname, **validators):
cls = type(clsname, (Structure,), validators)
return cls

View File

@@ -0,0 +1,82 @@
# tableformat.py
from abc import ABC, abstractmethod
def print_table(records, fields, formatter):
if not isinstance(formatter, TableFormatter):
raise RuntimeError('Expected a TableFormatter')
formatter.headings(fields)
for r in records:
rowdata = [getattr(r, fieldname) for fieldname in fields]
formatter.row(rowdata)
class TableFormatter(ABC):
@abstractmethod
def headings(self, headers):
pass
@abstractmethod
def row(self, rowdata):
pass
class TextTableFormatter(TableFormatter):
def headings(self, headers):
print(' '.join('%10s' % h for h in headers))
print(('-'*10 + ' ')*len(headers))
def row(self, rowdata):
print(' '.join('%10s' % d for d in rowdata))
class CSVTableFormatter(TableFormatter):
def headings(self, headers):
print(','.join(headers))
def row(self, rowdata):
print(','.join(str(d) for d in rowdata))
class HTMLTableFormatter(TableFormatter):
def headings(self, headers):
print('<tr>', end=' ')
for h in headers:
print('<th>%s</th>' % h, end=' ')
print('</tr>')
def row(self, rowdata):
print('<tr>', end=' ')
for d in rowdata:
print('<td>%s</td>' % d, end=' ')
print('</tr>')
class ColumnFormatMixin:
formats = []
def row(self, rowdata):
rowdata = [ (fmt % item) for fmt, item in zip(self.formats, rowdata)]
super().row(rowdata)
class UpperHeadersMixin:
def headings(self, headers):
super().headings([h.upper() for h in headers])
def create_formatter(name, column_formats=None, upper_headers=False):
if name == 'text':
formatter_cls = TextTableFormatter
elif name == 'csv':
formatter_cls = CSVTableFormatter
elif name == 'html':
formatter_cls = HTMLTableFormatter
else:
raise RuntimeError('Unknown format %s' % name)
if column_formats:
class formatter_cls(ColumnFormatMixin, formatter_cls):
formats = column_formats
if upper_headers:
class formatter_cls(UpperHeadersMixin, formatter_cls):
pass
return formatter_cls()

173
Solutions/8_3/validate.py Normal file
View File

@@ -0,0 +1,173 @@
# validate.py
class Validator:
def __init__(self, name=None):
self.name = name
def __set_name__(self, cls, name):
self.name = name
@classmethod
def check(cls, value):
return value
def __set__(self, instance, value):
instance.__dict__[self.name] = self.check(value)
# Collect all derived classes into a dict
validators = { }
@classmethod
def __init_subclass__(cls):
cls.validators[cls.__name__] = cls
class Typed(Validator):
expected_type = object
@classmethod
def check(cls, value):
if not isinstance(value, cls.expected_type):
raise TypeError(f'expected {cls.expected_type}')
return super().check(value)
_typed_classes = [
('Integer', int),
('Float', float),
('String', str) ]
globals().update((name, type(name, (Typed,), {'expected_type':ty}))
for name, ty in _typed_classes)
class Positive(Validator):
@classmethod
def check(cls, value):
if value < 0:
raise ValueError('must be >= 0')
return super().check(value)
class NonEmpty(Validator):
@classmethod
def check(cls, value):
if len(value) == 0:
raise ValueError('must be non-empty')
return super().check(value)
class PositiveInteger(Integer, Positive):
pass
class PositiveFloat(Float, Positive):
pass
class NonEmptyString(String, NonEmpty):
pass
from inspect import signature
from functools import wraps
def isvalidator(item):
return isinstance(item, type) and issubclass(item, Validator)
def validated(func):
sig = signature(func)
# Gather the function annotations
annotations = { name:val for name, val in func.__annotations__.items()
if isvalidator(val) }
# Get the return annotation (if any)
retcheck = annotations.pop('return', None)
@wraps(func)
def wrapper(*args, **kwargs):
bound = sig.bind(*args, **kwargs)
errors = []
# Enforce argument checks
for name, validator in annotations.items():
try:
validator.check(bound.arguments[name])
except Exception as e:
errors.append(f' {name}: {e}')
if errors:
raise TypeError('Bad Arguments\n' + '\n'.join(errors))
result = func(*args, **kwargs)
# Enforce return check (if any)
if retcheck:
try:
retcheck.check(result)
except Exception as e:
raise TypeError(f'Bad return: {e}') from None
return result
return wrapper
def enforce(**annotations):
retcheck = annotations.pop('return_', None)
def decorate(func):
sig = signature(func)
@wraps(func)
def wrapper(*args, **kwargs):
bound = sig.bind(*args, **kwargs)
errors = []
# Enforce argument checks
for name, validator in annotations.items():
try:
validator.check(bound.arguments[name])
except Exception as e:
errors.append(f' {name}: {e}')
if errors:
raise TypeError('Bad Arguments\n' + '\n'.join(errors))
result = func(*args, **kwargs)
if retcheck:
try:
retcheck.check(result)
except Exception as e:
raise TypeError(f'Bad return: {e}') from None
return result
return wrapper
return decorate
# Examples
if __name__ == '__main__':
@validated
def add(x:Integer, y:Integer) -> Integer:
return x + y
@validated
def div(x:Integer, y:Integer) -> Integer:
return x / y
@enforce(x=Integer, y=Integer)
def sub(x, y):
return x - y
class Stock:
name = NonEmptyString()
shares = PositiveInteger()
price = PositiveFloat()
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
def __repr__(self):
return f'Stock({self.name!r}, {self.shares!r}, {self.price!r})'
@property
def cost(self):
return self.shares * self.price
@validated
def sell(self, nshares:PositiveInteger):
self.shares -= nshares

40
Solutions/8_4/cofollow.py Normal file
View File

@@ -0,0 +1,40 @@
# cofollow.py
import os
import time
import csv
def follow(filename,target):
with open(filename,"r") as f:
f.seek(0,os.SEEK_END)
while True:
line = f.readline()
if line != '':
target.send(line)
else:
time.sleep(0.1)
# Decorator for coroutines
from functools import wraps
def consumer(func):
@wraps(func)
def start(*args,**kwargs):
f = func(*args,**kwargs)
f.send(None)
return f
return start
# Sample coroutine
@consumer
def printer():
while True:
try:
item = yield
print(item)
except Exception as e:
print('ERROR: %r' % e)
# Example use.
if __name__ == '__main__':
follow('../../Data/stocklog.csv', printer())

56
Solutions/8_4/follow.py Normal file
View File

@@ -0,0 +1,56 @@
# follow.py
import os
import time
def follow(filename):
'''
Generator that produces a sequence of lines being written at the end of a file.
'''
try:
with open(filename,'r') as f:
f.seek(0,os.SEEK_END)
while True:
line = f.readline()
if line == '':
time.sleep(0.1) # Sleep briefly to avoid busy wait
continue
yield line
except GeneratorExit:
print('Following Done')
def splitter(lines):
for line in lines:
yield line.split(',')
def make_records(rows,names):
for row in rows:
yield dict(zip(names,row))
def unquote(records,keylist):
for r in records:
for key in keylist:
r[key] = r[key].strip('"')
yield r
def convert(records,converter,keylist):
for r in records:
for key in keylist:
r[key] = converter(r[key])
yield r
def parse_stock_data(lines):
rows = splitter(lines)
records = make_records(rows,['name','price','date','time',
'change','open','high','low','volume'])
records = unquote(records,["name","date","time"])
records = convert(records,float,['price','change','open','high','low'])
records = convert(records,int,['volume'])
return records
# Sample use
if __name__ == '__main__':
lines = follow("../../Data/stocklog.dat")
records = parse_stock_data(lines)
for r in records:
print("%(name)10s %(price)10.2f %(change)10.2f" % r)

View File

@@ -0,0 +1,33 @@
# multitask.py
from collections import deque
tasks = deque()
def run():
while tasks:
task = tasks.popleft()
try:
next(task)
tasks.append(task)
except StopIteration:
print('Task done')
def countdown(n):
while n > 0:
print('T-minus', n)
yield
n -= 1
def countup(n):
x = 0
while x < n:
print('Up we go', x)
yield
x += 1
if __name__ == '__main__':
tasks.append(countdown(10))
tasks.append(countdown(5))
tasks.append(countup(20))
run()

55
Solutions/8_5/server.py Normal file
View File

@@ -0,0 +1,55 @@
# server.py
from socket import *
from select import select
from collections import deque
tasks = deque()
recv_wait = {} # sock -> task
send_wait = {} # sock -> task
def run():
while any([tasks, recv_wait, send_wait]):
while not tasks:
can_recv, can_send, _ = select(recv_wait, send_wait, [])
for s in can_recv:
tasks.append(recv_wait.pop(s))
for s in can_send:
tasks.append(send_wait.pop(s))
task = tasks.popleft()
try:
reason, resource = task.send(None)
if reason == 'recv':
recv_wait[resource] = task
elif reason == 'send':
send_wait[resource] = task
else:
raise RuntimeError('Unknown reason %r' % reason)
except StopIteration:
print('Task done')
def tcp_server(address, handler):
sock = socket(AF_INET, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(5)
while True:
yield 'recv', sock
client, addr = sock.accept()
tasks.append(handler(client, addr))
def echo_handler(client, address):
print('Connection from', address)
while True:
yield 'recv', client
data = client.recv(1000)
if not data:
break
yield 'send', client
client.send(b'GOT:' + data)
print('Connection closed')
if __name__ == '__main__':
tasks.append(tcp_server(('',25000), echo_handler))
run()

Some files were not shown because too many files have changed in this diff Show More