mirror of
https://github.com/explosion/spaCy.git
synced 2025-07-10 16:22:29 +03:00
span finder integrated into spacy from experimental
This commit is contained in:
parent
8d4129e177
commit
638ac9f666
|
@ -2,6 +2,7 @@ from .entity_linker import * # noqa
|
||||||
from .multi_task import * # noqa
|
from .multi_task import * # noqa
|
||||||
from .parser import * # noqa
|
from .parser import * # noqa
|
||||||
from .spancat import * # noqa
|
from .spancat import * # noqa
|
||||||
|
from .span_finder import * # noqa
|
||||||
from .tagger import * # noqa
|
from .tagger import * # noqa
|
||||||
from .textcat import * # noqa
|
from .textcat import * # noqa
|
||||||
from .tok2vec import * # noqa
|
from .tok2vec import * # noqa
|
||||||
|
|
41
spacy/ml/models/span_finder.py
Normal file
41
spacy/ml/models/span_finder.py
Normal file
|
@ -0,0 +1,41 @@
|
||||||
|
from typing import Callable, List, Tuple
|
||||||
|
from thinc.api import Model, chain, with_array
|
||||||
|
from thinc.types import Floats2d, Floats1d
|
||||||
|
from spacy.tokens import Doc
|
||||||
|
from ...util import registry
|
||||||
|
|
||||||
|
|
||||||
|
InT = List[Doc]
|
||||||
|
OutT = Floats2d
|
||||||
|
|
||||||
|
|
||||||
|
@registry.architectures("spacy.SpanFinder.v1")
|
||||||
|
def build_finder_model(
|
||||||
|
tok2vec: Model[InT, List[Floats2d]], scorer: Model[OutT, OutT]
|
||||||
|
) -> Model[InT, OutT]:
|
||||||
|
|
||||||
|
logistic_layer: Model[List[Floats2d], List[Floats2d]] = with_array(scorer)
|
||||||
|
model: Model[InT, OutT] = chain(tok2vec, logistic_layer, flattener())
|
||||||
|
model.set_ref("tok2vec", tok2vec)
|
||||||
|
# XXX Why do we have reference for both the scorer and it being wrapped in with_array?
|
||||||
|
model.set_ref("scorer", scorer)
|
||||||
|
model.set_ref("logistic_layer", logistic_layer)
|
||||||
|
|
||||||
|
return model
|
||||||
|
|
||||||
|
|
||||||
|
def flattener() -> Model[List[Floats2d], Floats2d]:
|
||||||
|
"""Flattens the input to a 1-dimensional list of scores"""
|
||||||
|
|
||||||
|
def forward(
|
||||||
|
model: Model[Floats1d, Floats1d], X: List[Floats2d], is_train: bool
|
||||||
|
) -> Tuple[Floats2d, Callable[[Floats2d], List[Floats2d]]]:
|
||||||
|
lens = model.ops.asarray1i([len(doc) for doc in X])
|
||||||
|
Y = model.ops.flatten(X)
|
||||||
|
|
||||||
|
def backprop(dY: Floats2d) -> List[Floats2d]:
|
||||||
|
return model.ops.unflatten(dY, lens)
|
||||||
|
|
||||||
|
return Y, backprop
|
||||||
|
|
||||||
|
return Model("Flattener", forward=forward)
|
|
@ -14,6 +14,7 @@ from .tagger import Tagger
|
||||||
from .textcat import TextCategorizer
|
from .textcat import TextCategorizer
|
||||||
from .spancat import SpanCategorizer
|
from .spancat import SpanCategorizer
|
||||||
from .span_ruler import SpanRuler
|
from .span_ruler import SpanRuler
|
||||||
|
from .span_finder import SpanFinder
|
||||||
from .textcat_multilabel import MultiLabel_TextCategorizer
|
from .textcat_multilabel import MultiLabel_TextCategorizer
|
||||||
from .tok2vec import Tok2Vec
|
from .tok2vec import Tok2Vec
|
||||||
from .functions import merge_entities, merge_noun_chunks, merge_subtokens
|
from .functions import merge_entities, merge_noun_chunks, merge_subtokens
|
||||||
|
@ -32,6 +33,7 @@ __all__ = [
|
||||||
"Sentencizer",
|
"Sentencizer",
|
||||||
"SpanCategorizer",
|
"SpanCategorizer",
|
||||||
"SpanRuler",
|
"SpanRuler",
|
||||||
|
"SpanFinder",
|
||||||
"Tagger",
|
"Tagger",
|
||||||
"TextCategorizer",
|
"TextCategorizer",
|
||||||
"Tok2Vec",
|
"Tok2Vec",
|
||||||
|
|
387
spacy/pipeline/span_finder.py
Normal file
387
spacy/pipeline/span_finder.py
Normal file
|
@ -0,0 +1,387 @@
|
||||||
|
from typing import List, Dict, Callable, Tuple, Optional, Iterable, Any, cast
|
||||||
|
from functools import partial
|
||||||
|
from thinc.api import Config, Model, set_dropout_rate
|
||||||
|
from thinc.api import Optimizer, get_current_ops, Ops
|
||||||
|
from thinc.types import Floats2d, Ragged, Ints1d
|
||||||
|
|
||||||
|
from spacy.language import Language
|
||||||
|
from spacy.pipeline.trainable_pipe import TrainablePipe
|
||||||
|
from spacy.tokens import Doc
|
||||||
|
from spacy.training import Example
|
||||||
|
from spacy.scorer import Scorer
|
||||||
|
from ..util import registry
|
||||||
|
from .spancat import Suggester
|
||||||
|
|
||||||
|
|
||||||
|
span_finder_default_config = """
|
||||||
|
[model]
|
||||||
|
@architectures = "spacy.SpanFinder.v1"
|
||||||
|
|
||||||
|
[model.scorer]
|
||||||
|
@layers = "spacy.LinearLogistic.v1"
|
||||||
|
nO = 2
|
||||||
|
|
||||||
|
[model.tok2vec]
|
||||||
|
@architectures = "spacy.Tok2Vec.v2"
|
||||||
|
|
||||||
|
[model.tok2vec.embed]
|
||||||
|
@architectures = "spacy.MultiHashEmbed.v2"
|
||||||
|
width = 96
|
||||||
|
rows = [5000, 2000, 1000, 1000]
|
||||||
|
attrs = ["ORTH", "PREFIX", "SUFFIX", "SHAPE"]
|
||||||
|
include_static_vectors = false
|
||||||
|
|
||||||
|
[model.tok2vec.encode]
|
||||||
|
@architectures = "spacy.MaxoutWindowEncoder.v2"
|
||||||
|
width = ${model.tok2vec.embed.width}
|
||||||
|
window_size = 1
|
||||||
|
maxout_pieces = 3
|
||||||
|
depth = 4
|
||||||
|
"""
|
||||||
|
|
||||||
|
DEFAULT_SPAN_FINDER_MODEL = Config().from_str(span_finder_default_config)["model"]
|
||||||
|
DEFAULT_PREDICTED_KEY = "span_candidates"
|
||||||
|
# XXX What was this TODO for?
|
||||||
|
DEFAULT_TRAINING_KEY = "sc" # TODO: define in spancat
|
||||||
|
|
||||||
|
|
||||||
|
@Language.factory(
|
||||||
|
"span_finder",
|
||||||
|
assigns=["doc.spans"],
|
||||||
|
default_config={
|
||||||
|
"threshold": 0.5,
|
||||||
|
"model": DEFAULT_SPAN_FINDER_MODEL,
|
||||||
|
"predicted_key": DEFAULT_PREDICTED_KEY,
|
||||||
|
"training_key": DEFAULT_TRAINING_KEY,
|
||||||
|
# XXX Doesn't 0 seem bad compared to None instead?
|
||||||
|
"max_length": 0,
|
||||||
|
"min_length": 0,
|
||||||
|
"scorer": {
|
||||||
|
"@scorers": "spacy.span_finder_scorer.v1",
|
||||||
|
"predicted_key": DEFAULT_PREDICTED_KEY,
|
||||||
|
"training_key": DEFAULT_TRAINING_KEY,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
default_score_weights={
|
||||||
|
f"span_finder_{DEFAULT_PREDICTED_KEY}_f": 1.0,
|
||||||
|
f"span_finder_{DEFAULT_PREDICTED_KEY}_p": 0.0,
|
||||||
|
f"span_finder_{DEFAULT_PREDICTED_KEY}_r": 0.0,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
def make_span_finder(
|
||||||
|
nlp: Language,
|
||||||
|
name: str,
|
||||||
|
model: Model[Iterable[Doc], Floats2d],
|
||||||
|
scorer: Optional[Callable],
|
||||||
|
threshold: float,
|
||||||
|
max_length: int,
|
||||||
|
min_length: int,
|
||||||
|
predicted_key: str = DEFAULT_PREDICTED_KEY,
|
||||||
|
training_key: str = DEFAULT_TRAINING_KEY,
|
||||||
|
) -> "SpanFinder":
|
||||||
|
"""Create a SpanFinder component. The component predicts whether a token is
|
||||||
|
the start or the end of a potential span.
|
||||||
|
|
||||||
|
model (Model[List[Doc], Floats2d]): A model instance that
|
||||||
|
is given a list of documents and predicts a probability for each token.
|
||||||
|
threshold (float): Minimum probability to consider a prediction positive.
|
||||||
|
predicted_key (str): Name of the span group the predicted spans are saved
|
||||||
|
to
|
||||||
|
training_key (str): Name of the span group the training spans are read
|
||||||
|
from
|
||||||
|
max_length (int): Max length of the produced spans (no max limitation when
|
||||||
|
set to 0)
|
||||||
|
min_length (int): Min length of the produced spans (no min limitation when
|
||||||
|
set to 0)
|
||||||
|
"""
|
||||||
|
return SpanFinder(
|
||||||
|
nlp,
|
||||||
|
model=model,
|
||||||
|
threshold=threshold,
|
||||||
|
name=name,
|
||||||
|
scorer=scorer,
|
||||||
|
max_length=max_length,
|
||||||
|
min_length=min_length,
|
||||||
|
predicted_key=predicted_key,
|
||||||
|
training_key=training_key,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@registry.scorers("spacy.span_finder_scorer.v1")
|
||||||
|
def make_span_finder_scorer(
|
||||||
|
predicted_key: str = DEFAULT_PREDICTED_KEY,
|
||||||
|
training_key: str = DEFAULT_TRAINING_KEY,
|
||||||
|
):
|
||||||
|
return partial(
|
||||||
|
span_finder_score, predicted_key=predicted_key, training_key=training_key
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def span_finder_score(
|
||||||
|
examples: Iterable[Example],
|
||||||
|
*,
|
||||||
|
predicted_key: str = DEFAULT_PREDICTED_KEY,
|
||||||
|
training_key: str = DEFAULT_TRAINING_KEY,
|
||||||
|
**kwargs,
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
kwargs = dict(kwargs)
|
||||||
|
attr_prefix = "span_finder_"
|
||||||
|
kwargs.setdefault("attr", f"{attr_prefix}{predicted_key}")
|
||||||
|
kwargs.setdefault("allow_overlap", True)
|
||||||
|
kwargs.setdefault(
|
||||||
|
"getter", lambda doc, key: doc.spans.get(key[len(attr_prefix) :], [])
|
||||||
|
)
|
||||||
|
kwargs.setdefault("labeled", False)
|
||||||
|
kwargs.setdefault("has_annotation", lambda doc: predicted_key in doc.spans)
|
||||||
|
# score_spans can only score spans with the same key in both the reference
|
||||||
|
# and predicted docs, so temporarily copy the reference spans from the
|
||||||
|
# reference key to the candidates key in the reference docs, restoring the
|
||||||
|
# original span groups afterwards
|
||||||
|
orig_span_groups = []
|
||||||
|
for eg in examples:
|
||||||
|
orig_span_groups.append(eg.reference.spans.get(predicted_key))
|
||||||
|
if training_key in eg.reference.spans:
|
||||||
|
eg.reference.spans[predicted_key] = eg.reference.spans[training_key]
|
||||||
|
scores = Scorer.score_spans(examples, **kwargs)
|
||||||
|
for orig_span_group, eg in zip(orig_span_groups, examples):
|
||||||
|
if orig_span_group is not None:
|
||||||
|
eg.reference.spans[predicted_key] = orig_span_group
|
||||||
|
return scores
|
||||||
|
|
||||||
|
|
||||||
|
class SpanFinder(TrainablePipe):
|
||||||
|
"""Pipeline that learns span boundaries"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
nlp: Language,
|
||||||
|
model: Model[Iterable[Doc], Floats2d],
|
||||||
|
name: str = "span_finder",
|
||||||
|
*,
|
||||||
|
threshold: float = 0.5,
|
||||||
|
max_length: int = 0,
|
||||||
|
min_length: int = 0,
|
||||||
|
# XXX I think this is weird and should be just None like in
|
||||||
|
scorer: Optional[Callable] = partial(
|
||||||
|
span_finder_score,
|
||||||
|
predicted_key=DEFAULT_PREDICTED_KEY,
|
||||||
|
training_key=DEFAULT_TRAINING_KEY,
|
||||||
|
),
|
||||||
|
predicted_key: str = DEFAULT_PREDICTED_KEY,
|
||||||
|
training_key: str = DEFAULT_TRAINING_KEY,
|
||||||
|
) -> None:
|
||||||
|
"""Initialize the span boundary detector.
|
||||||
|
model (thinc.api.Model): The Thinc Model powering the pipeline component.
|
||||||
|
name (str): The component instance name, used to add entries to the
|
||||||
|
losses during training.
|
||||||
|
threshold (float): Minimum probability to consider a prediction
|
||||||
|
positive.
|
||||||
|
scorer (Optional[Callable]): The scoring method.
|
||||||
|
predicted_key (str): Name of the span group the candidate spans are saved to
|
||||||
|
training_key (str): Name of the span group the training spans are read from
|
||||||
|
max_length (int): Max length of the produced spans (unlimited when set to 0)
|
||||||
|
min_length (int): Min length of the produced spans (unlimited when set to 0)
|
||||||
|
"""
|
||||||
|
self.vocab = nlp.vocab
|
||||||
|
self.threshold = threshold
|
||||||
|
self.max_length = max_length
|
||||||
|
self.min_length = min_length
|
||||||
|
self.predicted_key = predicted_key
|
||||||
|
self.training_key = training_key
|
||||||
|
self.model = model
|
||||||
|
self.name = name
|
||||||
|
self.scorer = scorer
|
||||||
|
|
||||||
|
def predict(self, docs: Iterable[Doc]):
|
||||||
|
"""Apply the pipeline's model to a batch of docs, without modifying them.
|
||||||
|
docs (Iterable[Doc]): The documents to predict.
|
||||||
|
RETURNS: The models prediction for each document.
|
||||||
|
"""
|
||||||
|
scores = self.model.predict(docs)
|
||||||
|
return scores
|
||||||
|
|
||||||
|
def set_annotations(self, docs: Iterable[Doc], scores: Floats2d) -> None:
|
||||||
|
"""Modify a batch of Doc objects, using pre-computed scores.
|
||||||
|
docs (Iterable[Doc]): The documents to modify.
|
||||||
|
scores: The scores to set, produced by SpanFinder predict method.
|
||||||
|
"""
|
||||||
|
lengths = [len(doc) for doc in docs]
|
||||||
|
|
||||||
|
offset = 0
|
||||||
|
scores_per_doc = []
|
||||||
|
# XXX Isn't this really inefficient that we are creating these
|
||||||
|
# slices ahead of time? Couldn't we just do this in the next loop?
|
||||||
|
for length in lengths:
|
||||||
|
scores_per_doc.append(scores[offset : offset + length])
|
||||||
|
offset += length
|
||||||
|
|
||||||
|
for doc, doc_scores in zip(docs, scores_per_doc):
|
||||||
|
doc.spans[self.predicted_key] = []
|
||||||
|
starts = []
|
||||||
|
ends = []
|
||||||
|
|
||||||
|
for token, token_score in zip(doc, doc_scores):
|
||||||
|
if token_score[0] >= self.threshold:
|
||||||
|
starts.append(token.i)
|
||||||
|
if token_score[1] >= self.threshold:
|
||||||
|
ends.append(token.i)
|
||||||
|
|
||||||
|
for start in starts:
|
||||||
|
for end in ends:
|
||||||
|
span_length = end + 1 - start
|
||||||
|
# XXX I really feel like min_length and max_length should be
|
||||||
|
# None instead of 0 and then just set them to -1 and inf if they
|
||||||
|
# are given as None.
|
||||||
|
if span_length > 0:
|
||||||
|
if (
|
||||||
|
self.min_length <= 0 or span_length >= self.min_length
|
||||||
|
) and (self.max_length <= 0 or span_length <= self.max_length):
|
||||||
|
doc.spans[self.predicted_key].append(doc[start : end + 1])
|
||||||
|
elif self.max_length > 0 and span_length > self.max_length:
|
||||||
|
break
|
||||||
|
|
||||||
|
def update(
|
||||||
|
self,
|
||||||
|
examples: Iterable[Example],
|
||||||
|
*,
|
||||||
|
drop: float = 0.0,
|
||||||
|
sgd: Optional[Optimizer] = None,
|
||||||
|
losses: Optional[Dict[str, float]] = None,
|
||||||
|
) -> Dict[str, float]:
|
||||||
|
"""Learn from a batch of documents and gold-standard information,
|
||||||
|
updating the pipe's model. Delegates to predict and get_loss.
|
||||||
|
examples (Iterable[Example]): A batch of Example objects.
|
||||||
|
drop (float): The dropout rate.
|
||||||
|
sgd (Optional[thinc.api.Optimizer]): The optimizer.
|
||||||
|
losses (Optional[Dict[str, float]]): Optional record of the loss during training.
|
||||||
|
Updated using the component name as the key.
|
||||||
|
RETURNS (Dict[str, float]): The updated losses dictionary.
|
||||||
|
"""
|
||||||
|
if losses is None:
|
||||||
|
losses = {}
|
||||||
|
losses.setdefault(self.name, 0.0)
|
||||||
|
predicted = [eg.predicted for eg in examples]
|
||||||
|
set_dropout_rate(self.model, drop)
|
||||||
|
scores, backprop_scores = self.model.begin_update(predicted)
|
||||||
|
loss, d_scores = self.get_loss(examples, scores)
|
||||||
|
backprop_scores(d_scores)
|
||||||
|
if sgd is not None:
|
||||||
|
self.finish_update(sgd)
|
||||||
|
losses[self.name] += loss
|
||||||
|
return losses
|
||||||
|
|
||||||
|
def get_loss(self, examples, scores) -> Tuple[float, Floats2d]:
|
||||||
|
"""Find the loss and gradient of loss for the batch of documents and
|
||||||
|
their predicted scores.
|
||||||
|
examples (Iterable[Examples]): The batch of examples.
|
||||||
|
scores: Scores representing the model's predictions.
|
||||||
|
RETURNS (Tuple[float, float]): The loss and the gradient.
|
||||||
|
"""
|
||||||
|
reference_truths = self._get_aligned_truth_scores(examples)
|
||||||
|
d_scores = scores - self.model.ops.asarray2f(reference_truths)
|
||||||
|
loss = float((d_scores**2).sum())
|
||||||
|
return loss, d_scores
|
||||||
|
|
||||||
|
def _get_aligned_truth_scores(self, examples) -> List[Tuple[int, int]]:
|
||||||
|
"""Align scores of the predictions to the references for calculating the loss"""
|
||||||
|
# TODO: handle misaligned (None) alignments
|
||||||
|
# TODO: handle cases with differing whitespace in texts
|
||||||
|
reference_truths = []
|
||||||
|
|
||||||
|
for eg in examples:
|
||||||
|
start_indices = set()
|
||||||
|
end_indices = set()
|
||||||
|
|
||||||
|
if self.training_key in eg.reference.spans:
|
||||||
|
for span in eg.reference.spans[self.training_key]:
|
||||||
|
start_indices.add(eg.reference[span.start].idx)
|
||||||
|
end_indices.add(
|
||||||
|
eg.reference[span.end - 1].idx + len(eg.reference[span.end - 1])
|
||||||
|
)
|
||||||
|
|
||||||
|
for token in eg.predicted:
|
||||||
|
reference_truths.append(
|
||||||
|
(
|
||||||
|
1 if token.idx in start_indices else 0,
|
||||||
|
1 if token.idx + len(token) in end_indices else 0,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return reference_truths
|
||||||
|
|
||||||
|
def _get_reference(self, docs) -> List[Tuple[int, int]]:
|
||||||
|
"""Create a reference list of token probabilities"""
|
||||||
|
reference_probabilities = []
|
||||||
|
for doc in docs:
|
||||||
|
start_indices = set()
|
||||||
|
end_indices = set()
|
||||||
|
|
||||||
|
if self.training_key in doc.spans:
|
||||||
|
for span in doc.spans[self.training_key]:
|
||||||
|
start_indices.add(span.start)
|
||||||
|
end_indices.add(span.end - 1)
|
||||||
|
|
||||||
|
for token in doc:
|
||||||
|
reference_probabilities.append(
|
||||||
|
(
|
||||||
|
1 if token.i in start_indices else 0,
|
||||||
|
1 if token.i in end_indices else 0,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return reference_probabilities
|
||||||
|
|
||||||
|
def initialize(
|
||||||
|
self,
|
||||||
|
get_examples: Callable[[], Iterable[Example]],
|
||||||
|
*,
|
||||||
|
nlp: Optional[Language] = None,
|
||||||
|
) -> None:
|
||||||
|
"""Initialize the pipe for training, using a representative set
|
||||||
|
of data examples.
|
||||||
|
get_examples (Callable[[], Iterable[Example]]): Function that
|
||||||
|
returns a representative sample of gold-standard Example objects.
|
||||||
|
nlp (Optional[Language]): The current nlp object the component is part of.
|
||||||
|
"""
|
||||||
|
subbatch: List[Example] = []
|
||||||
|
|
||||||
|
for eg in get_examples():
|
||||||
|
if len(subbatch) < 10:
|
||||||
|
subbatch.append(eg)
|
||||||
|
|
||||||
|
if subbatch:
|
||||||
|
docs = [eg.reference for eg in subbatch]
|
||||||
|
Y = self.model.ops.asarray2f(self._get_reference(docs))
|
||||||
|
self.model.initialize(X=docs, Y=Y)
|
||||||
|
else:
|
||||||
|
self.model.initialize()
|
||||||
|
|
||||||
|
@registry.misc("spacy.span_finder_suggester.v1")
|
||||||
|
def build_span_finder_suggester(candidates_key: str) -> Suggester:
|
||||||
|
"""Suggest every candidate predicted by the SpanFinder"""
|
||||||
|
|
||||||
|
def span_finder_suggester(
|
||||||
|
docs: Iterable[Doc], *, ops: Optional[Ops] = None
|
||||||
|
) -> Ragged:
|
||||||
|
if ops is None:
|
||||||
|
ops = get_current_ops()
|
||||||
|
spans = []
|
||||||
|
lengths = []
|
||||||
|
for doc in docs:
|
||||||
|
length = 0
|
||||||
|
if doc.spans[candidates_key]:
|
||||||
|
for span in doc.spans[candidates_key]:
|
||||||
|
spans.append([span.start, span.end])
|
||||||
|
length += 1
|
||||||
|
|
||||||
|
lengths.append(length)
|
||||||
|
|
||||||
|
lengths_array = cast(Ints1d, ops.asarray(lengths, dtype="i"))
|
||||||
|
if len(spans) > 0:
|
||||||
|
output = Ragged(ops.asarray(spans, dtype="i"), lengths_array)
|
||||||
|
else:
|
||||||
|
output = Ragged(ops.xp.zeros((0, 0), dtype="i"), lengths_array)
|
||||||
|
|
||||||
|
return output
|
||||||
|
|
||||||
|
return span_finder_suggester
|
180
spacy/tests/pipeline/test_span_finder.py
Normal file
180
spacy/tests/pipeline/test_span_finder.py
Normal file
|
@ -0,0 +1,180 @@
|
||||||
|
from spacy.language import Language
|
||||||
|
from spacy.util import registry
|
||||||
|
from spacy.tokens import Doc
|
||||||
|
from spacy.training import Example
|
||||||
|
from thinc.api import Config
|
||||||
|
from thinc.types import Ragged
|
||||||
|
from spacy.pipeline.span_finder import DEFAULT_PREDICTED_KEY
|
||||||
|
from spacy.pipeline.span_finder import span_finder_default_config
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
TRAINING_KEY = "pytest"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"tokens_predicted, tokens_reference, reference_truths",
|
||||||
|
[
|
||||||
|
(
|
||||||
|
["Mon", ".", "-", "June", "16"],
|
||||||
|
["Mon.", "-", "June", "16"],
|
||||||
|
[(0, 0), (0, 0), (0, 0), (1, 1), (0, 0)],
|
||||||
|
),
|
||||||
|
(
|
||||||
|
["Mon.", "-", "J", "une", "16"],
|
||||||
|
["Mon.", "-", "June", "16"],
|
||||||
|
[(0, 0), (0, 0), (1, 0), (0, 1), (0, 0)],
|
||||||
|
),
|
||||||
|
(
|
||||||
|
["Mon", ".", "-", "June", "16"],
|
||||||
|
["Mon.", "-", "June", "1", "6"],
|
||||||
|
[(0, 0), (0, 0), (0, 0), (1, 1), (0, 0)],
|
||||||
|
),
|
||||||
|
(
|
||||||
|
["Mon.", "-J", "un", "e 16"],
|
||||||
|
["Mon.", "-", "June", "16"],
|
||||||
|
[(0, 0), (0, 0), (0, 0), (0, 0)],
|
||||||
|
),
|
||||||
|
pytest.param(
|
||||||
|
["Mon.-June", "16"],
|
||||||
|
["Mon.", "-", "June", "16"],
|
||||||
|
[(0, 1), (0, 0)],
|
||||||
|
),
|
||||||
|
pytest.param(
|
||||||
|
["Mon.-", "June", "16"],
|
||||||
|
["Mon.", "-", "J", "une", "16"],
|
||||||
|
[(0, 0), (1, 1), (0, 0)],
|
||||||
|
),
|
||||||
|
pytest.param(
|
||||||
|
["Mon.-", "June 16"],
|
||||||
|
["Mon.", "-", "June", "16"],
|
||||||
|
[(0, 0), (1, 0)],
|
||||||
|
),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_loss_alignment_example(tokens_predicted, tokens_reference, reference_truths):
|
||||||
|
nlp = Language()
|
||||||
|
predicted = Doc(
|
||||||
|
nlp.vocab, words=tokens_predicted, spaces=[False] * len(tokens_predicted)
|
||||||
|
)
|
||||||
|
reference = Doc(
|
||||||
|
nlp.vocab, words=tokens_reference, spaces=[False] * len(tokens_reference)
|
||||||
|
)
|
||||||
|
example = Example(predicted, reference)
|
||||||
|
example.reference.spans[TRAINING_KEY] = [example.reference.char_span(5, 9)]
|
||||||
|
span_finder = nlp.add_pipe(
|
||||||
|
"span_finder", config={"training_key": TRAINING_KEY}
|
||||||
|
)
|
||||||
|
nlp.initialize()
|
||||||
|
|
||||||
|
truth_scores = span_finder._get_aligned_truth_scores([example])
|
||||||
|
assert len(truth_scores) == len(tokens_predicted)
|
||||||
|
assert truth_scores == reference_truths
|
||||||
|
|
||||||
|
|
||||||
|
def test_span_finder_model():
|
||||||
|
nlp = Language()
|
||||||
|
|
||||||
|
docs = [nlp("This is an example."), nlp("This is the second example.")]
|
||||||
|
docs[0].spans[TRAINING_KEY] = [docs[0][3:4]]
|
||||||
|
docs[1].spans[TRAINING_KEY] = [docs[1][3:5]]
|
||||||
|
|
||||||
|
total_tokens = 0
|
||||||
|
for doc in docs:
|
||||||
|
total_tokens += len(doc)
|
||||||
|
|
||||||
|
config = Config().from_str(span_finder_default_config).interpolate()
|
||||||
|
model = registry.resolve(config)["model"]
|
||||||
|
|
||||||
|
model.initialize(X=docs)
|
||||||
|
predictions = model.predict(docs)
|
||||||
|
|
||||||
|
assert len(predictions) == total_tokens
|
||||||
|
assert len(predictions[0]) == 2
|
||||||
|
|
||||||
|
|
||||||
|
def test_span_finder_component():
|
||||||
|
nlp = Language()
|
||||||
|
|
||||||
|
docs = [nlp("This is an example."), nlp("This is the second example.")]
|
||||||
|
docs[0].spans[TRAINING_KEY] = [docs[0][3:4]]
|
||||||
|
docs[1].spans[TRAINING_KEY] = [docs[1][3:5]]
|
||||||
|
|
||||||
|
span_finder = nlp.add_pipe(
|
||||||
|
"span_finder", config={"training_key": TRAINING_KEY}
|
||||||
|
)
|
||||||
|
nlp.initialize()
|
||||||
|
docs = list(span_finder.pipe(docs))
|
||||||
|
|
||||||
|
# TODO: update hard-coded name
|
||||||
|
assert "span_candidates" in docs[0].spans
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"min_length, max_length, span_count", [(0, 0, 8), (2, 0, 6), (0, 1, 2), (2, 3, 2)]
|
||||||
|
)
|
||||||
|
def test_set_annotations_span_lengths(min_length, max_length, span_count):
|
||||||
|
nlp = Language()
|
||||||
|
doc = nlp("Me and Jenny goes together like peas and carrots.")
|
||||||
|
span_finder = nlp.add_pipe(
|
||||||
|
"span_finder",
|
||||||
|
config={
|
||||||
|
"max_length": max_length,
|
||||||
|
"min_length": min_length,
|
||||||
|
"training_key": TRAINING_KEY,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
nlp.initialize()
|
||||||
|
# Starts [Me, Jenny, peas]
|
||||||
|
# Ends [Jenny, peas, carrots]
|
||||||
|
scores = [
|
||||||
|
(1, 0),
|
||||||
|
(0, 0),
|
||||||
|
(1, 1),
|
||||||
|
(0, 0),
|
||||||
|
(0, 0),
|
||||||
|
(0, 0),
|
||||||
|
(1, 1),
|
||||||
|
(0, 0),
|
||||||
|
(0, 1),
|
||||||
|
(0, 0),
|
||||||
|
]
|
||||||
|
span_finder.set_annotations([doc], scores)
|
||||||
|
|
||||||
|
assert doc.spans[DEFAULT_PREDICTED_KEY]
|
||||||
|
assert len(doc.spans[DEFAULT_PREDICTED_KEY]) == span_count
|
||||||
|
|
||||||
|
# Assert below will fail when max_length is set to 0
|
||||||
|
if max_length <= 0:
|
||||||
|
max_length = len(doc)
|
||||||
|
|
||||||
|
assert all(
|
||||||
|
min_length <= len(span) <= max_length
|
||||||
|
for span in doc.spans[DEFAULT_PREDICTED_KEY]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_span_finder_suggester():
|
||||||
|
nlp = Language()
|
||||||
|
docs = [nlp("This is an example."), nlp("This is the second example.")]
|
||||||
|
docs[0].spans[TRAINING_KEY] = [docs[0][3:4]]
|
||||||
|
docs[1].spans[TRAINING_KEY] = [docs[1][3:5]]
|
||||||
|
span_finder = nlp.add_pipe(
|
||||||
|
"span_finder", config={"training_key": TRAINING_KEY}
|
||||||
|
)
|
||||||
|
nlp.initialize()
|
||||||
|
span_finder.set_annotations(docs, span_finder.predict(docs))
|
||||||
|
|
||||||
|
suggester = registry.misc.get("spacy.span_finder_suggester.v1")(
|
||||||
|
candidates_key="span_candidates"
|
||||||
|
)
|
||||||
|
|
||||||
|
candidates = suggester(docs)
|
||||||
|
|
||||||
|
span_length = 0
|
||||||
|
for doc in docs:
|
||||||
|
span_length += len(doc.spans["span_candidates"])
|
||||||
|
|
||||||
|
assert span_length == len(candidates.dataXd)
|
||||||
|
assert type(candidates) == Ragged
|
||||||
|
assert len(candidates.dataXd[0]) == 2
|
Loading…
Reference in New Issue
Block a user