mirror of
https://github.com/explosion/spaCy.git
synced 2025-01-01 04:46:38 +03:00
e27c60a702
* Improve the correctness of _parse_patch * If there are no more actions, do not attempt to make further transitions, even if not all states are final. * Assert that the number of actions for a step is the same as the number of states. * Reimplement distillation with oracle cut size The code for distillation with an oracle cut size was not reimplemented after the parser refactor. We did not notice, because we did not have tests for this functionality. This change brings back the functionality and adds this to the parser tests. * Rename states2actions to _states_to_actions for consistency * Test distillation max cuts in NER * Mark parser/NER tests as slow * Typo * Fix invariant in _states_diff_to_actions * Rename _init_batch -> _init_batch_from_teacher * Ninja edit the ninja edit * Check that we raise an exception when we pass the incorrect number of actions * Remove unnecessary get Co-authored-by: Madeesh Kannan <shadeMe@users.noreply.github.com> * Write out condition more explicitly --------- Co-authored-by: Madeesh Kannan <shadeMe@users.noreply.github.com>
878 lines
30 KiB
Python
878 lines
30 KiB
Python
import random
|
|
|
|
import pytest
|
|
from numpy.testing import assert_equal
|
|
|
|
from spacy.attrs import ENT_IOB
|
|
from spacy import util, registry
|
|
from spacy.lang.en import English
|
|
from spacy.lang.it import Italian
|
|
from spacy.language import Language
|
|
from spacy.lookups import Lookups
|
|
from spacy.pipeline._parser_internals.ner import BiluoPushDown
|
|
from spacy.training import Example, iob_to_biluo, split_bilu_label
|
|
from spacy.tokens import Doc, Span
|
|
from spacy.vocab import Vocab
|
|
from thinc.api import fix_random_seed
|
|
import logging
|
|
|
|
from ..util import make_tempdir
|
|
from ...pipeline import EntityRecognizer
|
|
from ...pipeline.ner import DEFAULT_NER_MODEL
|
|
|
|
# Shared toy corpus: (text, annotations) pairs whose entities are given as
# (start_char, end_char, label) triples.
TRAIN_DATA = [
    (
        "Who is Shaka Khan?",
        {"entities": [(7, 17, "PERSON")]},
    ),
    (
        "I like London and Berlin.",
        {"entities": [(7, 13, "LOC"), (18, 24, "LOC")]},
    ),
]
|
|
|
|
|
|
@pytest.fixture
def neg_key():
    """Provide the key used in this module for negative (incorrect) span groups."""
    key = "non_entities"
    return key
|
|
|
|
|
|
@pytest.fixture
def vocab():
    """Provide a default-constructed ``Vocab``."""
    fresh_vocab = Vocab()
    return fresh_vocab
|
|
|
|
|
|
@pytest.fixture
def doc(vocab):
    """Build a six-token ``Doc`` ("Casey went to New York .") on the given vocab."""
    tokens = ["Casey", "went", "to", "New", "York", "."]
    return Doc(vocab, words=tokens)
|
|
|
|
|
|
@pytest.fixture
def entity_annots(doc):
    """Return (start_char, end_char, label) annotations for tokens 0 and 3-4 of ``doc``."""
    labelled_spans = [
        (doc[0:1], "PERSON"),
        (doc[3:5], "GPE"),
    ]
    return [
        (span.start_char, span.end_char, label) for span, label in labelled_spans
    ]
|
|
|
|
|
|
@pytest.fixture
def entity_types(entity_annots):
    """Return the distinct labels found in ``entity_annots``, in sorted order."""
    unique_labels = {label for (_start, _end, label) in entity_annots}
    return sorted(unique_labels)
|
|
|
|
|
|
@pytest.fixture
def tsys(vocab, entity_types):
    """Create a ``BiluoPushDown`` transition system for the fixture's entity types."""
    moves = BiluoPushDown.get_actions(entity_types=entity_types)
    transition_system = BiluoPushDown(vocab.strings, moves)
    return transition_system
|
|
|
|
|
|
@pytest.mark.parametrize("label", ["U-JOB-NAME"])
@pytest.mark.issue(1967)
def test_issue1967(label):
    """Check that the hyphenated type "JOB-NAME" is extracted from a BILUO tag."""
    nlp = Language()
    ner = nlp.create_pipe("ner", config={})
    predicted = Doc(ner.vocab, words=["word"])
    gold_annotations = {
        "ids": [0],
        "words": ["word"],
        "tags": ["tag"],
        "heads": [0],
        "deps": ["dep"],
        "entities": [label],
    }
    example = Example.from_dict(predicted, gold_annotations)
    actions = ner.moves.get_actions(examples=[example])
    # NOTE(review): key 1 is presumably the "B" (begin) move bucket — confirm
    # against the transition system's move ordering.
    assert "JOB-NAME" in actions[1]
|
|
|
|
|
|
@pytest.mark.issue(2179)
def test_issue2179():
    """Test that spurious 'extra_labels' aren't created when initializing NER."""
    source_nlp = Italian()
    source_ner = source_nlp.add_pipe("ner")
    source_ner.add_label("CITIZENSHIP")
    source_nlp.initialize()

    target_nlp = Italian()
    target_nlp.add_pipe("ner")
    assert len(target_nlp.get_pipe("ner").labels) == 0

    # Resize the fresh model's output to the source's number of moves before
    # loading the serialized pipeline into it.
    target_model = target_nlp.get_pipe("ner").model
    resize_output = target_model.attrs["resize_output"]
    resize_output(target_model, source_nlp.get_pipe("ner").moves.n_moves)
    target_nlp.from_bytes(source_nlp.to_bytes())

    loaded_ner = target_nlp.get_pipe("ner")
    assert "extra_labels" not in loaded_ner.cfg
    assert loaded_ner.labels == ("CITIZENSHIP",)
|
|
|
|
|
|
@pytest.mark.issue(2385)
def test_issue2385():
    """Test that IOB tags are correctly converted to BILUO tags."""
    cases = [
        # labels containing a 'b' character (regression for an earlier bug)
        (
            ("B-BRAWLER", "I-BRAWLER", "I-BRAWLER"),
            ["B-BRAWLER", "I-BRAWLER", "L-BRAWLER"],
        ),
        # iob1 format is still supported
        (
            ("I-ORG", "I-ORG", "B-ORG"),
            ["B-ORG", "L-ORG", "U-ORG"],
        ),
        # iob2 format is still supported
        (
            ("B-PERSON", "I-PERSON", "B-PERSON"),
            ["B-PERSON", "L-PERSON", "U-PERSON"],
        ),
        # hyphens inside the label name
        (
            ("B-MULTI-PERSON", "I-MULTI-PERSON", "B-MULTI-PERSON"),
            ["B-MULTI-PERSON", "L-MULTI-PERSON", "U-MULTI-PERSON"],
        ),
    ]
    for iob_tags, expected_biluo in cases:
        assert iob_to_biluo(iob_tags) == expected_biluo
|
|
|
|
|
|
@pytest.mark.issue(2800)
def test_issue2800():
    """Test issue that arises when too many labels are added to NER model.

    Used to cause segfault.
    """
    nlp = English()
    train_data = [Example.from_dict(nlp.make_doc("One sentence"), {"entities": []})]
    ner = nlp.add_pipe("ner")
    # Register 1000 labels named "0" .. "999".
    for number in range(1000):
        ner.add_label(str(number))
    optimizer = nlp.initialize()
    for _epoch in range(20):
        losses = {}
        random.shuffle(train_data)
        for example in train_data:
            nlp.update([example], sgd=optimizer, losses=losses, drop=0.5)
|
|
|
|
|
|
@pytest.mark.issue(3209)
def test_issue3209():
    """Test issue that occurred in spaCy nightly where NER labels were being
    mapped to classes incorrectly after loading the model, when the labels
    were added using ner.add_label().
    """
    expected_moves = ["O", "B-ANIMAL", "I-ANIMAL", "L-ANIMAL", "U-ANIMAL"]

    nlp = English()
    ner = nlp.add_pipe("ner")
    ner.add_label("ANIMAL")
    nlp.initialize()
    assert ner.move_names == expected_moves

    # Round-trip through bytes into a second pipeline whose model output was
    # resized to the same number of moves.
    reloaded_nlp = English()
    reloaded_ner = reloaded_nlp.add_pipe("ner")
    reloaded_model = reloaded_ner.model
    reloaded_model.attrs["resize_output"](reloaded_model, ner.moves.n_moves)
    reloaded_nlp.from_bytes(nlp.to_bytes())
    assert reloaded_ner.move_names == expected_moves
|
|
|
|
|
|
def test_labels_from_BILUO():
    """Test that labels are inferred correctly when there's a - in label."""
    nlp = English()
    ner = nlp.add_pipe("ner")
    ner.add_label("LARGE-ANIMAL")
    nlp.initialize()
    expected_moves = [
        "O",
        "B-LARGE-ANIMAL",
        "I-LARGE-ANIMAL",
        "L-LARGE-ANIMAL",
        "U-LARGE-ANIMAL",
    ]
    assert ner.move_names == expected_moves
    assert set(ner.labels) == {"LARGE-ANIMAL"}
|
|
|
|
|
|
@pytest.mark.issue(4267)
def test_issue4267():
    """Test that running an entity_ruler after ner gives consistent results"""
    nlp = English()
    ner = nlp.add_pipe("ner")
    ner.add_label("PEOPLE")
    nlp.initialize()
    assert "ner" in nlp.pipe_names

    def check_iob_annotations():
        # Every token of the parsed text must have ENT_IOB set, with value 2.
        parsed = nlp("hi")
        assert parsed.has_annotation("ENT_IOB")
        assert all(token.ent_iob == 2 for token in parsed)

    # IOB annotations with only the NER in the pipeline
    check_iob_annotations()

    # add entity ruler and run again
    ruler = nlp.add_pipe("entity_ruler")
    ruler.add_patterns([{"label": "SOFTWARE", "pattern": "spacy"}])
    assert "entity_ruler" in nlp.pipe_names
    assert "ner" in nlp.pipe_names

    # IOB annotations must be unchanged with the ruler added
    check_iob_annotations()
|
|
|
|
|
|
@pytest.mark.issue(4313)
def test_issue4313():
    """This should not crash or exit with some strange error code"""
    width, density = 16, 0.0001
    nlp = English()
    ner = nlp.add_pipe(
        "beam_ner",
        config={"beam_width": width, "beam_density": density},
    )
    ner.add_label("SOME_LABEL")
    nlp.initialize()

    doc = nlp("What do you think about Apple ?")
    assert len(ner.labels) == 1
    assert "SOME_LABEL" in ner.labels

    # Attach an entity whose label the component does not know yet.
    unseen_ent = Span(doc, 5, 6, label="MY_ORG")
    doc.ents = [*doc.ents, unseen_ent]

    # beam_parse must still work, and the new label must show up afterwards.
    ner.beam_parse([doc], drop=0.0, beam_width=width, beam_density=density)
    assert len(ner.labels) == 2
    assert "MY_ORG" in ner.labels
|
|
|
|
|
|
def test_get_oracle_moves(tsys, doc, entity_annots):
    """Check the oracle move names produced for the annotated fixture doc."""
    gold = Example.from_dict(doc, {"entities": entity_annots})
    oracle = tsys.get_oracle_sequence(gold, _debug=False)
    move_names = list(map(tsys.get_class_name, oracle))
    assert move_names == ["U-PERSON", "O", "O", "B-GPE", "L-GPE", "O"]
|
|
|
|
|
|
def test_negative_samples_two_word_input(tsys, vocab, neg_key):
    """Test that we don't get stuck in a two word input when we have a negative
    span. This could happen if we don't have the right check on the B action.
    """
    tsys.cfg["neg_key"] = neg_key
    two_words = Doc(vocab, words=["A", "B"])
    example = Example.from_dict(two_words, {"entities": [None, None]})
    # Negative spans: the first word must not be O, and the two words together
    # must not be analysed as B-PERSON, L-PERSON.
    negatives = [
        Span(example.y, 0, 1, label="O"),
        Span(example.y, 0, 2, label="PERSON"),
    ]
    example.y.spans[neg_key] = negatives
    oracle = tsys.get_oracle_sequence(example)
    names = [tsys.get_class_name(move) for move in oracle]
    assert names
    assert names[0] != "O"
    assert names[0] != "B-PERSON"
    assert names[1] != "L-PERSON"
|
|
|
|
|
|
def test_negative_samples_three_word_input(tsys, vocab, neg_key):
    """Test that we exclude a 2-word entity correctly using a negative example."""
    tsys.cfg["neg_key"] = neg_key
    three_words = Doc(vocab, words=["A", "B", "C"])
    example = Example.from_dict(three_words, {"entities": [None, None, None]})
    # Negative spans: "O" over the first word and "PERSON" over the first two.
    negatives = [
        Span(example.y, 0, 1, label="O"),
        Span(example.y, 0, 2, label="PERSON"),
    ]
    example.y.spans[neg_key] = negatives
    oracle = tsys.get_oracle_sequence(example)
    names = [tsys.get_class_name(move) for move in oracle]
    assert names
    # The first move must not be O and the second must not be B-PERSON.
    assert names[0] != "O"
    assert names[1] != "B-PERSON"
|
|
|
|
|
|
def test_negative_samples_U_entity(tsys, vocab, neg_key):
    """Test that we exclude a single-token (U-) entity correctly using a negative example."""
    tsys.cfg["neg_key"] = neg_key
    doc = Doc(vocab, words=["A"])
    entity_annots = [None]
    example = Example.from_dict(doc, {"entities": entity_annots})
    # These mean that the oracle sequence shouldn't have O for the only
    # word, and it shouldn't analyse it as U-PERSON either
    example.y.spans[neg_key] = [
        Span(example.y, 0, 1, label="O"),
        Span(example.y, 0, 1, label="PERSON"),
    ]
    act_classes = tsys.get_oracle_sequence(example)
    names = [tsys.get_class_name(act) for act in act_classes]
    assert names
    assert names[0] != "O"
    assert names[0] != "U-PERSON"
|
|
|
|
|
|
def test_negative_sample_key_is_in_config(vocab, entity_types):
    """The ``incorrect_spans_key`` argument must be stored as ``cfg["neg_key"]``."""
    moves = BiluoPushDown.get_actions(entity_types=entity_types)
    transition_system = BiluoPushDown(
        vocab.strings, moves, incorrect_spans_key="non_entities"
    )
    assert transition_system.cfg["neg_key"] == "non_entities"
|
|
|
|
|
|
# We can't easily represent this on a Doc object. Not sure what the best solution
|
|
# would be, but I don't think it's an important use case?
|
|
@pytest.mark.skip(reason="No longer supported")
def test_oracle_moves_missing_B(en_vocab):
    """Request an oracle sequence for tags that end an entity with no B tag (skipped)."""
    words = ["B", "52", "Bomber"]
    biluo_tags = [None, None, "L-PRODUCT"]

    doc = Doc(en_vocab, words=words)
    example = Example.from_dict(doc, {"words": words, "entities": biluo_tags})

    moves = BiluoPushDown(en_vocab.strings)
    move_types = ("M", "B", "I", "L", "U", "O")
    for tag in biluo_tags:
        if tag is None:
            continue
        if tag == "O":
            moves.add_action(move_types.index("O"), "")
            continue
        # Register all four entity moves for the tag's label, whatever its prefix.
        _prefix, label = split_bilu_label(tag)
        moves.add_action(move_types.index("B"), label)
        moves.add_action(move_types.index("I"), label)
        moves.add_action(move_types.index("L"), label)
        moves.add_action(move_types.index("U"), label)
    moves.get_oracle_sequence(example)
|
|
|
|
|
|
# We can't easily represent this on a Doc object. Not sure what the best solution
|
|
# would be, but I don't think it's an important use case?
|
|
@pytest.mark.skip(reason="No longer supported")
def test_oracle_moves_whitespace(en_vocab):
    """Request an oracle sequence for an entity interrupted by an unannotated newline token (skipped)."""
    words = ["production", "\n", "of", "Northrop", "\n", "Corp.", "\n", "'s", "radar"]
    biluo_tags = ["O", "O", "O", "B-ORG", None, "I-ORG", "L-ORG", "O", "O"]

    doc = Doc(en_vocab, words=words)
    example = Example.from_dict(doc, {"entities": biluo_tags})

    moves = BiluoPushDown(en_vocab.strings)
    move_types = ("M", "B", "I", "L", "U", "O")
    for tag in biluo_tags:
        if tag is None:
            continue
        if tag == "O":
            moves.add_action(move_types.index("O"), "")
            continue
        # Register exactly the move named by the tag's prefix.
        prefix, label = split_bilu_label(tag)
        moves.add_action(move_types.index(prefix), label)
    moves.get_oracle_sequence(example)
|
|
|
|
|
|
def test_accept_blocked_token():
    """Test successful blocking of tokens to be in an entity."""
    text = "I live in New York"
    all_empty = [""] * 5

    # 1. test normal behaviour
    plain_nlp = English()
    plain_doc = plain_nlp(text)
    plain_ner = plain_nlp.create_pipe("ner", config={})
    assert [token.ent_iob_ for token in plain_doc] == all_empty
    assert [token.ent_type_ for token in plain_doc] == all_empty

    # Add the OUT action
    plain_ner.moves.add_action(5, "")
    plain_ner.add_label("GPE")
    # Apply three O transitions to reach the state just before "New"
    plain_state = plain_ner.moves.init_batch([plain_doc])[0]
    for _ in range(3):
        plain_ner.moves.apply_transition(plain_state, "O")
    # Check that B-GPE is valid.
    assert plain_ner.moves.is_valid(plain_state, "B-GPE")

    # 2. test blocking behaviour
    blocked_nlp = English()
    blocked_doc = blocked_nlp(text)
    blocked_ner = blocked_nlp.create_pipe("ner", config={})

    # set "New York" to a blocked entity
    blocked_doc.set_ents([], blocked=[blocked_doc[3:5]], default="unmodified")
    assert [token.ent_iob_ for token in blocked_doc] == ["", "", "", "B", "B"]
    assert [token.ent_type_ for token in blocked_doc] == all_empty

    # Check that B-GPE is now invalid.
    blocked_ner.moves.add_action(4, "")
    blocked_ner.moves.add_action(5, "")
    blocked_ner.add_label("GPE")
    blocked_state = blocked_ner.moves.init_batch([blocked_doc])[0]
    for _ in range(3):
        blocked_ner.moves.apply_transition(blocked_state, "O")
    # we can only use U- for "New"
    assert not blocked_ner.moves.is_valid(blocked_state, "B-GPE")
    assert blocked_ner.moves.is_valid(blocked_state, "U-")
    blocked_ner.moves.apply_transition(blocked_state, "U-")
    # we can only use U- for "York"
    assert not blocked_ner.moves.is_valid(blocked_state, "B-GPE")
    assert blocked_ner.moves.is_valid(blocked_state, "U-")
|
|
|
|
|
|
def test_train_empty():
    """Test that training an empty text does not throw errors."""
    raw_data = [
        ("Who is Shaka Khan?", {"entities": [(7, 17, "PERSON")]}),
        ("", {"entities": []}),
    ]

    nlp = English()
    train_examples = [
        Example.from_dict(nlp.make_doc(text), annotations)
        for text, annotations in raw_data
    ]
    ner = nlp.add_pipe("ner", last=True)
    ner.add_label("PERSON")
    nlp.initialize(get_examples=lambda: train_examples)
    for _epoch in range(2):
        losses = {}
        for batch in util.minibatch(train_examples, size=8):
            nlp.update(batch, losses=losses)
|
|
|
|
|
|
def test_train_negative_deprecated():
    """Test that the deprecated negative entity format raises a custom error."""
    raw_data = [
        ("Who is Shaka Khan?", {"entities": [(7, 17, "!PERSON")]}),
    ]

    nlp = English()
    train_examples = [
        Example.from_dict(nlp.make_doc(text), annotations)
        for text, annotations in raw_data
    ]
    ner = nlp.add_pipe("ner", last=True)
    ner.add_label("PERSON")
    nlp.initialize()
    for _epoch in range(2):
        losses = {}
        for batch in util.minibatch(train_examples, size=8):
            # Every update on the "!PERSON" annotation is expected to raise.
            with pytest.raises(ValueError):
                nlp.update(batch, losses=losses)
|
|
|
|
|
|
def test_overwrite_token():
    """Check that a second NER may start entities on tokens already tagged O."""
    nlp = English()
    nlp.add_pipe("ner")
    nlp.initialize()
    # The untrained NER will predict O for each token
    doc = nlp("I live in New York")
    assert [token.ent_iob_ for token in doc] == ["O"] * 5
    assert [token.ent_type_ for token in doc] == [""] * 5

    # Check that a new ner can overwrite O
    second_ner = nlp.create_pipe("ner", config={})
    second_ner.moves.add_action(5, "")
    second_ner.add_label("GPE")
    moves = second_ner.moves
    state = moves.init_batch([doc])[0]
    assert moves.is_valid(state, "B-GPE")
    assert moves.is_valid(state, "U-GPE")
    moves.apply_transition(state, "B-GPE")
    assert moves.is_valid(state, "I-GPE")
    assert moves.is_valid(state, "L-GPE")
|
|
|
|
|
|
def test_empty_ner():
    """An initialized but untrained NER should tag every token as O."""
    nlp = English()
    ner = nlp.add_pipe("ner")
    ner.add_label("MY_LABEL")
    nlp.initialize()
    doc = nlp("John is watching the news about Croatia's elections")
    # if this goes wrong, the initialization of the parser's upper layer is probably broken
    predicted_iobs = [token.ent_iob_ for token in doc]
    assert predicted_iobs == ["O"] * 9
|
|
|
|
|
|
def test_ruler_before_ner():
    """Test that an NER works after an entity_ruler: the second can add annotations"""
    nlp = English()

    # 1 : Entity Ruler - should set "this" to B and everything else to empty
    ruler = nlp.add_pipe("entity_ruler")

    # 2: untrained NER - should set everything else to O
    untrained_ner = nlp.add_pipe("ner")
    untrained_ner.add_label("MY_LABEL")
    nlp.initialize()
    # The ruler's patterns are added only after the pipeline is initialized.
    ruler.add_patterns([{"label": "THING", "pattern": "This"}])

    doc = nlp("This is Antti Korhonen speaking in Finland")
    assert [token.ent_iob_ for token in doc] == ["B"] + ["O"] * 6
    assert [token.ent_type_ for token in doc] == ["THING"] + [""] * 6
|
|
|
|
|
|
def test_ner_constructor(en_vocab):
    """Construct ``EntityRecognizer`` directly, with and without extra settings."""
    resolved = registry.resolve({"model": DEFAULT_NER_MODEL}, validate=True)
    model = resolved["model"]
    EntityRecognizer(en_vocab, model, update_with_oracle_cut_size=100)
    EntityRecognizer(en_vocab, model)
|
|
|
|
|
|
def test_ner_before_ruler():
    """Test that an entity_ruler works after an NER: the second can overwrite O annotations"""
    nlp = English()

    # 1: untrained NER - should set everything to O
    untrained_ner = nlp.add_pipe("ner", name="uner")
    untrained_ner.add_label("MY_LABEL")
    nlp.initialize()

    # 2 : Entity Ruler - should set "this" to B and keep everything else O
    ruler = nlp.add_pipe("entity_ruler")
    ruler.add_patterns([{"label": "THING", "pattern": "This"}])

    doc = nlp("This is Antti Korhonen speaking in Finland")
    assert [token.ent_iob_ for token in doc] == ["B"] + ["O"] * 6
    assert [token.ent_type_ for token in doc] == ["THING"] + [""] * 6
|
|
|
|
|
|
def test_block_ner():
    """Test functionality for blocking tokens so they can't be in a named entity"""
    # block "Antti L Korhonen" from being a named entity
    nlp = English()
    nlp.add_pipe("blocker", config={"start": 2, "end": 5})
    untrained_ner = nlp.add_pipe("ner")
    untrained_ner.add_label("MY_LABEL")
    nlp.initialize()
    doc = nlp("This is Antti L Korhonen speaking in Finland")
    # Tokens 2-4 are the blocked ones: IOB "B" with an empty entity type.
    assert [token.ent_iob_ for token in doc] == ["O"] * 2 + ["B"] * 3 + ["O"] * 3
    assert [token.ent_type_ for token in doc] == [""] * 8
|
|
|
|
|
|
def test_overfitting_IO():
    """Overfit NER on TRAIN_DATA, then check predictions, disk round-trip and kb_id handling."""

    def assert_only_london(found_ents):
        # Exactly one entity: "London" labelled LOC.
        assert len(found_ents) == 1
        assert found_ents[0].text == "London"
        assert found_ents[0].label_ == "LOC"

    fix_random_seed(1)
    # Simple test to try and quickly overfit the NER component
    nlp = English()
    ner = nlp.add_pipe("ner", config={"model": {}})
    train_examples = []
    for text, annotations in TRAIN_DATA:
        train_examples.append(Example.from_dict(nlp.make_doc(text), annotations))
        for _start, _end, label in annotations.get("entities"):
            ner.add_label(label)
    optimizer = nlp.initialize()

    for _epoch in range(50):
        losses = {}
        nlp.update(train_examples, sgd=optimizer, losses=losses)
    assert losses["ner"] < 0.00001

    # test the trained model
    test_text = "I like London."
    assert_only_london(nlp(test_text).ents)

    # Also test the results are still the same after IO
    with make_tempdir() as tmp_dir:
        nlp.to_disk(tmp_dir)
        nlp2 = util.load_model_from_path(tmp_dir)
        assert_only_london(nlp2(test_text).ents)
        # Ensure that the predictions are still the same, even after adding a new label
        ner2 = nlp2.get_pipe("ner")
        ner2.add_label("RANDOM_NEW_LABEL")
        assert_only_london(nlp2(test_text).ents)

    # Make sure that running pipe twice, or comparing to call, always amounts to the same predictions
    texts = [
        "Just a sentence.",
        "Then one more sentence about London.",
        "Here is another one.",
        "I like London.",
    ]
    first_pipe_run = [piped.to_array([ENT_IOB]) for piped in nlp.pipe(texts)]
    second_pipe_run = [piped.to_array([ENT_IOB]) for piped in nlp.pipe(texts)]
    called_docs = [nlp(text) for text in texts]
    one_by_one = [called.to_array([ENT_IOB]) for called in called_docs]
    assert_equal(first_pipe_run, second_pipe_run)
    assert_equal(first_pipe_run, one_by_one)

    # test that kb_id is preserved
    test_text = "I like London and London."
    doc = nlp.make_doc(test_text)
    doc.ents = [Span(doc, 2, 3, label="LOC", kb_id=1234)]
    preset_ents = doc.ents
    assert_only_london(preset_ents)
    assert preset_ents[0].kb_id == 1234

    doc = nlp.get_pipe("ner")(doc)
    final_ents = doc.ents
    assert len(final_ents) == 2
    assert final_ents[0].text == "London"
    assert final_ents[0].label_ == "LOC"
    assert final_ents[0].kb_id == 1234
    # ent added by ner has kb_id == 0
    assert final_ents[1].text == "London"
    assert final_ents[1].label_ == "LOC"
    assert final_ents[1].kb_id == 0
|
|
|
|
|
|
def test_is_distillable():
    """The NER component must report a truthy ``is_distillable``."""
    pipeline = English()
    component = pipeline.add_pipe("ner")
    assert component.is_distillable
|
|
|
|
|
|
@pytest.mark.slow
@pytest.mark.parametrize("max_moves", [0, 1, 5, 100])
def test_distill(max_moves):
    """Overfit a teacher NER, distill it into a student configured with
    ``update_with_oracle_cut_size = max_moves``, and check the student's output.
    """
    teacher = English()
    teacher_ner = teacher.add_pipe("ner")
    train_examples = []
    for text, annotations in TRAIN_DATA:
        train_examples.append(Example.from_dict(teacher.make_doc(text), annotations))
        for _start, _end, label in annotations.get("entities"):
            teacher_ner.add_label(label)

    optimizer = teacher.initialize(get_examples=lambda: train_examples)

    for _epoch in range(50):
        losses = {}
        teacher.update(train_examples, sgd=optimizer, losses=losses)
    assert losses["ner"] < 0.00001

    student = English()
    student_ner = student.add_pipe("ner")
    student_ner.cfg["update_with_oracle_cut_size"] = max_moves
    student_ner.initialize(
        get_examples=lambda: train_examples, labels=teacher_ner.label_data
    )

    # The distillation examples carry the raw texts only (empty annotation dict).
    distill_examples = [
        Example.from_dict(teacher.make_doc(text), {}) for text, _annots in TRAIN_DATA
    ]

    for _epoch in range(100):
        losses = {}
        student_ner.distill(teacher_ner, distill_examples, sgd=optimizer, losses=losses)
    assert losses["ner"] < 0.0001

    # test the trained model
    found_ents = student("I like London.").ents
    assert len(found_ents) == 1
    assert found_ents[0].text == "London"
    assert found_ents[0].label_ == "LOC"
|
|
|
|
|
|
def test_beam_ner_scores():
    """Test that we can get confidence values out of the beam_ner pipe."""
    nlp = English()
    ner = nlp.add_pipe(
        "beam_ner",
        config={"beam_width": 16, "beam_density": 0.0001},
    )
    train_examples = []
    for text, annotations in TRAIN_DATA:
        train_examples.append(Example.from_dict(nlp.make_doc(text), annotations))
        for _start, _end, label in annotations.get("entities"):
            ner.add_label(label)
    optimizer = nlp.initialize()

    # update once
    losses = {}
    nlp.update(train_examples, sgd=optimizer, losses=losses)

    # test the scores from the beam
    doc = nlp.make_doc("I like London.")
    beams = ner.predict([doc])
    entity_scores = ner.scored_ents(beams)[0]

    # Each single-token span's score, for every label, must lie in [0, 1]
    # up to a small tolerance.
    eps = 0.00001
    for start in range(len(doc)):
        for label in ner.labels:
            score = entity_scores[(start, start + 1, label)]
            assert 0 - eps <= score <= 1 + eps
|
|
|
|
|
|
def test_beam_overfitting_IO(neg_key):
    """Overfit beam NER, check its scores survive a disk round-trip, then
    unlearn the entity through negative annotations.
    """
    # Simple test to try and quickly overfit the Beam NER component
    nlp = English()
    config = {
        "beam_width": 16,
        "beam_density": 0.0001,
        "incorrect_spans_key": neg_key,
    }
    ner = nlp.add_pipe("beam_ner", config=config)
    train_examples = []
    for text, annotations in TRAIN_DATA:
        train_examples.append(Example.from_dict(nlp.make_doc(text), annotations))
        for _start, _end, label in annotations.get("entities"):
            ner.add_label(label)
    optimizer = nlp.initialize()

    # run overfitting
    for _epoch in range(50):
        losses = {}
        nlp.update(train_examples, sgd=optimizer, losses=losses)
    assert losses["beam_ner"] < 0.0001

    # test the scores from the beam
    test_text = "I like London"
    beams = ner.predict([nlp.make_doc(test_text)])
    entity_scores = ner.scored_ents(beams)[0]
    assert entity_scores[(2, 3, "LOC")] == 1.0
    assert entity_scores[(2, 3, "PERSON")] == 0.0
    assert len(nlp(test_text).ents) == 1

    # Also test the results are still the same after IO
    with make_tempdir() as tmp_dir:
        nlp.to_disk(tmp_dir)
        nlp2 = util.load_model_from_path(tmp_dir)
        docs2 = [nlp2.make_doc(test_text)]
        ner2 = nlp2.get_pipe("beam_ner")
        entity_scores2 = ner2.scored_ents(ner2.predict(docs2))[0]
        assert entity_scores2[(2, 3, "LOC")] == 1.0
        assert entity_scores2[(2, 3, "PERSON")] == 0.0

    # Try to unlearn the entity by using negative annotations
    neg_doc = nlp.make_doc(test_text)
    neg_ex = Example(neg_doc, neg_doc)
    neg_ex.reference.spans[neg_key] = [Span(neg_doc, 2, 3, "LOC")]

    for _epoch in range(20):
        losses = {}
        nlp.update([neg_ex], sgd=optimizer, losses=losses)

    # test the "untrained" model
    assert len(nlp(test_text).ents) == 0
|
|
|
|
|
|
def test_neg_annotation(neg_key):
    """Check that the NER update works with a negative annotation that is a different label of the correct one,
    or partly overlapping, etc"""
    nlp = English()
    config = {
        "beam_width": 16,
        "beam_density": 0.0001,
        "incorrect_spans_key": neg_key,
    }
    ner = nlp.add_pipe("beam_ner", config=config)
    neg_doc = nlp.make_doc("Who is Shaka Khan?")
    for label in ("PERSON", "ORG"):
        ner.add_label(label)
    example = Example.from_dict(neg_doc, {"entities": [(7, 17, "PERSON")]})
    # Negative spans: same tokens with another label, a sub-span, and a super-span.
    example.reference.spans[neg_key] = [
        Span(neg_doc, 2, 4, "ORG"),
        Span(neg_doc, 2, 3, "PERSON"),
        Span(neg_doc, 1, 4, "PERSON"),
    ]

    optimizer = nlp.initialize()
    for _step in range(2):
        losses = {}
        nlp.update([example], sgd=optimizer, losses=losses)
|
|
|
|
|
|
def test_neg_annotation_conflict(neg_key):
    """Check that NER raises for a negative annotation that is THE SAME as a correct one."""
    nlp = English()
    config = {
        "beam_width": 16,
        "beam_density": 0.0001,
        "incorrect_spans_key": neg_key,
    }
    ner = nlp.add_pipe("beam_ner", config=config)
    neg_doc = nlp.make_doc("Who is Shaka Khan?")
    for label in ("PERSON", "LOC"):
        ner.add_label(label)
    example = Example.from_dict(neg_doc, {"entities": [(7, 17, "PERSON")]})
    example.reference.spans[neg_key] = [Span(neg_doc, 2, 4, "PERSON")]

    # The gold entity and the negative span cover the same text with the same label.
    gold_ents = example.reference.ents
    assert len(gold_ents) == 1
    assert gold_ents[0].text == "Shaka Khan"
    assert gold_ents[0].label_ == "PERSON"
    negative_spans = example.reference.spans[neg_key]
    assert len(negative_spans) == 1
    assert negative_spans[0].text == "Shaka Khan"
    assert negative_spans[0].label_ == "PERSON"

    optimizer = nlp.initialize()
    for _step in range(2):
        losses = {}
        with pytest.raises(ValueError):
            nlp.update([example], sgd=optimizer, losses=losses)
|
|
|
|
|
|
def test_beam_valid_parse(neg_key):
    """Regression test for previously flakey behaviour"""
    nlp = English()
    nlp.add_pipe(
        "beam_ner",
        config={
            "beam_width": 16,
            "beam_density": 0.0001,
            "incorrect_spans_key": neg_key,
        },
    )
    # fmt: off
    tokens = ['FEDERAL', 'NATIONAL', 'MORTGAGE', 'ASSOCIATION', '(', 'Fannie', 'Mae', '):', 'Posted', 'yields', 'on', '30', 'year', 'mortgage', 'commitments', 'for', 'delivery', 'within', '30', 'days', '(', 'priced', 'at', 'par', ')', '9.75', '%', ',', 'standard', 'conventional', 'fixed', '-', 'rate', 'mortgages', ';', '8.70', '%', ',', '6/2', 'rate', 'capped', 'one', '-', 'year', 'adjustable', 'rate', 'mortgages', '.', 'Source', ':', 'Telerate', 'Systems', 'Inc.']
    iob = ['B-ORG', 'I-ORG', 'I-ORG', 'L-ORG', 'O', 'B-ORG', 'L-ORG', 'O', 'O', 'O', 'O', 'B-DATE', 'L-DATE', 'O', 'O', 'O', 'O', 'O', 'B-DATE', 'L-DATE', 'O', 'O', 'O', 'O', 'O', 'B-PERCENT', 'L-PERCENT', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'B-PERCENT', 'L-PERCENT', 'O', 'U-CARDINAL', 'O', 'O', 'B-DATE', 'I-DATE', 'L-DATE', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O']
    # fmt: on

    doc = Doc(nlp.vocab, words=tokens)
    example = Example.from_dict(doc, {"ner": iob})
    # Mark the last three tokens as a negative ORG span.
    example.reference.spans[neg_key] = [Span(doc, 50, 53, "ORG")]

    optimizer = nlp.initialize()

    for _step in range(5):
        losses = {}
        nlp.update([example], sgd=optimizer, losses=losses)
    assert "beam_ner" in losses
|
|
|
|
|
|
def test_ner_warns_no_lookups(caplog):
    """W033 must be logged on initialize when lookups are empty, and not once
    a ``lexeme_norm`` table has an entry.
    """
    nlp = English()
    assert nlp.lang in util.LEXEME_NORM_LANGS
    nlp.vocab.lookups = Lookups()
    assert not len(nlp.vocab.lookups)
    nlp.add_pipe("ner")

    # No lookup tables at all: the warning code shows up in the captured log.
    with caplog.at_level(logging.DEBUG):
        nlp.initialize()
        assert "W033" in caplog.text
    caplog.clear()

    # With a non-empty lexeme_norm table the warning is gone.
    lookups = nlp.vocab.lookups
    lookups.add_table("lexeme_norm")
    lookups.get_table("lexeme_norm")["a"] = "A"
    with caplog.at_level(logging.DEBUG):
        nlp.initialize()
        assert "W033" not in caplog.text
|
|
|
|
|
|
@Language.factory("blocker")
class BlockerComponent1:
    """Pipeline component that marks the token range [start, end) as blocked."""

    def __init__(self, nlp, start, end, name="my_blocker"):
        # ``nlp`` is accepted but not stored.
        self.name = name
        self.start, self.end = start, end

    def __call__(self, doc):
        """Block the configured token slice on ``doc`` and return the doc."""
        blocked_span = doc[self.start : self.end]
        doc.set_ents([], blocked=[blocked_span], default="unmodified")
        return doc
|