Source code for warcat.model.field

'''Named fields'''
# Copyright 2013 Christopher Foo <chris.foo@gmail.com>
# Licensed under GPLv3. See COPYING.txt for details.
from warcat.model.binary import StrSerializable, BytesSerializable
from warcat.model.common import NEWLINE
import collections
import logging
import re


_logger = logging.getLogger(__name__)


class Fields(StrSerializable, BytesSerializable):
    '''Name and value pseudo-map list

    Behaves like a `dict` or mutable mapping. Names are compared
    case-insensitively. Mutable mapping operations remove any duplicates
    in the field list.
    '''

    def __init__(self, field_list=None):
        # A supplied list is stored as-is (not copied), so later changes
        # made through this object are visible to the caller's list.
        self._list = [] if field_list is None else field_list

    def __contains__(self, name):
        '''Return whether `name` maps to a value that is not `None`'''
        return self.get(name) is not None

    def __iter__(self):
        '''Iterate over the ``(name, value)`` entries in list order'''
        # The iterator protocol requires an iterator object; returning the
        # bare list makes ``iter(fields)`` raise TypeError.
        return iter(self._list)

    def __len__(self):
        '''Return the number of entries, duplicates included'''
        return len(self._list)

    def __getitem__(self, name):
        '''Return the value of the first field named `name`

        :raises KeyError: if no field has that name
        '''
        for k, v in self._list:
            if k.lower() == name.lower():
                return v

        raise KeyError('{} not in fields'.format(name))

    def __setitem__(self, name, value):
        '''Set the field `name` to `value`

        If the name already exists, every entry with that name is removed
        and the new entry is placed where the first one was. Otherwise the
        entry is appended.
        '''
        try:
            index = self.index(name)
        except KeyError:
            self._list.append((name, value))
        else:
            # `index` is the first occurrence, so removing all matching
            # entries does not shift the insertion position.
            del self[name]
            self._list.insert(index, (name, value))

    def __delitem__(self, name):
        '''Remove every field named `name`; no error if none exist'''
        # Slice assignment keeps the same list object, so references
        # obtained through list() stay valid.
        self._list[:] = [x for x in self._list
            if x[0].lower() != name.lower()]

    def add(self, name, value):
        '''Append a name-value field to the list'''
        self._list.append((name, value))

    def get(self, name, default=None):
        '''Return the first value for `name`, or `default` if absent'''
        try:
            return self[name]
        except KeyError:
            return default

    def get_list(self, name):
        '''Return a list of the ``(name, value)`` entries named `name`'''
        # NOTE(review): entries are returned as pairs, not bare values;
        # count() only relies on the length of this list.
        return [x for x in self._list if x[0].lower() == name.lower()]

    def count(self, name):
        '''Count the number of times this name occurs in the list'''
        return len(self.get_list(name))

    def index(self, name):
        '''Return the index of the first occurrence of given name

        :raises KeyError: if no field has that name
        '''
        for i, entry in enumerate(self._list):
            if entry[0].lower() == name.lower():
                return i

        raise KeyError('Name {} not found in fields'.format(name))

    def list(self):
        '''Return the underlying list'''
        return self._list

    def keys(self):
        '''Return the field names in list order, duplicates included'''
        return [x[0] for x in self._list]

    def values(self):
        '''Return the field values in list order'''
        return [x[1] for x in self._list]

    def clear(self):
        '''Remove all fields, keeping the same underlying list object'''
        self._list[:] = []

    def iter_str(self):
        '''Yield the serialized fields as `str` pieces

        Each field produces ``name: value`` (or ``name:`` when the value is
        falsy) followed by a newline piece.
        '''
        for name, value in self._list:
            if value:
                yield '{}: {}'.format(name, value)
            else:
                yield '{}:'.format(name)

            yield NEWLINE

    def iter_bytes(self):
        '''Yield the pieces of :meth:`iter_str` encoded to `bytes`'''
        for s in self.iter_str():
            yield s.encode()

    @classmethod
    def parse(cls, s, newline=NEWLINE):
        '''Parse a named field string and return a :class:`Fields`

        :param s: the `str` containing ``name: value`` lines
        :param newline: the line separator; a `str` is split on literally,
            anything else is passed to :func:`re.split` as a pattern
        :raises ValueError: if a non-empty line contains no colon
        '''
        fields = Fields()
        lines = collections.deque(
            s.split(newline) if isinstance(newline, str)
            else re.split(newline, s)
        )

        while lines:
            line = lines.popleft()

            if not line:
                continue

            name, sep, value = line.partition(':')

            if not sep:
                # Same exception type as the unpacking failure this
                # replaces, but with a message that names the bad line.
                raise ValueError(
                    'Field line has no colon: {!r}'.format(line))

            value = value.lstrip()
            # Consumes any continuation lines that follow from `lines`.
            value = cls.join_multilines(value, lines)
            fields.add(name, value)

        return fields

    @classmethod
    def join_multilines(cls, value, lines):
        '''Scan for multiline value which is prefixed with a space or tab

        :param value: the value text read so far
        :param lines: a deque of the remaining lines; continuation lines
            are consumed from its left end
        :returns: `value` with each continuation line appended, minus that
            line's first character
        '''
        while lines:
            line = lines.popleft()

            if not line:
                # An empty line ends the value and is discarded.
                break

            if line[0] not in (' ', '\t'):
                # Not a continuation: put it back for the caller.
                lines.appendleft(line)
                break

            value = '{}{}'.format(value, line[1:])

        return value
class HTTPHeader(Fields):
    '''Fields extended with a HTTP status attribute.

    .. attribute:: status

        The `str` of the HTTP status message and code.
    '''

    def __init__(self, field_list=None, status=None):
        Fields.__init__(self, field_list=field_list)
        self.status = status

    @property
    def status_code(self):
        '''The second whitespace-separated token of `status` as an `int`'''
        return int(self.status.split()[1])

    @classmethod
    def parse(cls, s, newline=NEWLINE):
        '''Parse a status line followed by named fields

        :param s: the `str` holding the status line and the field lines
        :param newline: the line separator; a `str` is split on literally,
            anything else is passed to :func:`re.split` as a pattern
        :raises ValueError: if `s` contains no line separator
        :returns: a :class:`HTTPHeader` with `status` set to the first line
        '''
        # Accept the same kinds of `newline` as Fields.parse; calling
        # str.split with a regex pattern would raise TypeError.
        if isinstance(newline, str):
            parts = s.split(newline, 1)
        else:
            parts = re.split(newline, s, maxsplit=1)

        if len(parts) < 2:
            raise ValueError(
                'HTTP header has no newline after the status line')

        http_headers = HTTPHeader()
        http_headers.status = parts[0]
        # parts[-1] is the remainder even when a pattern with capture
        # groups makes re.split insert the matched separator in between.
        fields = super(HTTPHeader, cls).parse(parts[-1], newline=newline)
        http_headers.list().extend(fields.list())

        return http_headers

    def iter_str(self):
        '''Yield the status line, a newline, then the serialized fields'''
        yield self.status
        yield NEWLINE

        for s in Fields.iter_str(self):
            yield s
# Backwards-compatible alias kept for callers using the old name.
HTTPHeaders = HTTPHeader
'''
.. deprecated:: 2.1.1
   Name uses wrong inflection. Use :class:`HTTPHeader` instead.
'''

# Only names bound in this module may be listed: an entry that does not
# exist makes ``from warcat.model.field import *`` raise AttributeError.
__all__ = ['Fields', 'HTTPHeader', 'HTTPHeaders']