update from Atlas

Luciano Ramalho 2014-12-29 03:51:34 -02:00
parent 9db73c75ef
commit 08b7bce340
12 changed files with 801 additions and 21 deletions

2
.gitignore vendored
View File

@@ -1,3 +1,5 @@
concurrency/flags/img/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]

View File

@@ -0,0 +1,43 @@
import timeit

def exists_and_truthy_hasattr(obj, attr_name):
    if hasattr(obj, attr_name):
        return bool(getattr(obj, attr_name))
    else:
        return False

def exists_and_truthy_getattr(obj, attr_name):
    return bool(getattr(obj, attr_name, False))

def exists_and_truthy_tryget(obj, attr_name):
    try:
        return bool(getattr(obj, attr_name))
    except AttributeError:
        return False

class Gizmo:
    def __init__(self):
        self.gadget = True

gizmo = Gizmo()

test_keys = 'hasattr', 'getattr', 'tryget'

def average(timings):
    sample = timings[1:-1]
    return sum(sample) / len(sample)

def do_tests():
    for test_key in test_keys:
        func_name = 'exists_and_truthy_' + test_key
        test = func_name + '(gizmo, "gadget")'
        setup = 'from __main__ import gizmo, ' + func_name
        elapsed = average(timeit.repeat(test, repeat=5, setup=setup))
        print(test_key.rjust(7), format(elapsed, '0.5f'))

if __name__ == '__main__':
    do_tests()
    del gizmo.gadget
    do_tests()
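Both benchmark scripts in this commit drive timeit.repeat with a statement string plus a setup string that imports the measured objects from __main__. A minimal standalone sketch of that pattern (the expression and repetition counts are illustrative, not taken from the commit):

import timeit

class Gizmo:
    def __init__(self):
        self.gadget = True

gizmo = Gizmo()

# timeit compiles the statement string and runs it `number` times per repetition;
# the setup string pulls the needed names out of __main__.
timings = timeit.repeat("bool(getattr(gizmo, 'gadget', False))",
                        setup='from __main__ import gizmo',
                        repeat=5, number=1000000)
print(min(timings))  # min() is the least noisy summary; average() above drops the first/last runs instead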

44
attributes/hasattr.py Normal file
View File

@@ -0,0 +1,44 @@
import timeit

test_hasattr = """
if hasattr(gizmo, 'gadget'):
    feature = gizmo.gadget
else:
    feature = None
"""

test_getattr = """
feature = getattr(gizmo, 'gadget', None)
"""

test_tryget = """
try:
    feature = getattr(gizmo, 'gadget')
except AttributeError:
    feature = None
"""

class Gizmo:
    def __init__(self):
        self.gadget = True

gizmo = Gizmo()

test_keys = 'hasattr', 'getattr', 'tryget'

def test():
    for test_key in test_keys:
        test_name = 'test_' + test_key
        test = globals()[test_name]
        setup = 'from __main__ import gizmo'
        t_present = min(timeit.repeat(test, setup=setup))
        del gizmo.gadget
        t_absent = min(timeit.repeat(test, setup=setup))
        gizmo.gadget = True
        print('{:7} {:.3f} {:.3f}'.format(test_key, t_present, t_absent))

if __name__ == '__main__':
    test()

View File

@@ -16,7 +16,11 @@ NGINX_URL = 'http://localhost:8080/ciaflags/{gec}.gif'
# Vaurien
VAURIEN_URL = 'http://localhost:8000/ciaflags/{gec}.gif'
BASE_URL = VAURIEN_URL
SOURCE_URLS = {
    'CIA' : CIA_URL,
    'NGINX' : NGINX_URL,
    'VAURIEN' : VAURIEN_URL,
}
DEST_PATH_NAME = 'img/{cc}.gif'
@@ -34,8 +38,9 @@ def _load():
        cc2gec[iso_cc] = gec

def flag_url(iso_cc):
    return BASE_URL.format(gec=cc2gec[iso_cc].lower())
def flag_url(iso_cc, source='CIA'):
    base_url = SOURCE_URLS[source.upper()]
    return base_url.format(gec=cc2gec[iso_cc].lower())

def iso_file_name(iso_cc):
    return DEST_PATH_NAME.format(cc=iso_cc.lower())

View File

@@ -5,8 +5,8 @@ import time
times = {}

def fetch(iso_cc):
    resp = requests.get(cf.flag_url(iso_cc))
def fetch(iso_cc, source):
    resp = requests.get(cf.flag_url(iso_cc, source))
    if resp.status_code != 200:
        resp.raise_for_status()
    file_name = cf.iso_file_name(iso_cc)
@@ -14,7 +14,7 @@ def fetch(iso_cc):
        written = img.write(resp.content)
    return written, file_name

def main():
def main(source):
    pending = sorted(cf.cc2name)
    to_download = len(pending)
    downloaded = 0
@@ -23,7 +23,7 @@ def main():
        print('get:', iso_cc)
        try:
            times[iso_cc] = [time.time() - t0]
            octets, file_name = fetch(iso_cc)
            octets, file_name = fetch(iso_cc, source)
            times[iso_cc].append(time.time() - t0)
            downloaded += 1
            print('\t--> {}: {:5d} bytes'.format(file_name, octets))
@@ -36,7 +36,14 @@ def main():
        print('{}\t{:.6g}\t{:.6g}'.format(iso_cc, start, end))

if __name__ == '__main__':
    main()
    import argparse
    source_names = ', '.join(sorted(cf.SOURCE_URLS))
    parser = argparse.ArgumentParser(description='Download flag images.')
    parser.add_argument('source', help='one of: ' + source_names)
    args = parser.parse_args()
    main(args.source)

"""
From cia.gov:
@@ -53,4 +60,4 @@ From localhost nginx via Vaurien with .5s delay
real 1m40.519s
user 0m1.103s
sys 0m0.243s
"""
"""

View File

@@ -11,7 +11,7 @@ GLOBAL_TIMEOUT = 300 # seconds
times = {}

def main(num_threads):
def main(source, num_threads):
    pool = futures.ThreadPoolExecutor(num_threads)
    pending = {}
    t0 = time.time()
@@ -19,7 +19,7 @@ def main(num_threads):
    for iso_cc in sorted(cf.cc2name):
        print('get:', iso_cc)
        times[iso_cc] = [time.time() - t0]
        job = pool.submit(fetch, iso_cc)
        job = pool.submit(fetch, iso_cc, source)
        pending[job] = iso_cc
    to_download = len(pending)
    downloaded = 0
@@ -39,18 +39,23 @@ def main(num_threads):
        print('{}\t{:.6g}\t{:.6g}'.format(iso_cc, start, end))

if __name__ == '__main__':
    if len(sys.argv) == 2:
        num_threads = int(sys.argv[1])
    else:
        num_threads = DEFAULT_NUM_THREADS
    main(num_threads)
    import argparse
    source_names = ', '.join(sorted(cf.SOURCE_URLS))
    parser = argparse.ArgumentParser(description='Download flag images.')
    parser.add_argument('source', help='one of: ' + source_names)
    parser.add_argument('-t', '--threads', type=int, default=DEFAULT_NUM_THREADS,
                        help='number of threads (default: %s)' % DEFAULT_NUM_THREADS)
    args = parser.parse_args()
    main(args.source, args.threads)

"""
From localhost nginx:
real 0m1.163s
user 0m1.001s
sys 0m0.289s

From CIA, 1 thread:
real 2m0.832s
user 0m4.685s
sys 0m0.366s
"""
"""

BIN
concurrency/flags/img.zip Normal file

Binary file not shown.

50
metaprog/spreadsheet.py Normal file
View File

@@ -0,0 +1,50 @@
"""
Spreadsheet example adapted from Raymond Hettinger's `recipe`__

__ http://code.activestate.com/recipes/355045-spreadsheet/

Demonstration::

    >>> from math import sin, pi
    >>> ss = Spreadsheet(sin=sin, pi=pi, abs=abs)
    >>> ss['a1'] = '-5'
    >>> ss['a2'] = 'a1*6'
    >>> ss['a3'] = 'a2*7'
    >>> ss['a3']
    -210
    >>> ss['b1'] = 'sin(pi/4)'
    >>> ss['b1']  # doctest:+ELLIPSIS
    0.707106781186...
    >>> ss.getformula('b1')
    'sin(pi/4)'
    >>> ss['c1'] = 'abs(a2)'
    >>> ss['c1']
    30
    >>> ss['c2'] = 'len(a2)'
    >>> ss['c2']
    Traceback (most recent call last):
      ...
    NameError: name 'len' is not defined
    >>> ss['d1'] = '3*'
    >>> ss['d1']
    Traceback (most recent call last):
      ...
    SyntaxError: unexpected EOF while parsing

"""

class Spreadsheet:

    def __init__(self, **tools):
        self._cells = {}
        self._tools = {'__builtins__' : {}}
        self._tools.update(tools)

    def __setitem__(self, key, formula):
        self._cells[key] = formula

    def getformula(self, key):
        return self._cells[key]

    def __getitem__(self, key):
        return eval(self._cells[key], self._tools, self)
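The whole trick in Spreadsheet.__getitem__ is that eval() resolves bare names through the locals mapping it is given, so passing self lets cell references recurse back into __getitem__, while the empty '__builtins__' entry in the globals keeps built-ins out of reach. A minimal sketch of that name-resolution mechanism in isolation (the class and names are illustrative, not part of the commit):

class Names:
    def __getitem__(self, name):
        print('eval asked for', name)
        return 42

# eval consults the locals mapping for every bare name it meets
print(eval('x + y', {'__builtins__': {}}, Names()))   # prints two lookups, then 84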

54
metaprog/spreadsheet2.py Normal file
View File

@@ -0,0 +1,54 @@
"""
Spreadsheet example adapted from Raymond Hettinger's `recipe`__

__ http://code.activestate.com/recipes/355045-spreadsheet/

Demonstration::

    >>> from math import sin, pi
    >>> ss = Spreadsheet(sin=sin, pi=pi, abs=abs)
    >>> ss['a1'] = '-5'
    >>> ss['a2'] = 'a1*6'
    >>> ss['a3'] = 'a2*7'
    >>> ss['a3']
    -210
    >>> ss['b1'] = 'sin(pi/4)'
    >>> ss['b1']  # doctest:+ELLIPSIS
    0.707106781186...
    >>> ss.getformula('b1')
    'sin(pi/4)'
    >>> ss['c1'] = 'abs(a2)'
    >>> ss['c1']
    30
    >>> ss['c2'] = 'len(a2)'
    >>> ss['c2']
    Traceback (most recent call last):
      ...
    NameError: name 'len' is not defined
    >>> ss['d1'] = '3*'
    Traceback (most recent call last):
      ...
    SyntaxError: unexpected EOF while parsing ['d1'] = '3*'

"""

class Spreadsheet:

    def __init__(self, **tools):
        self._cells = {}
        self._tools = {'__builtins__' : {}}
        self._tools.update(tools)

    def __setitem__(self, key, formula):
        try:
            compile(formula, '<__setitem__>', 'eval')
        except SyntaxError as exc:
            msg = '{} [{!r}] = {!r}'.format(exc.msg, key, formula)
            raise SyntaxError(msg)
        self._cells[key] = formula

    def getformula(self, key):
        return self._cells[key]

    def __getitem__(self, key):
        return eval(self._cells[key], self._tools, self)
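spreadsheet2.py differs from spreadsheet.py only in __setitem__, which compiles the formula in 'eval' mode so a bad formula is rejected when it is stored rather than when it is first read. A minimal sketch of that early check on its own (the names are illustrative):

def check_formula(formula):
    compile(formula, '<check>', 'eval')   # parse only; nothing is executed
    return formula

check_formula('a1*6')                     # valid expression, returned unchanged
try:
    check_formula('3*')
except SyntaxError as exc:
    print('rejected:', exc.msg)           # e.g. 'unexpected EOF while parsing'; wording varies by Python version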

261
support/isis2json/isis2json.py Executable file
View File

@@ -0,0 +1,261 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# isis2json.py: convert ISIS and ISO-2709 files to JSON
#
# Copyright (C) 2010 BIREME/PAHO/WHO
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
############################
# BEGIN ISIS2JSON
# this script works with Python or Jython (versions >=2.5 and <3)
import sys
import argparse
from uuid import uuid4
import os
try:
    import json
except ImportError:
    if os.name == 'java':  # running Jython
        from com.xhaus.jyson import JysonCodec as json
    else:
        import simplejson as json
SKIP_INACTIVE = True
DEFAULT_QTY = 2**31
ISIS_MFN_KEY = 'mfn'
ISIS_ACTIVE_KEY = 'active'
SUBFIELD_DELIMITER = '^'
INPUT_ENCODING = 'cp1252'
def iter_iso_records(iso_file_name, isis_json_type):  # <1>
    from iso2709 import IsoFile
    from subfield import expand
    iso = IsoFile(iso_file_name)
    for record in iso:
        fields = {}
        for field in record.directory:
            field_key = str(int(field.tag))  # remove leading zeroes
            field_occurrences = fields.setdefault(field_key, [])
            content = field.value.decode(INPUT_ENCODING, 'replace')
            if isis_json_type == 1:
                field_occurrences.append(content)
            elif isis_json_type == 2:
                field_occurrences.append(expand(content))
            elif isis_json_type == 3:
                field_occurrences.append(dict(expand(content)))
            else:
                raise NotImplementedError('ISIS-JSON type %s conversion '
                    'not yet implemented for .iso input' % isis_json_type)
        yield fields
    iso.close()
def iter_mst_records(master_file_name, isis_json_type):  # <2>
    try:
        from bruma.master import MasterFactory, Record
    except ImportError:
        print('IMPORT ERROR: Jython 2.5 and Bruma.jar '
              'are required to read .mst files')
        raise SystemExit
    mst = MasterFactory.getInstance(master_file_name).open()
    for record in mst:
        fields = {}
        if SKIP_INACTIVE:
            if record.getStatus() != Record.Status.ACTIVE:
                continue
        else:  # save status only if there are non-active records
            fields[ISIS_ACTIVE_KEY] = (record.getStatus() ==
                                       Record.Status.ACTIVE)
        fields[ISIS_MFN_KEY] = record.getMfn()
        for field in record.getFields():
            field_key = str(field.getId())
            field_occurrences = fields.setdefault(field_key, [])
            if isis_json_type == 3:
                content = {}
                for subfield in field.getSubfields():
                    subfield_key = subfield.getId()
                    if subfield_key == '*':
                        content['_'] = subfield.getContent()
                    else:
                        subfield_occurrences = content.setdefault(subfield_key, [])
                        subfield_occurrences.append(subfield.getContent())
                field_occurrences.append(content)
            elif isis_json_type == 1:
                content = []
                for subfield in field.getSubfields():
                    subfield_key = subfield.getId()
                    if subfield_key == '*':
                        content.insert(0, subfield.getContent())
                    else:
                        content.append(SUBFIELD_DELIMITER + subfield_key +
                                       subfield.getContent())
                field_occurrences.append(''.join(content))
            else:
                raise NotImplementedError('ISIS-JSON type %s conversion '
                    'not yet implemented for .mst input' % isis_json_type)
        yield fields
    mst.close()
def write_json(input_gen, file_name, output, qty, skip, id_tag,  # <3>
               gen_uuid, mongo, mfn, isis_json_type, prefix,
               constant):
    start = skip
    end = start + qty
    if id_tag:
        id_tag = str(id_tag)
        ids = set()
    else:
        id_tag = ''
    for i, record in enumerate(input_gen):
        if i >= end:
            break
        if not mongo:
            if i == 0:
                output.write('[')
            elif i > start:
                output.write(',')
        if start <= i < end:
            if id_tag:
                occurrences = record.get(id_tag, None)
                if occurrences is None:
                    msg = 'id tag #%s not found in record %s'
                    if ISIS_MFN_KEY in record:
                        msg = msg + (' (mfn=%s)' % record[ISIS_MFN_KEY])
                    raise KeyError(msg % (id_tag, i))
                if len(occurrences) > 1:
                    msg = 'multiple id tags #%s found in record %s'
                    if ISIS_MFN_KEY in record:
                        msg = msg + (' (mfn=%s)' % record[ISIS_MFN_KEY])
                    raise TypeError(msg % (id_tag, i))
                else:  # ok, we have one and only one id field
                    if isis_json_type == 1:
                        id = occurrences[0]
                    elif isis_json_type == 2:
                        id = occurrences[0][0][1]
                    elif isis_json_type == 3:
                        id = occurrences[0]['_']
                    if id in ids:
                        msg = 'duplicate id %s in tag #%s, record %s'
                        if ISIS_MFN_KEY in record:
                            msg = msg + (' (mfn=%s)' % record[ISIS_MFN_KEY])
                        raise TypeError(msg % (id, id_tag, i))
                    record['_id'] = id
                    ids.add(id)
            elif gen_uuid:
                record['_id'] = unicode(uuid4())
            elif mfn:
                record['_id'] = record[ISIS_MFN_KEY]
            if prefix:
                # iterate over a fixed sequence of tags
                for tag in tuple(record):
                    if str(tag).isdigit():
                        record[prefix+tag] = record[tag]
                        del record[tag]  # this is why we iterate over a tuple
                        # with the tags, and not directly on the record dict
            if constant:
                constant_key, constant_value = constant.split(':')
                record[constant_key] = constant_value
            output.write(json.dumps(record).encode('utf-8'))
            output.write('\n')
    if not mongo:
        output.write(']\n')
def main():  # <4>
    # create the parser
    parser = argparse.ArgumentParser(
        description='Convert an ISIS .mst or .iso file to a JSON array')

    # add the arguments
    parser.add_argument(
        'file_name', metavar='INPUT.(mst|iso)',
        help='.mst or .iso file to read')
    parser.add_argument(
        '-o', '--out', type=argparse.FileType('w'), default=sys.stdout,
        metavar='OUTPUT.json',
        help='the file where the JSON output should be written'
             ' (default: write to stdout)')
    parser.add_argument(
        '-c', '--couch', action='store_true',
        help='output array within a "docs" item in a JSON document'
             ' for bulk insert to CouchDB via POST to db/_bulk_docs')
    parser.add_argument(
        '-m', '--mongo', action='store_true',
        help='output individual records as separate JSON dictionaries,'
             ' one per line for bulk insert to MongoDB via mongoimport utility')
    parser.add_argument(
        '-t', '--type', type=int, metavar='ISIS_JSON_TYPE', default=1,
        help='ISIS-JSON type, sets field structure: 1=string, 2=alist, 3=dict (default=1)')
    parser.add_argument(
        '-q', '--qty', type=int, default=DEFAULT_QTY,
        help='maximum quantity of records to read (default=ALL)')
    parser.add_argument(
        '-s', '--skip', type=int, default=0,
        help='records to skip from start of .mst (default=0)')
    parser.add_argument(
        '-i', '--id', type=int, metavar='TAG_NUMBER', default=0,
        help='generate an "_id" from the given unique TAG field number'
             ' for each record')
    parser.add_argument(
        '-u', '--uuid', action='store_true',
        help='generate an "_id" with a random UUID for each record')
    parser.add_argument(
        '-p', '--prefix', type=str, metavar='PREFIX', default='',
        help='concatenate prefix to every numeric field tag (ex. 99 becomes "v99")')
    parser.add_argument(
        '-n', '--mfn', action='store_true',
        help='generate an "_id" from the MFN of each record'
             ' (available only for .mst input)')
    parser.add_argument(
        '-k', '--constant', type=str, metavar='TAG:VALUE', default='',
        help='Include a constant tag:value in every record (ex. -k type:AS)')
    '''
    # TODO: implement this to export large quantities of records to CouchDB
    parser.add_argument(
        '-r', '--repeat', type=int, default=1,
        help='repeat operation, saving multiple JSON files'
             ' (default=1, use -r 0 to repeat until end of input)')
    '''
    # parse the command line
    args = parser.parse_args()
    if args.file_name.lower().endswith('.mst'):
        input_gen_func = iter_mst_records  # <5>
    else:
        if args.mfn:
            print('UNSUPPORTED: -n/--mfn option only available for .mst input.')
            raise SystemExit
        input_gen_func = iter_iso_records  # <6>
    input_gen = input_gen_func(args.file_name, args.type)  # <7>
    if args.couch:
        args.out.write('{ "docs" : ')
    write_json(input_gen, args.file_name, args.out, args.qty,  # <8>
               args.skip, args.id, args.uuid, args.mongo, args.mfn,
               args.type, args.prefix, args.constant)
    if args.couch:
        args.out.write('}\n')
    args.out.close()

if __name__ == '__main__':
    main()

# END ISIS2JSON
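The -t/--type option selects one of three field layouts; in iter_iso_records they come from the raw content, from expand(content), and from dict(expand(content)) respectively. Roughly, for a single record (the tag '100' and the value reuse the subfield.py doctest and are only illustrative):

type1 = {'100': ['John Tenniel^xillustrator']}                    # 1: plain strings, '^' markers kept
type2 = {'100': [[('_', 'John Tenniel'), ('x', 'illustrator')]]}  # 2: association lists from expand()
type3 = {'100': [{'_': 'John Tenniel', 'x': 'illustrator'}]}      # 3: dicts, via dict(expand(...))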

167
support/isis2json/iso2709.py
View File

@@ -0,0 +1,167 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# ISO-2709 file reader
#
# Copyright (C) 2010 BIREME/PAHO/WHO
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from struct import unpack
CR = '\x0D' # \r
LF = '\x0A' # \n
IS1 = '\x1F' # ECMA-48 Unit Separator
IS2 = '\x1E' # ECMA-48 Record Separator / ISO-2709 field separator
IS3 = '\x1D' # ECMA-48 Group Separator / ISO-2709 record separator
LABEL_LEN = 24
LABEL_FORMAT = '5s c 4s c c 5s 3s c c c c'
TAG_LEN = 3
DEFAULT_ENCODING = 'ASCII'
SUBFIELD_DELIMITER = '^'
class IsoFile(object):

    def __init__(self, filename, encoding = DEFAULT_ENCODING):
        self.file = open(filename, 'rb')
        self.encoding = encoding

    def __iter__(self):
        return self

    def next(self):
        return IsoRecord(self)

    __next__ = next  # Python 3 compatibility

    def read(self, size):
        ''' read and drop all CR and LF characters '''
        # TODO: this is inefficient but works, patches accepted!
        # NOTE: our fixtures include files which have no linebreaks,
        # files with CR-LF linebreaks and files with LF linebreaks
        chunks = []
        count = 0
        while count < size:
            chunk = self.file.read(size-count)
            if len(chunk) == 0:
                break
            chunk = chunk.replace(CR+LF,'')
            if CR in chunk:
                chunk = chunk.replace(CR,'')
            if LF in chunk:
                chunk = chunk.replace(LF,'')
            count += len(chunk)
            chunks.append(chunk)
        return ''.join(chunks)

    def close(self):
        self.file.close()
class IsoRecord(object):
    label_part_names = ('rec_len rec_status impl_codes indicator_len identifier_len'
                        ' base_addr user_defined'
                        # directory map:
                        ' fld_len_len start_len impl_len reserved').split()
    rec_len = 0

    def __init__(self, iso_file=None):
        self.iso_file = iso_file
        self.load_label()
        self.load_directory()
        self.load_fields()

    def __len__(self):
        return self.rec_len

    def load_label(self):
        label = self.iso_file.read(LABEL_LEN)
        if len(label) == 0:
            raise StopIteration
        elif len(label) != 24:
            raise ValueError('Invalid record label: "%s"' % label)
        parts = unpack(LABEL_FORMAT, label)
        for name, part in zip(self.label_part_names, parts):
            if name.endswith('_len') or name.endswith('_addr'):
                part = int(part)
            setattr(self, name, part)

    def show_label(self):
        for name in self.label_part_names:
            print('%15s : %r' % (name, getattr(self, name)))

    def load_directory(self):
        fmt_dir = '3s %ss %ss %ss' % (self.fld_len_len, self.start_len, self.impl_len)
        entry_len = TAG_LEN + self.fld_len_len + self.start_len + self.impl_len
        self.directory = []
        while True:
            char = self.iso_file.read(1)
            if char.isdigit():
                entry = char + self.iso_file.read(entry_len-1)
                entry = Field(* unpack(fmt_dir, entry))
                self.directory.append(entry)
            else:
                break

    def load_fields(self):
        for field in self.directory:
            if self.indicator_len > 0:
                field.indicator = self.iso_file.read(self.indicator_len)
            # XXX: lilacs30.iso has an identifier_len == 2,
            # but we need to ignore it to successfully read the field contents
            # TODO: find out when to ignore the identifier_len,
            # or fix the lilacs30.iso fixture
            #
            ##if self.identifier_len > 0:
            ##    field.identifier = self.iso_file.read(self.identifier_len)
            value = self.iso_file.read(len(field))
            assert len(value) == len(field)
            field.value = value[:-1]  # remove trailing field separator
        self.iso_file.read(1)  # discard record separator

    def __iter__(self):
        return self

    def next(self):
        for field in self.directory:
            yield(field)

    __next__ = next  # Python 3 compatibility

    def dump(self):
        for field in self.directory:
            print('%3s %r' % (field.tag, field.value))
class Field(object):

    def __init__(self, tag, len, start, impl):
        self.tag = tag
        self.len = int(len)
        self.start = int(start)
        self.impl = impl

    def show(self):
        for name in 'tag len start impl'.split():
            print('%15s : %r' % (name, getattr(self, name)))

    def __len__(self):
        return self.len

def test():
    import doctest
    doctest.testfile('iso2709_test.txt')

if __name__=='__main__':
    test()
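LABEL_FORMAT carves the 24-byte ISO-2709 record label into the pieces named in IsoRecord.label_part_names, in the same order. A small sketch that just pairs the two up (the widths are read off the format string; nothing here is part of the commit):

from struct import calcsize

LABEL_FORMAT = '5s c 4s c c 5s 3s c c c c'        # as defined in the file above
names = ('rec_len rec_status impl_codes indicator_len identifier_len'
         ' base_addr user_defined fld_len_len start_len impl_len reserved').split()
widths = [5, 1, 4, 1, 1, 5, 3, 1, 1, 1, 1]        # one entry per format code

assert calcsize(LABEL_FORMAT) == sum(widths) == 24
for name, width in zip(names, widths):
    print('{:15s} {:2d} byte(s)'.format(name, width))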

142
support/isis2json/subfield.py
View File

@@ -0,0 +1,142 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# ISIS-DM: the ISIS Data Model API
#
# Copyright (C) 2010 BIREME/PAHO/WHO
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from collections import namedtuple
import re
MAIN_SUBFIELD_KEY = '_'
SUBFIELD_MARKER_RE = re.compile(r'\^([a-z0-9])', re.IGNORECASE)
DEFAULT_ENCODING = u'utf-8'
def expand(content, subkeys=None):
    ''' Parse a field into an association list of keys and subfields

        >>> expand('zero^1one^2two^3three')
        [('_', 'zero'), ('1', 'one'), ('2', 'two'), ('3', 'three')]

    '''
    if subkeys is None:
        regex = SUBFIELD_MARKER_RE
    elif subkeys == '':
        return [(MAIN_SUBFIELD_KEY, content)]
    else:
        regex = re.compile(r'\^(['+subkeys+'])', re.IGNORECASE)
    content = content.replace('^^', '^^ ')
    parts = []
    start = 0
    key = MAIN_SUBFIELD_KEY
    while True:
        found = regex.search(content, start)
        if found is None: break
        parts.append((key, content[start:found.start()].rstrip()))
        key = found.group(1).lower()
        start = found.end()
    parts.append((key, content[start:].rstrip()))
    return parts
class CompositeString(object):
    ''' Represent an Isis field, with subfields, using
        Python native datastructures

        >>> author = CompositeString('John Tenniel^xillustrator',
        ...                          subkeys='x')
        >>> unicode(author)
        u'John Tenniel^xillustrator'
    '''

    def __init__(self, isis_raw, subkeys=None, encoding=DEFAULT_ENCODING):
        if not isinstance(isis_raw, basestring):
            raise TypeError('%r value must be unicode or str instance' % isis_raw)
        self.__isis_raw = isis_raw.decode(encoding)
        self.__expanded = expand(self.__isis_raw, subkeys)

    def __getitem__(self, key):
        for subfield in self.__expanded:
            if subfield[0] == key:
                return subfield[1]
        else:
            raise KeyError(key)

    def __iter__(self):
        return (subfield[0] for subfield in self.__expanded)

    def items(self):
        return self.__expanded

    def __unicode__(self):
        return self.__isis_raw

    def __str__(self):
        return str(self.__isis_raw)
class CompositeField(object):
    ''' Represent an Isis field, with subfields, using
        Python native datastructures

        >>> author = CompositeField( [('name','Braz, Marcelo'),('role','writer')] )
        >>> print author['name']
        Braz, Marcelo
        >>> print author['role']
        writer
        >>> author
        CompositeField((('name', 'Braz, Marcelo'), ('role', 'writer')))
    '''

    def __init__(self, value, subkeys=None):
        if subkeys is None:
            subkeys = [item[0] for item in value]
        try:
            value_as_dict = dict(value)
        except TypeError:
            raise TypeError('%r value must be a key-value structure' % self)
        for key in value_as_dict:
            if key not in subkeys:
                raise TypeError('Unexpected keyword %r' % key)
        self.value = tuple([(key, value_as_dict.get(key,None)) for key in subkeys])

    def __getitem__(self, key):
        return dict(self.value)[key]

    def __repr__(self):
        return "CompositeField(%s)" % str(self.items())

    def items(self):
        return self.value

    def __unicode__(self):
        return unicode(self.items())

    def __str__(self):
        return str(self.items())

def test():
    import doctest
    doctest.testmod()

if __name__=='__main__':
    test()
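expand() also accepts an explicit subkeys string, which is how CompositeString above restricts which '^' markers count as subfield delimiters; passing an empty string disables splitting entirely. For example (the values reuse the CompositeString doctest):

print(expand('John Tenniel^xillustrator', subkeys='x'))
# -> [('_', 'John Tenniel'), ('x', 'illustrator')]

print(expand('John Tenniel^xillustrator', subkeys=''))
# -> [('_', 'John Tenniel^xillustrator')]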