'''Verification helpers'''
# Copyright 2013 Christopher Foo <chris.foo@gmail.com>
# Licensed under GPLv3. See COPYING.txt for details.
import hashlib
import base64
from warcat import util, model
import binascii
# Maps a lower-case digest algorithm label to its `hashlib` constructor.
_ALGORITHM_NAMES = ('md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512')
ALGORITHM_MAP = dict(
    (name, getattr(hashlib, name)) for name in _ALGORITHM_NAMES
)
def parse_digest_field(s):
    '''Return the algorithm name and digest `bytes`.

    `s` is a labelled digest of the form ``algorithm:digest``. The digest
    text may be encoded as Base16 (either letter case), Base32 or Base64.

    Every encoding is tried independently of the others. When `hashlib`
    recognises the algorithm, the decoding whose length equals that
    algorithm's digest size is returned; this matters because, for
    example, a hexadecimal SHA-1 digest is also well-formed Base64.
    Otherwise the first successful decoding in the order Base16, Base32,
    Base64 (strictest alphabet first) is returned.

    Returns a ``(algorithm, digest_bytes)`` tuple; the algorithm name is
    lower-cased.

    Raises `ValueError` if `s` has no ``:`` separator and `binascii.Error`
    if no encoding can decode the digest.
    '''
    algorithm, digest = s.split(':', 1)
    algorithm = algorithm.lower()
    enc_digest = digest.encode()

    # The digest size disambiguates text that is valid in several encodings.
    try:
        expected_size = hashlib.new(algorithm).digest_size
    except ValueError:
        # Algorithm unknown to hashlib: fall back to the preference order.
        expected_size = 0

    decoders = (
        lambda data: base64.b16decode(data, casefold=True),
        base64.b32decode,
        base64.b64decode,
    )

    candidates = []
    for decode in decoders:
        # One decoder failing must not prevent the others from being tried.
        try:
            candidates.append(decode(enc_digest))
        except binascii.Error:
            continue

    if not candidates:
        raise binascii.Error(
            'Digest is not valid Base16, Base32 or Base64: {0!r}'.format(
                digest))

    if expected_size:
        for digest_bytes in candidates:
            if len(digest_bytes) == expected_size:
                return algorithm, digest_bytes

    return algorithm, candidates[0]
def verify_block_digest(record):
    '''Return `True` if the content block hash digest is valid.

    The expected digest is read from the record's ``WARC-Block-Digest``
    header field and compared with a digest computed over the content
    block. For a `model.BlockWithPayload` the hash is taken over its
    ``binary_block``; otherwise over the content block itself.

    Raises `KeyError` if the digest algorithm is not in `ALGORITHM_MAP`.
    '''
    algorithm, expected = parse_digest_field(
        record.header.fields['WARC-Block-Digest'])
    hasher = ALGORITHM_MAP[algorithm]()

    block = record.content_block
    if isinstance(block, model.BlockWithPayload):
        block = block.binary_block

    # The hash object acts as the copy destination: each chunk read from
    # the block's file is passed to its ``update`` method.
    util.copyfile_obj(
        block.get_file(),
        hasher,
        max_length=block.length,
        write_attr_name='update',
    )
    return expected == hasher.digest()
def verify_payload_digest(record):
    '''Return `True` if the payload hash digest is valid.

    The expected digest is read from the record's ``WARC-Payload-Digest``
    header field and compared with a digest computed over
    ``record.content_block.payload``.

    Raises `KeyError` if the digest algorithm is not in `ALGORITHM_MAP`.
    '''
    algorithm, expected = parse_digest_field(
        record.header.fields['WARC-Payload-Digest'])
    hasher = ALGORITHM_MAP[algorithm]()

    payload = record.content_block.payload
    # The hash object acts as the copy destination: each chunk read from
    # the payload's file is passed to its ``update`` method.
    util.copyfile_obj(
        payload.get_file(),
        hasher,
        max_length=payload.length,
        write_attr_name='update',
    )
    return expected == hasher.digest()