import logging
import random

import pytest
from numpy.testing import assert_equal

from spacy import registry, util
from spacy.attrs import ENT_IOB
from spacy.lang.en import English
from spacy.lang.it import Italian
from spacy.language import Language
from spacy.lookups import Lookups
from spacy.pipeline import EntityRecognizer
from spacy.pipeline._parser_internals.ner import BiluoPushDown
from spacy.pipeline.ner import DEFAULT_NER_MODEL
from spacy.tokens import Doc, Span
from spacy.training import Example, iob_to_biluo, split_bilu_label
from spacy.vocab import Vocab

from ..util import make_tempdir

TRAIN_DATA = [
    ("Who is Shaka Khan?", {"entities": [(7, 17, "PERSON")]}),
    ("I like London and Berlin.", {"entities": [(7, 13, "LOC"), (18, 24, "LOC")]}),
]


@pytest.fixture
def neg_key():
    return "non_entities"


@pytest.fixture
def vocab():
    return Vocab()


@pytest.fixture
def doc(vocab):
    return Doc(vocab, words=["Casey", "went", "to", "New", "York", "."])


@pytest.fixture
def entity_annots(doc):
    casey = doc[0:1]
    ny = doc[3:5]
    return [
        (casey.start_char, casey.end_char, "PERSON"),
        (ny.start_char, ny.end_char, "GPE"),
    ]


@pytest.fixture
def entity_types(entity_annots):
    return sorted(set([label for (s, e, label) in entity_annots]))


@pytest.fixture
def tsys(vocab, entity_types):
    actions = BiluoPushDown.get_actions(entity_types=entity_types)
    return BiluoPushDown(vocab.strings, actions)


@pytest.mark.parametrize("label", ["U-JOB-NAME"])
@pytest.mark.issue(1967)
def test_issue1967(label):
    nlp = Language()
    config = {}
    ner = nlp.create_pipe("ner", config=config)
    example = Example.from_dict(
        Doc(ner.vocab, words=["word"]),
        {
            "ids": [0],
            "words": ["word"],
            "tags": ["tag"],
            "heads": [0],
            "deps": ["dep"],
            "entities": [label],
        },
    )
    assert "JOB-NAME" in ner.moves.get_actions(examples=[example])[1]


@pytest.mark.issue(2179)
def test_issue2179():
    """Test that spurious 'extra_labels' aren't created when initializing NER."""
    nlp = Italian()
    ner = nlp.add_pipe("ner")
    ner.add_label("CITIZENSHIP")
    nlp.initialize()
    nlp2 = Italian()
    nlp2.add_pipe("ner")
    assert len(nlp2.get_pipe("ner").labels) == 0
    model = nlp2.get_pipe("ner").model
    model.attrs["resize_output"](model, nlp.get_pipe("ner").moves.n_moves)
    nlp2.from_bytes(nlp.to_bytes())
    assert "extra_labels" not in nlp2.get_pipe("ner").cfg
    assert nlp2.get_pipe("ner").labels == ("CITIZENSHIP",)


@pytest.mark.issue(2385)
def test_issue2385():
    """Test that IOB tags are correctly converted to BILUO tags."""
    # fix bug in labels with a 'b' character
    tags1 = ("B-BRAWLER", "I-BRAWLER", "I-BRAWLER")
    assert iob_to_biluo(tags1) == ["B-BRAWLER", "I-BRAWLER", "L-BRAWLER"]
    # maintain support for iob1 format
    tags2 = ("I-ORG", "I-ORG", "B-ORG")
    assert iob_to_biluo(tags2) == ["B-ORG", "L-ORG", "U-ORG"]
    # maintain support for iob2 format
    tags3 = ("B-PERSON", "I-PERSON", "B-PERSON")
    assert iob_to_biluo(tags3) == ["B-PERSON", "L-PERSON", "U-PERSON"]
    # ensure it works with hyphens in the name
    tags4 = ("B-MULTI-PERSON", "I-MULTI-PERSON", "B-MULTI-PERSON")
    assert iob_to_biluo(tags4) == ["B-MULTI-PERSON", "L-MULTI-PERSON", "U-MULTI-PERSON"]


@pytest.mark.issue(2800)
def test_issue2800():
    """Test issue that arises when too many labels are added to NER model.
    Used to cause segfault.
    """
    nlp = English()
    train_data = []
    train_data.extend(
        [Example.from_dict(nlp.make_doc("One sentence"), {"entities": []})]
    )
    entity_types = [str(i) for i in range(1000)]
    ner = nlp.add_pipe("ner")
    for entity_type in list(entity_types):
        ner.add_label(entity_type)
    optimizer = nlp.initialize()
    for i in range(20):
        losses = {}
        random.shuffle(train_data)
        for example in train_data:
            nlp.update([example], sgd=optimizer, losses=losses, drop=0.5)


@pytest.mark.issue(3209)
def test_issue3209():
    """Test issue that occurred in spaCy nightly where NER labels were being
    mapped to classes incorrectly after loading the model, when the labels
    were added using ner.add_label().
    """
    nlp = English()
    ner = nlp.add_pipe("ner")
    ner.add_label("ANIMAL")
    nlp.initialize()
    move_names = ["O", "B-ANIMAL", "I-ANIMAL", "L-ANIMAL", "U-ANIMAL"]
    assert ner.move_names == move_names
    nlp2 = English()
    ner2 = nlp2.add_pipe("ner")
    model = ner2.model
    model.attrs["resize_output"](model, ner.moves.n_moves)
    nlp2.from_bytes(nlp.to_bytes())
    assert ner2.move_names == move_names


def test_labels_from_BILUO():
    """Test that labels are inferred correctly when there's a - in label."""
    nlp = English()
    ner = nlp.add_pipe("ner")
    ner.add_label("LARGE-ANIMAL")
    nlp.initialize()
    move_names = [
        "O",
        "B-LARGE-ANIMAL",
        "I-LARGE-ANIMAL",
        "L-LARGE-ANIMAL",
        "U-LARGE-ANIMAL",
    ]
    labels = {"LARGE-ANIMAL"}
    assert ner.move_names == move_names
    assert set(ner.labels) == labels


@pytest.mark.issue(4267)
def test_issue4267():
    """Test that running an entity_ruler after ner gives consistent results"""
    nlp = English()
    ner = nlp.add_pipe("ner")
    ner.add_label("PEOPLE")
    nlp.initialize()
    assert "ner" in nlp.pipe_names
    # assert that we have correct IOB annotations
    doc1 = nlp("hi")
    assert doc1.has_annotation("ENT_IOB")
    for token in doc1:
        assert token.ent_iob == 2
    # add entity ruler and run again
    patterns = [{"label": "SOFTWARE", "pattern": "spacy"}]
    ruler = nlp.add_pipe("entity_ruler")
    ruler.add_patterns(patterns)
    assert "entity_ruler" in nlp.pipe_names
    assert "ner" in nlp.pipe_names
    # assert that we still have correct IOB annotations
    doc2 = nlp("hi")
    assert doc2.has_annotation("ENT_IOB")
    for token in doc2:
        assert token.ent_iob == 2


@pytest.mark.issue(4313)
def test_issue4313():
    """This should not crash or exit with some strange error code"""
    beam_width = 16
    beam_density = 0.0001
    nlp = English()
    config = {
        "beam_width": beam_width,
        "beam_density": beam_density,
    }
    ner = nlp.add_pipe("beam_ner", config=config)
    ner.add_label("SOME_LABEL")
    nlp.initialize()
    # add a new label to the doc
    doc = nlp("What do you think about Apple ?")
    assert len(ner.labels) == 1
    assert "SOME_LABEL" in ner.labels
    apple_ent = Span(doc, 5, 6, label="MY_ORG")
    doc.ents = list(doc.ents) + [apple_ent]

    # ensure the beam_parse still works with the new label
    docs = [doc]
    ner.beam_parse(docs, drop=0.0, beam_width=beam_width, beam_density=beam_density)
    assert len(ner.labels) == 2
    assert "MY_ORG" in ner.labels


def test_get_oracle_moves(tsys, doc, entity_annots):
    example = Example.from_dict(doc, {"entities": entity_annots})
    act_classes = tsys.get_oracle_sequence(example, _debug=False)
    names = [tsys.get_class_name(act) for act in act_classes]
    assert names == ["U-PERSON", "O", "O", "B-GPE", "L-GPE", "O"]


def test_negative_samples_two_word_input(tsys, vocab, neg_key):
    """Test that we don't get stuck in a two word input when we have a negative
    span. This could happen if we don't have the right check on the B action.
    """
    tsys.cfg["neg_key"] = neg_key
    doc = Doc(vocab, words=["A", "B"])
    entity_annots = [None, None]
    example = Example.from_dict(doc, {"entities": entity_annots})
    # These mean that the oracle sequence shouldn't have O for the first
    # word, and it shouldn't analyse it as B-PERSON, L-PERSON
    example.y.spans[neg_key] = [
        Span(example.y, 0, 1, label="O"),
        Span(example.y, 0, 2, label="PERSON"),
    ]
    act_classes = tsys.get_oracle_sequence(example)
    names = [tsys.get_class_name(act) for act in act_classes]
    assert names
    assert names[0] != "O"
    assert names[0] != "B-PERSON"
    assert names[1] != "L-PERSON"


def test_negative_samples_three_word_input(tsys, vocab, neg_key):
    """Test that we exclude a 2-word entity correctly using a negative example."""
    tsys.cfg["neg_key"] = neg_key
    doc = Doc(vocab, words=["A", "B", "C"])
    entity_annots = [None, None, None]
    example = Example.from_dict(doc, {"entities": entity_annots})
    # These mean that the oracle sequence shouldn't have O for the first
    # word, and it shouldn't analyse it as B-PERSON, L-PERSON
    example.y.spans[neg_key] = [
        Span(example.y, 0, 1, label="O"),
        Span(example.y, 0, 2, label="PERSON"),
    ]
    act_classes = tsys.get_oracle_sequence(example)
    names = [tsys.get_class_name(act) for act in act_classes]
    assert names
    assert names[0] != "O"
    assert names[1] != "B-PERSON"


def test_negative_samples_U_entity(tsys, vocab, neg_key):
    """Test that we exclude a 2-word entity correctly using a negative example."""
    tsys.cfg["neg_key"] = neg_key
    doc = Doc(vocab, words=["A"])
    entity_annots = [None]
    example = Example.from_dict(doc, {"entities": entity_annots})
    # These mean that the oracle sequence shouldn't have O for the first
    # word, and it shouldn't analyse it as B-PERSON, L-PERSON
    example.y.spans[neg_key] = [
        Span(example.y, 0, 1, label="O"),
        Span(example.y, 0, 1, label="PERSON"),
    ]
    act_classes = tsys.get_oracle_sequence(example)
    names = [tsys.get_class_name(act) for act in act_classes]
    assert names
    assert names[0] != "O"
    assert names[0] != "U-PERSON"


def test_negative_sample_key_is_in_config(vocab, entity_types):
    actions = BiluoPushDown.get_actions(entity_types=entity_types)
    tsys = BiluoPushDown(vocab.strings, actions, incorrect_spans_key="non_entities")
    assert tsys.cfg["neg_key"] == "non_entities"


# We can't easily represent this on a Doc object. Not sure what the best solution
# would be, but I don't think it's an important use case?
@pytest.mark.skip(reason="No longer supported")
def test_oracle_moves_missing_B(en_vocab):
    words = ["B", "52", "Bomber"]
    biluo_tags = [None, None, "L-PRODUCT"]

    doc = Doc(en_vocab, words=words)
    example = Example.from_dict(doc, {"words": words, "entities": biluo_tags})

    moves = BiluoPushDown(en_vocab.strings)
    move_types = ("M", "B", "I", "L", "U", "O")
    for tag in biluo_tags:
        if tag is None:
            continue
        elif tag == "O":
            moves.add_action(move_types.index("O"), "")
        else:
            action, label = split_bilu_label(tag)
            moves.add_action(move_types.index("B"), label)
            moves.add_action(move_types.index("I"), label)
            moves.add_action(move_types.index("L"), label)
            moves.add_action(move_types.index("U"), label)
    moves.get_oracle_sequence(example)


# We can't easily represent this on a Doc object. Not sure what the best solution
# would be, but I don't think it's an important use case?
@pytest.mark.skip(reason="No longer supported")
def test_oracle_moves_whitespace(en_vocab):
    words = ["production", "\n", "of", "Northrop", "\n", "Corp.", "\n", "'s", "radar"]
    biluo_tags = ["O", "O", "O", "B-ORG", None, "I-ORG", "L-ORG", "O", "O"]

    doc = Doc(en_vocab, words=words)
    example = Example.from_dict(doc, {"entities": biluo_tags})

    moves = BiluoPushDown(en_vocab.strings)
    move_types = ("M", "B", "I", "L", "U", "O")
    for tag in biluo_tags:
        if tag is None:
            continue
        elif tag == "O":
            moves.add_action(move_types.index("O"), "")
        else:
            action, label = split_bilu_label(tag)
            moves.add_action(move_types.index(action), label)
    moves.get_oracle_sequence(example)


def test_accept_blocked_token():
    """Test succesful blocking of tokens to be in an entity."""
    # 1. test normal behaviour
    nlp1 = English()
    doc1 = nlp1("I live in New York")
    config = {}
    ner1 = nlp1.create_pipe("ner", config=config)
    assert [token.ent_iob_ for token in doc1] == ["", "", "", "", ""]
    assert [token.ent_type_ for token in doc1] == ["", "", "", "", ""]

    # Add the OUT action
    ner1.moves.add_action(5, "")
    ner1.add_label("GPE")
    # Get into the state just before "New"
    state1 = ner1.moves.init_batch([doc1])[0]
    ner1.moves.apply_transition(state1, "O")
    ner1.moves.apply_transition(state1, "O")
    ner1.moves.apply_transition(state1, "O")
    # Check that B-GPE is valid.
    assert ner1.moves.is_valid(state1, "B-GPE")

    # 2. test blocking behaviour
    nlp2 = English()
    doc2 = nlp2("I live in New York")
    config = {}
    ner2 = nlp2.create_pipe("ner", config=config)

    # set "New York" to a blocked entity
    doc2.set_ents([], blocked=[doc2[3:5]], default="unmodified")
    assert [token.ent_iob_ for token in doc2] == ["", "", "", "B", "B"]
    assert [token.ent_type_ for token in doc2] == ["", "", "", "", ""]

    # Check that B-GPE is now invalid.
    ner2.moves.add_action(4, "")
    ner2.moves.add_action(5, "")
    ner2.add_label("GPE")
    state2 = ner2.moves.init_batch([doc2])[0]
    ner2.moves.apply_transition(state2, "O")
    ner2.moves.apply_transition(state2, "O")
    ner2.moves.apply_transition(state2, "O")
    # we can only use U- for "New"
    assert not ner2.moves.is_valid(state2, "B-GPE")
    assert ner2.moves.is_valid(state2, "U-")
    ner2.moves.apply_transition(state2, "U-")
    # we can only use U- for "York"
    assert not ner2.moves.is_valid(state2, "B-GPE")
    assert ner2.moves.is_valid(state2, "U-")


def test_train_empty():
    """Test that training an empty text does not throw errors."""
    train_data = [
        ("Who is Shaka Khan?", {"entities": [(7, 17, "PERSON")]}),
        ("", {"entities": []}),
    ]

    nlp = English()
    train_examples = []
    for t in train_data:
        train_examples.append(Example.from_dict(nlp.make_doc(t[0]), t[1]))
    ner = nlp.add_pipe("ner", last=True)
    ner.add_label("PERSON")
    nlp.initialize()
    for itn in range(2):
        losses = {}
        batches = util.minibatch(train_examples, size=8)
        for batch in batches:
            nlp.update(batch, losses=losses)


def test_train_negative_deprecated():
    """Test that the deprecated negative entity format raises a custom error."""
    train_data = [
        ("Who is Shaka Khan?", {"entities": [(7, 17, "!PERSON")]}),
    ]

    nlp = English()
    train_examples = []
    for t in train_data:
        train_examples.append(Example.from_dict(nlp.make_doc(t[0]), t[1]))
    ner = nlp.add_pipe("ner", last=True)
    ner.add_label("PERSON")
    nlp.initialize()
    for itn in range(2):
        losses = {}
        batches = util.minibatch(train_examples, size=8)
        for batch in batches:
            with pytest.raises(ValueError):
                nlp.update(batch, losses=losses)


def test_overwrite_token():
    nlp = English()
    nlp.add_pipe("ner")
    nlp.initialize()
    # The untrained NER will predict O for each token
    doc = nlp("I live in New York")
    assert [token.ent_iob_ for token in doc] == ["O", "O", "O", "O", "O"]
    assert [token.ent_type_ for token in doc] == ["", "", "", "", ""]
    # Check that a new ner can overwrite O
    config = {}
    ner2 = nlp.create_pipe("ner", config=config)
    ner2.moves.add_action(5, "")
    ner2.add_label("GPE")
    state = ner2.moves.init_batch([doc])[0]
    assert ner2.moves.is_valid(state, "B-GPE")
    assert ner2.moves.is_valid(state, "U-GPE")
    ner2.moves.apply_transition(state, "B-GPE")
    assert ner2.moves.is_valid(state, "I-GPE")
    assert ner2.moves.is_valid(state, "L-GPE")


def test_empty_ner():
    nlp = English()
    ner = nlp.add_pipe("ner")
    ner.add_label("MY_LABEL")
    nlp.initialize()
    doc = nlp("John is watching the news about Croatia's elections")
    # if this goes wrong, the initialization of the parser's upper layer is probably broken
    result = ["O", "O", "O", "O", "O", "O", "O", "O", "O"]
    assert [token.ent_iob_ for token in doc] == result


def test_ruler_before_ner():
    """Test that an NER works after an entity_ruler: the second can add annotations"""
    nlp = English()

    # 1 : Entity Ruler - should set "this" to B and everything else to empty
    patterns = [{"label": "THING", "pattern": "This"}]
    ruler = nlp.add_pipe("entity_ruler")

    # 2: untrained NER - should set everything else to O
    untrained_ner = nlp.add_pipe("ner")
    untrained_ner.add_label("MY_LABEL")
    nlp.initialize()
    ruler.add_patterns(patterns)
    doc = nlp("This is Antti Korhonen speaking in Finland")
    expected_iobs = ["B", "O", "O", "O", "O", "O", "O"]
    expected_types = ["THING", "", "", "", "", "", ""]
    assert [token.ent_iob_ for token in doc] == expected_iobs
    assert [token.ent_type_ for token in doc] == expected_types


def test_ner_constructor(en_vocab):
    config = {
        "update_with_oracle_cut_size": 100,
    }
    cfg = {"model": DEFAULT_NER_MODEL}
    model = registry.resolve(cfg, validate=True)["model"]
    EntityRecognizer(en_vocab, model, **config)
    EntityRecognizer(en_vocab, model)


def test_ner_before_ruler():
    """Test that an entity_ruler works after an NER: the second can overwrite O annotations"""
    nlp = English()

    # 1: untrained NER - should set everything to O
    untrained_ner = nlp.add_pipe("ner", name="uner")
    untrained_ner.add_label("MY_LABEL")
    nlp.initialize()

    # 2 : Entity Ruler - should set "this" to B and keep everything else O
    patterns = [{"label": "THING", "pattern": "This"}]
    ruler = nlp.add_pipe("entity_ruler")
    ruler.add_patterns(patterns)

    doc = nlp("This is Antti Korhonen speaking in Finland")
    expected_iobs = ["B", "O", "O", "O", "O", "O", "O"]
    expected_types = ["THING", "", "", "", "", "", ""]
    assert [token.ent_iob_ for token in doc] == expected_iobs
    assert [token.ent_type_ for token in doc] == expected_types


def test_block_ner():
    """Test functionality for blocking tokens so they can't be in a named entity"""
    # block "Antti L Korhonen" from being a named entity
    nlp = English()
    nlp.add_pipe("blocker", config={"start": 2, "end": 5})
    untrained_ner = nlp.add_pipe("ner")
    untrained_ner.add_label("MY_LABEL")
    nlp.initialize()
    doc = nlp("This is Antti L Korhonen speaking in Finland")
    expected_iobs = ["O", "O", "B", "B", "B", "O", "O", "O"]
    expected_types = ["", "", "", "", "", "", "", ""]
    assert [token.ent_iob_ for token in doc] == expected_iobs
    assert [token.ent_type_ for token in doc] == expected_types


@pytest.mark.parametrize("use_upper", [True, False])
def test_overfitting_IO(use_upper):
    # Simple test to try and quickly overfit the NER component
    nlp = English()
    ner = nlp.add_pipe("ner", config={"model": {"use_upper": use_upper}})
    train_examples = []
    for text, annotations in TRAIN_DATA:
        train_examples.append(Example.from_dict(nlp.make_doc(text), annotations))
        for ent in annotations.get("entities"):
            ner.add_label(ent[2])
    optimizer = nlp.initialize()

    for i in range(50):
        losses = {}
        nlp.update(train_examples, sgd=optimizer, losses=losses)
    assert losses["ner"] < 0.00001

    # test the trained model
    test_text = "I like London."
    doc = nlp(test_text)
    ents = doc.ents
    assert len(ents) == 1
    assert ents[0].text == "London"
    assert ents[0].label_ == "LOC"

    # Also test the results are still the same after IO
    with make_tempdir() as tmp_dir:
        nlp.to_disk(tmp_dir)
        nlp2 = util.load_model_from_path(tmp_dir)
        doc2 = nlp2(test_text)
        ents2 = doc2.ents
        assert len(ents2) == 1
        assert ents2[0].text == "London"
        assert ents2[0].label_ == "LOC"
        # Ensure that the predictions are still the same, even after adding a new label
        ner2 = nlp2.get_pipe("ner")
        assert ner2.model.attrs["has_upper"] == use_upper
        ner2.add_label("RANDOM_NEW_LABEL")
        doc3 = nlp2(test_text)
        ents3 = doc3.ents
        assert len(ents3) == 1
        assert ents3[0].text == "London"
        assert ents3[0].label_ == "LOC"

    # Make sure that running pipe twice, or comparing to call, always amounts to the same predictions
    texts = [
        "Just a sentence.",
        "Then one more sentence about London.",
        "Here is another one.",
        "I like London.",
    ]
    batch_deps_1 = [doc.to_array([ENT_IOB]) for doc in nlp.pipe(texts)]
    batch_deps_2 = [doc.to_array([ENT_IOB]) for doc in nlp.pipe(texts)]
    no_batch_deps = [doc.to_array([ENT_IOB]) for doc in [nlp(text) for text in texts]]
    assert_equal(batch_deps_1, batch_deps_2)
    assert_equal(batch_deps_1, no_batch_deps)

    # test that kb_id is preserved
    test_text = "I like London and London."
    doc = nlp.make_doc(test_text)
    doc.ents = [Span(doc, 2, 3, label="LOC", kb_id=1234)]
    ents = doc.ents
    assert len(ents) == 1
    assert ents[0].text == "London"
    assert ents[0].label_ == "LOC"
    assert ents[0].kb_id == 1234
    doc = nlp.get_pipe("ner")(doc)
    ents = doc.ents
    assert len(ents) == 2
    assert ents[0].text == "London"
    assert ents[0].label_ == "LOC"
    assert ents[0].kb_id == 1234
    # ent added by ner has kb_id == 0
    assert ents[1].text == "London"
    assert ents[1].label_ == "LOC"
    assert ents[1].kb_id == 0


def test_beam_ner_scores():
    # Test that we can get confidence values out of the beam_ner pipe
    beam_width = 16
    beam_density = 0.0001
    nlp = English()
    config = {
        "beam_width": beam_width,
        "beam_density": beam_density,
    }
    ner = nlp.add_pipe("beam_ner", config=config)
    train_examples = []
    for text, annotations in TRAIN_DATA:
        train_examples.append(Example.from_dict(nlp.make_doc(text), annotations))
        for ent in annotations.get("entities"):
            ner.add_label(ent[2])
    optimizer = nlp.initialize()

    # update once
    losses = {}
    nlp.update(train_examples, sgd=optimizer, losses=losses)

    # test the scores from the beam
    test_text = "I like London."
    doc = nlp.make_doc(test_text)
    docs = [doc]
    beams = ner.predict(docs)
    entity_scores = ner.scored_ents(beams)[0]

    for j in range(len(doc)):
        for label in ner.labels:
            score = entity_scores[(j, j + 1, label)]
            eps = 0.00001
            assert 0 - eps <= score <= 1 + eps


def test_beam_overfitting_IO(neg_key):
    # Simple test to try and quickly overfit the Beam NER component
    nlp = English()
    beam_width = 16
    beam_density = 0.0001
    config = {
        "beam_width": beam_width,
        "beam_density": beam_density,
        "incorrect_spans_key": neg_key,
    }
    ner = nlp.add_pipe("beam_ner", config=config)
    train_examples = []
    for text, annotations in TRAIN_DATA:
        train_examples.append(Example.from_dict(nlp.make_doc(text), annotations))
        for ent in annotations.get("entities"):
            ner.add_label(ent[2])
    optimizer = nlp.initialize()

    # run overfitting
    for i in range(50):
        losses = {}
        nlp.update(train_examples, sgd=optimizer, losses=losses)
    assert losses["beam_ner"] < 0.0001

    # test the scores from the beam
    test_text = "I like London"
    docs = [nlp.make_doc(test_text)]
    beams = ner.predict(docs)
    entity_scores = ner.scored_ents(beams)[0]
    assert entity_scores[(2, 3, "LOC")] == 1.0
    assert entity_scores[(2, 3, "PERSON")] == 0.0
    assert len(nlp(test_text).ents) == 1

    # Also test the results are still the same after IO
    with make_tempdir() as tmp_dir:
        nlp.to_disk(tmp_dir)
        nlp2 = util.load_model_from_path(tmp_dir)
        docs2 = [nlp2.make_doc(test_text)]
        ner2 = nlp2.get_pipe("beam_ner")
        beams2 = ner2.predict(docs2)
        entity_scores2 = ner2.scored_ents(beams2)[0]
        assert entity_scores2[(2, 3, "LOC")] == 1.0
        assert entity_scores2[(2, 3, "PERSON")] == 0.0

    # Try to unlearn the entity by using negative annotations
    neg_doc = nlp.make_doc(test_text)
    neg_ex = Example(neg_doc, neg_doc)
    neg_ex.reference.spans[neg_key] = [Span(neg_doc, 2, 3, "LOC")]
    neg_train_examples = [neg_ex]

    for i in range(20):
        losses = {}
        nlp.update(neg_train_examples, sgd=optimizer, losses=losses)

    # test the "untrained" model
    assert len(nlp(test_text).ents) == 0


def test_neg_annotation(neg_key):
    """Check that the NER update works with a negative annotation that is a different label of the correct one,
    or partly overlapping, etc"""
    nlp = English()
    beam_width = 16
    beam_density = 0.0001
    config = {
        "beam_width": beam_width,
        "beam_density": beam_density,
        "incorrect_spans_key": neg_key,
    }
    ner = nlp.add_pipe("beam_ner", config=config)
    train_text = "Who is Shaka Khan?"
    neg_doc = nlp.make_doc(train_text)
    ner.add_label("PERSON")
    ner.add_label("ORG")
    example = Example.from_dict(neg_doc, {"entities": [(7, 17, "PERSON")]})
    example.reference.spans[neg_key] = [
        Span(example.reference, 2, 4, "ORG"),
        Span(example.reference, 2, 3, "PERSON"),
        Span(example.reference, 1, 4, "PERSON"),
    ]

    optimizer = nlp.initialize()
    for i in range(2):
        losses = {}
        nlp.update([example], sgd=optimizer, losses=losses)


def test_neg_annotation_conflict(neg_key):
    # Check that NER raises for a negative annotation that is THE SAME as a correct one
    nlp = English()
    beam_width = 16
    beam_density = 0.0001
    config = {
        "beam_width": beam_width,
        "beam_density": beam_density,
        "incorrect_spans_key": neg_key,
    }
    ner = nlp.add_pipe("beam_ner", config=config)
    train_text = "Who is Shaka Khan?"
    neg_doc = nlp.make_doc(train_text)
    ner.add_label("PERSON")
    ner.add_label("LOC")
    example = Example.from_dict(neg_doc, {"entities": [(7, 17, "PERSON")]})
    example.reference.spans[neg_key] = [Span(example.reference, 2, 4, "PERSON")]
    assert len(example.reference.ents) == 1
    assert example.reference.ents[0].text == "Shaka Khan"
    assert example.reference.ents[0].label_ == "PERSON"
    assert len(example.reference.spans[neg_key]) == 1
    assert example.reference.spans[neg_key][0].text == "Shaka Khan"
    assert example.reference.spans[neg_key][0].label_ == "PERSON"

    optimizer = nlp.initialize()
    for i in range(2):
        losses = {}
        with pytest.raises(ValueError):
            nlp.update([example], sgd=optimizer, losses=losses)


def test_beam_valid_parse(neg_key):
    """Regression test for previously flakey behaviour"""
    nlp = English()
    beam_width = 16
    beam_density = 0.0001
    config = {
        "beam_width": beam_width,
        "beam_density": beam_density,
        "incorrect_spans_key": neg_key,
    }
    nlp.add_pipe("beam_ner", config=config)
    # fmt: off
    tokens = ['FEDERAL', 'NATIONAL', 'MORTGAGE', 'ASSOCIATION', '(', 'Fannie', 'Mae', '):', 'Posted', 'yields', 'on', '30', 'year', 'mortgage', 'commitments', 'for', 'delivery', 'within', '30', 'days', '(', 'priced', 'at', 'par', ')', '9.75', '%', ',', 'standard', 'conventional', 'fixed', '-', 'rate', 'mortgages', ';', '8.70', '%', ',', '6/2', 'rate', 'capped', 'one', '-', 'year', 'adjustable', 'rate', 'mortgages', '.', 'Source', ':', 'Telerate', 'Systems', 'Inc.']
    iob = ['B-ORG', 'I-ORG', 'I-ORG', 'L-ORG', 'O', 'B-ORG', 'L-ORG', 'O', 'O', 'O', 'O', 'B-DATE', 'L-DATE', 'O', 'O', 'O', 'O', 'O', 'B-DATE', 'L-DATE', 'O', 'O', 'O', 'O', 'O', 'B-PERCENT', 'L-PERCENT', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'B-PERCENT', 'L-PERCENT', 'O', 'U-CARDINAL', 'O', 'O', 'B-DATE', 'I-DATE', 'L-DATE', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O']
    # fmt: on

    doc = Doc(nlp.vocab, words=tokens)
    example = Example.from_dict(doc, {"ner": iob})
    neg_span = Span(example.reference, 50, 53, "ORG")
    example.reference.spans[neg_key] = [neg_span]

    optimizer = nlp.initialize()

    for i in range(5):
        losses = {}
        nlp.update([example], sgd=optimizer, losses=losses)
    assert "beam_ner" in losses


def test_ner_warns_no_lookups(caplog):
    nlp = English()
    assert nlp.lang in util.LEXEME_NORM_LANGS
    nlp.vocab.lookups = Lookups()
    assert not len(nlp.vocab.lookups)
    nlp.add_pipe("ner")
    with caplog.at_level(logging.DEBUG):
        nlp.initialize()
        assert "W033" in caplog.text
    caplog.clear()
    nlp.vocab.lookups.add_table("lexeme_norm")
    nlp.vocab.lookups.get_table("lexeme_norm")["a"] = "A"
    with caplog.at_level(logging.DEBUG):
        nlp.initialize()
        assert "W033" not in caplog.text


@Language.factory("blocker")
class BlockerComponent1:
    def __init__(self, nlp, start, end, name="my_blocker"):
        self.start = start
        self.end = end
        self.name = name

    def __call__(self, doc):
        doc.set_ents([], blocked=[doc[self.start : self.end]], default="unmodified")
        return doc