mirror of
				https://github.com/explosion/spaCy.git
				synced 2025-11-04 18:07:26 +03:00 
			
		
		
		
	* Pass excludes when serializing vocab Additional minor bug fix: * Deserialize vocab in `EntityLinker.from_disk` * Add test for excluding strings on load * Fix formatting
		
			
				
	
	
		
			289 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			289 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import pytest
 | 
						|
from spacy import registry, Vocab, load
 | 
						|
from spacy.pipeline import Tagger, DependencyParser, EntityRecognizer
 | 
						|
from spacy.pipeline import TextCategorizer, SentenceRecognizer, TrainablePipe
 | 
						|
from spacy.pipeline.dep_parser import DEFAULT_PARSER_MODEL
 | 
						|
from spacy.pipeline.tagger import DEFAULT_TAGGER_MODEL
 | 
						|
from spacy.pipeline.textcat import DEFAULT_SINGLE_TEXTCAT_MODEL
 | 
						|
from spacy.pipeline.senter import DEFAULT_SENTER_MODEL
 | 
						|
from spacy.lang.en import English
 | 
						|
from thinc.api import Linear
 | 
						|
import spacy
 | 
						|
 | 
						|
from ..util import make_tempdir
 | 
						|
 | 
						|
 | 
						|
test_parsers = [DependencyParser, EntityRecognizer]
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture
 | 
						|
def parser(en_vocab):
 | 
						|
    config = {
 | 
						|
        "learn_tokens": False,
 | 
						|
        "min_action_freq": 30,
 | 
						|
        "update_with_oracle_cut_size": 100,
 | 
						|
        "beam_width": 1,
 | 
						|
        "beam_update_prob": 1.0,
 | 
						|
        "beam_density": 0.0,
 | 
						|
    }
 | 
						|
    cfg = {"model": DEFAULT_PARSER_MODEL}
 | 
						|
    model = registry.resolve(cfg, validate=True)["model"]
 | 
						|
    parser = DependencyParser(en_vocab, model, **config)
 | 
						|
    parser.add_label("nsubj")
 | 
						|
    return parser
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture
 | 
						|
def blank_parser(en_vocab):
 | 
						|
    config = {
 | 
						|
        "learn_tokens": False,
 | 
						|
        "min_action_freq": 30,
 | 
						|
        "update_with_oracle_cut_size": 100,
 | 
						|
        "beam_width": 1,
 | 
						|
        "beam_update_prob": 1.0,
 | 
						|
        "beam_density": 0.0,
 | 
						|
    }
 | 
						|
    cfg = {"model": DEFAULT_PARSER_MODEL}
 | 
						|
    model = registry.resolve(cfg, validate=True)["model"]
 | 
						|
    parser = DependencyParser(en_vocab, model, **config)
 | 
						|
    return parser
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture
 | 
						|
def taggers(en_vocab):
 | 
						|
    cfg = {"model": DEFAULT_TAGGER_MODEL}
 | 
						|
    model = registry.resolve(cfg, validate=True)["model"]
 | 
						|
    tagger1 = Tagger(en_vocab, model)
 | 
						|
    tagger2 = Tagger(en_vocab, model)
 | 
						|
    return tagger1, tagger2
 | 
						|
 | 
						|
 | 
						|
@pytest.mark.parametrize("Parser", test_parsers)
 | 
						|
def test_serialize_parser_roundtrip_bytes(en_vocab, Parser):
 | 
						|
    cfg = {"model": DEFAULT_PARSER_MODEL}
 | 
						|
    model = registry.resolve(cfg, validate=True)["model"]
 | 
						|
    parser = Parser(en_vocab, model)
 | 
						|
    new_parser = Parser(en_vocab, model)
 | 
						|
    new_parser = new_parser.from_bytes(parser.to_bytes(exclude=["vocab"]))
 | 
						|
    bytes_2 = new_parser.to_bytes(exclude=["vocab"])
 | 
						|
    bytes_3 = parser.to_bytes(exclude=["vocab"])
 | 
						|
    assert len(bytes_2) == len(bytes_3)
 | 
						|
    assert bytes_2 == bytes_3
 | 
						|
 | 
						|
 | 
						|
@pytest.mark.parametrize("Parser", test_parsers)
 | 
						|
def test_serialize_parser_strings(Parser):
 | 
						|
    vocab1 = Vocab()
 | 
						|
    label = "FunnyLabel"
 | 
						|
    assert label not in vocab1.strings
 | 
						|
    cfg = {"model": DEFAULT_PARSER_MODEL}
 | 
						|
    model = registry.resolve(cfg, validate=True)["model"]
 | 
						|
    parser1 = Parser(vocab1, model)
 | 
						|
    parser1.add_label(label)
 | 
						|
    assert label in parser1.vocab.strings
 | 
						|
    vocab2 = Vocab()
 | 
						|
    assert label not in vocab2.strings
 | 
						|
    parser2 = Parser(vocab2, model)
 | 
						|
    parser2 = parser2.from_bytes(parser1.to_bytes(exclude=["vocab"]))
 | 
						|
    assert label in parser2.vocab.strings
 | 
						|
 | 
						|
 | 
						|
@pytest.mark.parametrize("Parser", test_parsers)
 | 
						|
def test_serialize_parser_roundtrip_disk(en_vocab, Parser):
 | 
						|
    cfg = {"model": DEFAULT_PARSER_MODEL}
 | 
						|
    model = registry.resolve(cfg, validate=True)["model"]
 | 
						|
    parser = Parser(en_vocab, model)
 | 
						|
    with make_tempdir() as d:
 | 
						|
        file_path = d / "parser"
 | 
						|
        parser.to_disk(file_path)
 | 
						|
        parser_d = Parser(en_vocab, model)
 | 
						|
        parser_d = parser_d.from_disk(file_path)
 | 
						|
        parser_bytes = parser.to_bytes(exclude=["model", "vocab"])
 | 
						|
        parser_d_bytes = parser_d.to_bytes(exclude=["model", "vocab"])
 | 
						|
        assert len(parser_bytes) == len(parser_d_bytes)
 | 
						|
        assert parser_bytes == parser_d_bytes
 | 
						|
 | 
						|
 | 
						|
def test_to_from_bytes(parser, blank_parser):
 | 
						|
    assert parser.model is not True
 | 
						|
    assert blank_parser.model is not True
 | 
						|
    assert blank_parser.moves.n_moves != parser.moves.n_moves
 | 
						|
    bytes_data = parser.to_bytes(exclude=["vocab"])
 | 
						|
    # the blank parser needs to be resized before we can call from_bytes
 | 
						|
    blank_parser.model.attrs["resize_output"](blank_parser.model, parser.moves.n_moves)
 | 
						|
    blank_parser.from_bytes(bytes_data)
 | 
						|
    assert blank_parser.model is not True
 | 
						|
    assert blank_parser.moves.n_moves == parser.moves.n_moves
 | 
						|
 | 
						|
 | 
						|
def test_serialize_tagger_roundtrip_bytes(en_vocab, taggers):
 | 
						|
    tagger1 = taggers[0]
 | 
						|
    tagger1_b = tagger1.to_bytes()
 | 
						|
    tagger1 = tagger1.from_bytes(tagger1_b)
 | 
						|
    assert tagger1.to_bytes() == tagger1_b
 | 
						|
    cfg = {"model": DEFAULT_TAGGER_MODEL}
 | 
						|
    model = registry.resolve(cfg, validate=True)["model"]
 | 
						|
    new_tagger1 = Tagger(en_vocab, model).from_bytes(tagger1_b)
 | 
						|
    new_tagger1_b = new_tagger1.to_bytes()
 | 
						|
    assert len(new_tagger1_b) == len(tagger1_b)
 | 
						|
    assert new_tagger1_b == tagger1_b
 | 
						|
 | 
						|
 | 
						|
def test_serialize_tagger_roundtrip_disk(en_vocab, taggers):
 | 
						|
    tagger1, tagger2 = taggers
 | 
						|
    with make_tempdir() as d:
 | 
						|
        file_path1 = d / "tagger1"
 | 
						|
        file_path2 = d / "tagger2"
 | 
						|
        tagger1.to_disk(file_path1)
 | 
						|
        tagger2.to_disk(file_path2)
 | 
						|
        cfg = {"model": DEFAULT_TAGGER_MODEL}
 | 
						|
        model = registry.resolve(cfg, validate=True)["model"]
 | 
						|
        tagger1_d = Tagger(en_vocab, model).from_disk(file_path1)
 | 
						|
        tagger2_d = Tagger(en_vocab, model).from_disk(file_path2)
 | 
						|
        assert tagger1_d.to_bytes() == tagger2_d.to_bytes()
 | 
						|
 | 
						|
 | 
						|
def test_serialize_tagger_strings(en_vocab, de_vocab, taggers):
 | 
						|
    label = "SomeWeirdLabel"
 | 
						|
    assert label not in en_vocab.strings
 | 
						|
    assert label not in de_vocab.strings
 | 
						|
    tagger = taggers[0]
 | 
						|
    assert label not in tagger.vocab.strings
 | 
						|
    with make_tempdir() as d:
 | 
						|
        # check that custom labels are serialized as part of the component's strings.jsonl
 | 
						|
        tagger.add_label(label)
 | 
						|
        assert label in tagger.vocab.strings
 | 
						|
        file_path = d / "tagger1"
 | 
						|
        tagger.to_disk(file_path)
 | 
						|
        # ensure that the custom strings are loaded back in when using the tagger in another pipeline
 | 
						|
        cfg = {"model": DEFAULT_TAGGER_MODEL}
 | 
						|
        model = registry.resolve(cfg, validate=True)["model"]
 | 
						|
        tagger2 = Tagger(de_vocab, model).from_disk(file_path)
 | 
						|
        assert label in tagger2.vocab.strings
 | 
						|
 | 
						|
 | 
						|
def test_serialize_textcat_empty(en_vocab):
 | 
						|
    # See issue #1105
 | 
						|
    cfg = {"model": DEFAULT_SINGLE_TEXTCAT_MODEL}
 | 
						|
    model = registry.resolve(cfg, validate=True)["model"]
 | 
						|
    textcat = TextCategorizer(en_vocab, model, threshold=0.5)
 | 
						|
    textcat.to_bytes(exclude=["vocab"])
 | 
						|
 | 
						|
 | 
						|
@pytest.mark.parametrize("Parser", test_parsers)
 | 
						|
def test_serialize_pipe_exclude(en_vocab, Parser):
 | 
						|
    cfg = {"model": DEFAULT_PARSER_MODEL}
 | 
						|
    model = registry.resolve(cfg, validate=True)["model"]
 | 
						|
 | 
						|
    def get_new_parser():
 | 
						|
        new_parser = Parser(en_vocab, model)
 | 
						|
        return new_parser
 | 
						|
 | 
						|
    parser = Parser(en_vocab, model)
 | 
						|
    parser.cfg["foo"] = "bar"
 | 
						|
    new_parser = get_new_parser().from_bytes(parser.to_bytes(exclude=["vocab"]))
 | 
						|
    assert "foo" in new_parser.cfg
 | 
						|
    new_parser = get_new_parser().from_bytes(
 | 
						|
        parser.to_bytes(exclude=["vocab"]), exclude=["cfg"]
 | 
						|
    )
 | 
						|
    assert "foo" not in new_parser.cfg
 | 
						|
    new_parser = get_new_parser().from_bytes(
 | 
						|
        parser.to_bytes(exclude=["cfg"]), exclude=["vocab"]
 | 
						|
    )
 | 
						|
    assert "foo" not in new_parser.cfg
 | 
						|
 | 
						|
 | 
						|
def test_serialize_sentencerecognizer(en_vocab):
 | 
						|
    cfg = {"model": DEFAULT_SENTER_MODEL}
 | 
						|
    model = registry.resolve(cfg, validate=True)["model"]
 | 
						|
    sr = SentenceRecognizer(en_vocab, model)
 | 
						|
    sr_b = sr.to_bytes()
 | 
						|
    sr_d = SentenceRecognizer(en_vocab, model).from_bytes(sr_b)
 | 
						|
    assert sr.to_bytes() == sr_d.to_bytes()
 | 
						|
 | 
						|
 | 
						|
def test_serialize_pipeline_disable_enable():
 | 
						|
    nlp = English()
 | 
						|
    nlp.add_pipe("ner")
 | 
						|
    nlp.add_pipe("tagger")
 | 
						|
    nlp.disable_pipe("tagger")
 | 
						|
    assert nlp.config["nlp"]["disabled"] == ["tagger"]
 | 
						|
    config = nlp.config.copy()
 | 
						|
    nlp2 = English.from_config(config)
 | 
						|
    assert nlp2.pipe_names == ["ner"]
 | 
						|
    assert nlp2.component_names == ["ner", "tagger"]
 | 
						|
    assert nlp2.disabled == ["tagger"]
 | 
						|
    assert nlp2.config["nlp"]["disabled"] == ["tagger"]
 | 
						|
    with make_tempdir() as d:
 | 
						|
        nlp2.to_disk(d)
 | 
						|
        nlp3 = spacy.load(d)
 | 
						|
    assert nlp3.pipe_names == ["ner"]
 | 
						|
    assert nlp3.component_names == ["ner", "tagger"]
 | 
						|
    with make_tempdir() as d:
 | 
						|
        nlp3.to_disk(d)
 | 
						|
        nlp4 = spacy.load(d, disable=["ner"])
 | 
						|
    assert nlp4.pipe_names == []
 | 
						|
    assert nlp4.component_names == ["ner", "tagger"]
 | 
						|
    assert nlp4.disabled == ["ner", "tagger"]
 | 
						|
    with make_tempdir() as d:
 | 
						|
        nlp.to_disk(d)
 | 
						|
        nlp5 = spacy.load(d, exclude=["tagger"])
 | 
						|
    assert nlp5.pipe_names == ["ner"]
 | 
						|
    assert nlp5.component_names == ["ner"]
 | 
						|
    assert nlp5.disabled == []
 | 
						|
 | 
						|
 | 
						|
def test_serialize_custom_trainable_pipe():
 | 
						|
    class BadCustomPipe1(TrainablePipe):
 | 
						|
        def __init__(self, vocab):
 | 
						|
            pass
 | 
						|
 | 
						|
    class BadCustomPipe2(TrainablePipe):
 | 
						|
        def __init__(self, vocab):
 | 
						|
            self.vocab = vocab
 | 
						|
            self.model = None
 | 
						|
 | 
						|
    class CustomPipe(TrainablePipe):
 | 
						|
        def __init__(self, vocab, model):
 | 
						|
            self.vocab = vocab
 | 
						|
            self.model = model
 | 
						|
 | 
						|
    pipe = BadCustomPipe1(Vocab())
 | 
						|
    with pytest.raises(ValueError):
 | 
						|
        pipe.to_bytes()
 | 
						|
    with make_tempdir() as d:
 | 
						|
        with pytest.raises(ValueError):
 | 
						|
            pipe.to_disk(d)
 | 
						|
    pipe = BadCustomPipe2(Vocab())
 | 
						|
    with pytest.raises(ValueError):
 | 
						|
        pipe.to_bytes()
 | 
						|
    with make_tempdir() as d:
 | 
						|
        with pytest.raises(ValueError):
 | 
						|
            pipe.to_disk(d)
 | 
						|
    pipe = CustomPipe(Vocab(), Linear())
 | 
						|
    pipe_bytes = pipe.to_bytes()
 | 
						|
    new_pipe = CustomPipe(Vocab(), Linear()).from_bytes(pipe_bytes)
 | 
						|
    assert new_pipe.to_bytes() == pipe_bytes
 | 
						|
    with make_tempdir() as d:
 | 
						|
        pipe.to_disk(d)
 | 
						|
        new_pipe = CustomPipe(Vocab(), Linear()).from_disk(d)
 | 
						|
    assert new_pipe.to_bytes() == pipe_bytes
 | 
						|
 | 
						|
 | 
						|
def test_load_without_strings():
 | 
						|
    nlp = spacy.blank("en")
 | 
						|
    orig_strings_length = len(nlp.vocab.strings)
 | 
						|
    word = "unlikely_word_" * 20
 | 
						|
    nlp.vocab.strings.add(word)
 | 
						|
    assert len(nlp.vocab.strings) == orig_strings_length + 1
 | 
						|
    with make_tempdir() as d:
 | 
						|
        nlp.to_disk(d)
 | 
						|
        # reload with strings
 | 
						|
        reloaded_nlp = load(d)
 | 
						|
        assert len(nlp.vocab.strings) == len(reloaded_nlp.vocab.strings)
 | 
						|
        assert word in reloaded_nlp.vocab.strings
 | 
						|
        # reload without strings
 | 
						|
        reloaded_nlp = load(d, exclude=["strings"])
 | 
						|
        assert orig_strings_length == len(reloaded_nlp.vocab.strings)
 | 
						|
        assert word not in reloaded_nlp.vocab.strings
 |