spaCy/spacy/tests/serialize/test_huffman.py

112 lines
3.3 KiB
Python
Raw Normal View History

2015-07-13 13:58:07 +03:00
from __future__ import unicode_literals
from __future__ import division
import pytest
from spacy.serialize.huffman import HuffmanCodec
from spacy.serialize.bits import BitArray
2015-07-13 13:58:07 +03:00
import numpy
2015-07-19 18:59:51 +03:00
import math
2015-07-13 13:58:07 +03:00
from heapq import heappush, heappop, heapify
from collections import defaultdict
def py_encode(symb2freq):
    """Build a Huffman code table in pure Python (adapted from Rosetta Code).

    Serves as a reference implementation to compare against HuffmanCodec.

    Args:
        symb2freq: Mapping from symbol to its weight (count or probability).

    Returns:
        A dict mapping each symbol to its code, a string of '0'/'1'
        characters. An empty mapping yields an empty dict, and a mapping
        with a single symbol yields the empty string as that symbol's code.
    """
    # With no symbols there is nothing to pop at the end; return early
    # instead of letting heappop raise IndexError on an empty heap.
    if not symb2freq:
        return {}
    # Each heap entry is [total_weight, [sym, code], [sym, code], ...].
    heap = [[wt, [sym, ""]] for sym, wt in symb2freq.items()]
    heapify(heap)
    while len(heap) > 1:
        # Merge the two lightest subtrees; every symbol in the lighter one
        # gets a leading '0', every symbol in the heavier one a leading '1'.
        lo = heappop(heap)
        hi = heappop(heap)
        for pair in lo[1:]:
            pair[1] = '0' + pair[1]
        for pair in hi[1:]:
            pair[1] = '1' + pair[1]
        heappush(heap, [lo[0] + hi[0]] + lo[1:] + hi[1:])
    # The single remaining entry holds every [sym, code] pair after its weight.
    return dict(heappop(heap)[1:])
def test1():
    """Compare the codec's code strings with the pure-Python reference.

    Ten symbols (the integers 0-9) are weighted with float32 probabilities;
    the codes from py_encode, ordered by symbol, must equal codec.strings.
    """
    weights = [0.3, 0.2, 0.15, 0.1, 0.06, 0.02, 0.01, 0.005, 0.0001, 0.000001]
    probs = numpy.array(weights, dtype=numpy.float32)

    codec = HuffmanCodec(list(enumerate(probs)))

    reference = py_encode(dict(enumerate(probs)))
    # Order the reference codes by symbol index before comparing.
    expected = [code for _, code in sorted(reference.items())]
    assert codec.strings == expected
def test_empty():
    """A codec built from an empty frequency table has no code strings."""
    no_symbols = {}
    empty_codec = HuffmanCodec(no_symbols)
    assert empty_codec.strings == []
2015-07-13 13:58:07 +03:00
def test_round_trip():
    """Encode a message, verify the emitted bits, then decode it back.

    The encoded bit stream must start with the concatenation of each
    word's code in message order, and decoding from the start of the
    stream must reproduce the original message.
    """
    freqs = {'the': 10, 'quick': 3, 'brown': 4, 'fox': 1, 'jumped': 5, 'over': 8,
             'lazy': 1, 'dog': 2, '.': 9}
    codec = HuffmanCodec(freqs.items())

    message = ['the', 'quick', 'brown', 'fox', 'jumped', 'over', 'the',
               'the', 'lazy', 'dog', '.']
    strings = list(codec.strings)
    leaves = codec.leaves
    # Map each symbol (leaf) to the code string at the same index.
    codes = {leaves[i]: strings[i] for i in range(len(leaves))}

    bits = codec.encode(message)
    # Render every byte as 8 binary digits, reversed so the least
    # significant bit comes first (presumably BitArray's bit order).
    rendered = []
    for byte in bits.as_bytes():
        rendered.append('{0:b}'.format(byte).rjust(8, '0')[::-1])
    string = ''.join(rendered)

    # Walk the bit string with a cursor, checking one code per word.
    offset = 0
    for word in message:
        code = codes[word]
        end = offset + len(code)
        assert string[offset:end] == code
        offset = end

    unpacked = [0] * len(message)
    bits.seek(0)
    codec.decode(bits, unpacked)
    assert message == unpacked
def test_rosetta():
    """Check the codec is as compact as the Rosetta Code reference.

    Individual codes may differ between the two implementations, so the
    comparison is on the total encoded length: the sum over symbols of
    code length times frequency must be equal for both.
    """
    txt = u"this is an example for huffman encoding"
    symb2freq = defaultdict(int)
    for ch in txt:
        symb2freq[ch] += 1

    codec = HuffmanCodec(symb2freq.items())
    py_codec = py_encode(symb2freq)

    # Read the codec's tables once, then pair each leaf with its code.
    leaves = codec.leaves
    strings = codec.strings
    codes = {leaves[i]: strings[i] for i in range(len(leaves))}

    # Frequency-weighted total code length for each implementation.
    my_exp_len = sum(len(codes[symb]) * freq
                     for symb, freq in symb2freq.items())
    py_exp_len = sum(len(py_codec[symb]) * freq
                     for symb, freq in symb2freq.items())
    assert my_exp_len == py_exp_len
2015-07-23 02:19:11 +03:00
@pytest.mark.slow
def test_vocab(EN):
    """Check the expected code length over the EN vocabulary.

    Each vocabulary entry is weighted by exp(prob); the sum of code
    length times that weight must fall strictly between 8 and 15.
    """
    weighted_words = [(w.orth, numpy.exp(w.prob)) for w in EN.vocab]
    codec = HuffmanCodec(weighted_words)

    expected_length = 0
    for index, code in enumerate(codec.strings):
        # Look the leaf up in the vocab to recover its probability.
        lexeme = EN.vocab[codec.leaves[index]]
        expected_length += len(code) * numpy.exp(lexeme.prob)
    assert 8 < expected_length < 15