In order to support Python 3.13, we had to migrate to Cython 3.0. This caused some tricky interactions with our Pydantic usage, because Cython 3 uses the from __future__ import annotations semantics, which causes type annotations to be stored as strings. The end result is that we can't have Language.factory-decorated functions in Cython modules anymore: the Language.factory decorator expects to inspect the signature of the function and build a Pydantic model, and if the function is implemented in Cython, an error is raised because the types can't be resolved. To address this I've moved the factory functions into a new module, spacy.pipeline.factories. I've added __getattr__ importlib hooks to the previous locations in case anyone was importing these functions directly, so the change should have no backwards-compatibility implications.

Along the way I've also refactored how functions are registered for the config. Previously these registrations ran as import-time side effects, using the registry decorator. Instead, I've created a new module, spacy.registrations. When the registry is accessed, it calls a function ensure_populated(), which causes the registrations to occur (sketched below). I've made a similar change to the Language.factory registrations in the new spacy.pipeline.factories module. I want to remove these import-time side effects so that we can speed up the loading time of the library, which can be especially painful on the CLI. I also find that I'm often working to track down the implementations of functions referenced by strings in the config; having the registrations all happen in one place will make this easier.

With these changes I've fortunately avoided the need to migrate to Pydantic v2 properly: we're still using the v1 compatibility shim. We might not be able to hold out forever though: Pydantic (reasonably) aren't actively supporting the v1 shims. I put a lot of work into the v2 migration when investigating the 3.13 support, and it's definitely challenging. In any case, it's a relief that we don't have to do the v2 migration at the same time as the Cython 3.0/Python 3.13 support.
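The lazy-population pattern looks roughly like this (a minimal sketch using the names from the description above; the actual code in spacy.registrations differs in its details):

    # Sketch: run registrations on first registry access instead of at import time.
    _REGISTRY_POPULATED = False

    def ensure_populated() -> None:
        # Idempotent: the registration calls only run once.
        global _REGISTRY_POPULATED
        if _REGISTRY_POPULATED:
            return
        _REGISTRY_POPULATED = True
        # Imports happen here, not at module load, so "import spacy" stays cheap.
        from spacy.pipeline.spancat import build_ngram_suggester
        from spacy.util import registry

        registry.misc.register("spacy.ngram_suggester.v1", func=build_ngram_suggester)

Below is the file spacy/pipeline/spancat.py after the change.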
import importlib
import sys
from dataclasses import dataclass
from functools import partial
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union, cast

import numpy
from thinc.api import Config, Model, Ops, Optimizer, get_current_ops, set_dropout_rate
from thinc.types import Floats2d, Ints1d, Ints2d, Ragged

from ..compat import Protocol, runtime_checkable
from ..errors import Errors
from ..language import Language
from ..scorer import Scorer
from ..tokens import Doc, Span, SpanGroup
from ..training import Example, validate_examples
from ..util import registry
from ..vocab import Vocab
from .trainable_pipe import TrainablePipe

spancat_default_config = """
[model]
@architectures = "spacy.SpanCategorizer.v1"
scorer = {"@layers": "spacy.LinearLogistic.v1"}

[model.reducer]
@layers = "spacy.mean_max_reducer.v1"
hidden_size = 128

[model.tok2vec]
@architectures = "spacy.Tok2Vec.v2"

[model.tok2vec.embed]
@architectures = "spacy.MultiHashEmbed.v2"
width = 96
rows = [5000, 1000, 2500, 1000]
attrs = ["NORM", "PREFIX", "SUFFIX", "SHAPE"]
include_static_vectors = false

[model.tok2vec.encode]
@architectures = "spacy.MaxoutWindowEncoder.v2"
width = ${model.tok2vec.embed.width}
window_size = 1
maxout_pieces = 3
depth = 4
"""

spancat_singlelabel_default_config = """
[model]
@architectures = "spacy.SpanCategorizer.v1"
scorer = {"@layers": "Softmax.v2"}

[model.reducer]
@layers = "spacy.mean_max_reducer.v1"
hidden_size = 128

[model.tok2vec]
@architectures = "spacy.Tok2Vec.v2"

[model.tok2vec.embed]
@architectures = "spacy.MultiHashEmbed.v1"
width = 96
rows = [5000, 1000, 2500, 1000]
attrs = ["NORM", "PREFIX", "SUFFIX", "SHAPE"]
include_static_vectors = false

[model.tok2vec.encode]
@architectures = "spacy.MaxoutWindowEncoder.v2"
width = ${model.tok2vec.embed.width}
window_size = 1
maxout_pieces = 3
depth = 4
"""

DEFAULT_SPANS_KEY = "sc"
DEFAULT_SPANCAT_MODEL = Config().from_str(spancat_default_config)["model"]
DEFAULT_SPANCAT_SINGLELABEL_MODEL = Config().from_str(
    spancat_singlelabel_default_config
)["model"]

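# Usage sketch (illustrative; not part of this module): the component is
# created through its registered factory, with the model defaulting to the
# config above. Overrides go through the config argument of add_pipe, e.g.:
#
#     import spacy
#     nlp = spacy.blank("en")
#     spancat = nlp.add_pipe("spancat", config={"spans_key": "sc"})
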
@runtime_checkable
class Suggester(Protocol):
    def __call__(self, docs: Iterable[Doc], *, ops: Optional[Ops] = None) -> Ragged:
        ...


def ngram_suggester(
    docs: Iterable[Doc], sizes: List[int], *, ops: Optional[Ops] = None
) -> Ragged:
    if ops is None:
        ops = get_current_ops()
    spans = []
    lengths = []
    for doc in docs:
        starts = ops.xp.arange(len(doc), dtype="i")
        starts = starts.reshape((-1, 1))
        length = 0
        for size in sizes:
            if size <= len(doc):
                starts_size = starts[: len(doc) - (size - 1)]
                spans.append(ops.xp.hstack((starts_size, starts_size + size)))
                length += spans[-1].shape[0]
            if spans:
                assert spans[-1].ndim == 2, spans[-1].shape
        lengths.append(length)
    lengths_array = ops.asarray1i(lengths)
    if len(spans) > 0:
        output = Ragged(ops.xp.vstack(spans), lengths_array)
    else:
        output = Ragged(ops.xp.zeros((0, 0), dtype="i"), lengths_array)

    assert output.dataXd.ndim == 2
    return output

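# For example, for a single 3-token doc and sizes=[1, 2], ngram_suggester
# returns a Ragged array whose rows are (start, end) token offsets:
#
#     [[0, 1], [1, 2], [2, 3],    # unigrams
#      [0, 2], [1, 3]]            # bigrams
#
# with lengths=[5] recording how many of the rows belong to that doc.
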
def preset_spans_suggester(
    docs: Iterable[Doc], spans_key: str, *, ops: Optional[Ops] = None
) -> Ragged:
    if ops is None:
        ops = get_current_ops()
    spans = []
    lengths = []
    for doc in docs:
        length = 0
        if doc.spans[spans_key]:
            for span in doc.spans[spans_key]:
                spans.append([span.start, span.end])
                length += 1

        lengths.append(length)
    lengths_array = cast(Ints1d, ops.asarray(lengths, dtype="i"))
    if len(spans) > 0:
        output = Ragged(ops.asarray(spans, dtype="i"), lengths_array)
    else:
        output = Ragged(ops.xp.zeros((0, 0), dtype="i"), lengths_array)
    return output


def build_ngram_suggester(sizes: List[int]) -> Suggester:
    """Suggest all spans of the given lengths. Spans are returned as a ragged
    array of integers. The array has two columns, indicating the start and end
    position."""
    return partial(ngram_suggester, sizes=sizes)


def build_ngram_range_suggester(min_size: int, max_size: int) -> Suggester:
    """Suggest all spans of lengths between a given min and max value, both
    inclusive. Spans are returned as a ragged array of integers. The array has
    two columns, indicating the start and end position."""
    sizes = list(range(min_size, max_size + 1))
    return build_ngram_suggester(sizes)


def build_preset_spans_suggester(spans_key: str) -> Suggester:
    """Suggest all spans that are already stored in doc.spans[spans_key].
    This is useful when an upstream component is used to set the spans
    on the Doc, such as a SpanRuler or SpanFinder."""
    return partial(preset_spans_suggester, spans_key=spans_key)

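# These builders are exposed through spaCy's function registry (under names
# like "spacy.ngram_suggester.v1"), so a config can select a suggester by
# string reference. A typical config block looks roughly like:
#
#     [components.spancat.suggester]
#     @misc = "spacy.ngram_suggester.v1"
#     sizes = [1, 2, 3]
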
def spancat_score(examples: Iterable[Example], **kwargs) -> Dict[str, Any]:
    kwargs = dict(kwargs)
    attr_prefix = "spans_"
    key = kwargs["spans_key"]
    kwargs.setdefault("attr", f"{attr_prefix}{key}")
    kwargs.setdefault("allow_overlap", True)
    kwargs.setdefault(
        "getter", lambda doc, key: doc.spans.get(key[len(attr_prefix) :], [])
    )
    kwargs.setdefault("has_annotation", lambda doc: key in doc.spans)
    return Scorer.score_spans(examples, **kwargs)


def make_spancat_scorer():
    return spancat_score


@dataclass
class _Intervals:
    """
    Helper class to avoid storing overlapping spans.
    """

    def __init__(self):
        self.ranges = set()

    def add(self, i, j):
        for e in range(i, j):
            self.ranges.add(e)

    def __contains__(self, rang):
        i, j = rang
        for e in range(i, j):
            if e in self.ranges:
                return True
        return False

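# _Intervals marks every token position covered by an added span, so a
# candidate overlaps iff any of its positions has been seen before:
#
#     seen = _Intervals()
#     seen.add(2, 5)
#     (4, 6) in seen  # True: position 4 is already covered
#     (5, 7) in seen  # False: end offsets are exclusive
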
class SpanCategorizer(TrainablePipe):
    """Pipeline component to label spans of text.

    DOCS: https://spacy.io/api/spancategorizer
    """

    def __init__(
        self,
        vocab: Vocab,
        model: Model[Tuple[List[Doc], Ragged], Floats2d],
        suggester: Suggester,
        name: str = "spancat",
        *,
        add_negative_label: bool = False,
        spans_key: str = "spans",
        negative_weight: Optional[float] = 1.0,
        allow_overlap: Optional[bool] = True,
        max_positive: Optional[int] = None,
        threshold: Optional[float] = 0.5,
        scorer: Optional[Callable] = spancat_score,
    ) -> None:
        """Initialize the multi-label or multi-class span categorizer.

        vocab (Vocab): The shared vocabulary.
        model (thinc.api.Model): The Thinc Model powering the pipeline component.
            For multi-class classification (single label per span) we recommend
            using a Softmax classifier as the final layer, while for multi-label
            classification (multiple possible labels per span) we recommend Logistic.
        suggester (Callable[[Iterable[Doc], Optional[Ops]], Ragged]): A function that suggests spans.
            Spans are returned as a ragged array with two integer columns, for the
            start and end positions.
        name (str): The component instance name, used to add entries to the
            losses during training.
        spans_key (str): Key of the Doc.spans dict to save the spans under.
            During initialization and training, the component will look for
            spans on the reference document under the same key. Defaults to
            `"spans"`.
        add_negative_label (bool): Learn to predict a special 'negative_label'
            when a Span is not annotated.
        threshold (Optional[float]): Minimum probability to consider a prediction
            positive. Defaults to 0.5. Spans with a positive prediction will be saved
            on the Doc.
        max_positive (Optional[int]): Maximum number of labels to consider
            positive per span. Defaults to None, indicating no limit.
        negative_weight (float): Multiplier for the loss terms.
            Can be used to downweight the negative samples if there are too many
            when add_negative_label is True. Otherwise it's unused.
        allow_overlap (bool): If True, the data is assumed to contain overlapping spans.
            Otherwise it produces non-overlapping spans greedily, prioritizing
            higher assigned label scores. Only used when max_positive is 1.
        scorer (Optional[Callable]): The scoring method. Defaults to
            Scorer.score_spans for the Doc.spans[spans_key] with overlapping
            spans allowed.

        DOCS: https://spacy.io/api/spancategorizer#init
        """
        self.cfg = {
            "labels": [],
            "spans_key": spans_key,
            "threshold": threshold,
            "max_positive": max_positive,
            "negative_weight": negative_weight,
            "allow_overlap": allow_overlap,
        }
        self.vocab = vocab
        self.suggester = suggester
        self.model = model
        self.name = name
        self.scorer = scorer
        self.add_negative_label = add_negative_label
        if not allow_overlap and max_positive is not None and max_positive > 1:
            raise ValueError(Errors.E1051.format(max_positive=max_positive))

    @property
    def key(self) -> str:
        """Key of the doc.spans dict to save the spans under. During
        initialization and training, the component will look for spans on the
        reference document under the same key.
        """
        return str(self.cfg["spans_key"])

    def _allow_extra_label(self) -> None:
        """Raise an error if the component can not add any more labels."""
        nO = None
        if self.model.has_dim("nO"):
            nO = self.model.get_dim("nO")
        elif self.model.has_ref("output_layer") and self.model.get_ref(
            "output_layer"
        ).has_dim("nO"):
            nO = self.model.get_ref("output_layer").get_dim("nO")
        if nO is not None and nO == self._n_labels:
            if not self.is_resizable:
                raise ValueError(
                    Errors.E922.format(name=self.name, nO=self.model.get_dim("nO"))
                )

    def add_label(self, label: str) -> int:
        """Add a new label to the pipe.

        label (str): The label to add.
        RETURNS (int): 0 if label is already present, otherwise 1.

        DOCS: https://spacy.io/api/spancategorizer#add_label
        """
        if not isinstance(label, str):
            raise ValueError(Errors.E187)
        if label in self.labels:
            return 0
        self._allow_extra_label()
        self.cfg["labels"].append(label)  # type: ignore
        self.vocab.strings.add(label)
        return 1

    @property
    def labels(self) -> Tuple[str]:
        """RETURNS (Tuple[str]): The labels currently added to the component.

        DOCS: https://spacy.io/api/spancategorizer#labels
        """
        return tuple(self.cfg["labels"])  # type: ignore

    @property
    def label_data(self) -> List[str]:
        """RETURNS (List[str]): Information about the component's labels.

        DOCS: https://spacy.io/api/spancategorizer#label_data
        """
        return list(self.labels)

    @property
    def _label_map(self) -> Dict[str, int]:
        """RETURNS (Dict[str, int]): The label map."""
        return {label: i for i, label in enumerate(self.labels)}

    @property
    def _n_labels(self) -> int:
        """RETURNS (int): Number of labels."""
        if self.add_negative_label:
            return len(self.labels) + 1
        else:
            return len(self.labels)

    @property
    def _negative_label_i(self) -> Union[int, None]:
        """RETURNS (Union[int, None]): Index of the negative label."""
        if self.add_negative_label:
            return len(self.label_data)
        else:
            return None

    def predict(self, docs: Iterable[Doc]):
        """Apply the pipeline's model to a batch of docs, without modifying them.

        docs (Iterable[Doc]): The documents to predict.
        RETURNS: The model's prediction for each document.

        DOCS: https://spacy.io/api/spancategorizer#predict
        """
        indices = self.suggester(docs, ops=self.model.ops)
        if indices.lengths.sum() == 0:
            scores = self.model.ops.alloc2f(0, 0)
        else:
            scores = self.model.predict((docs, indices))  # type: ignore
        return indices, scores

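    # Shape note (inferred from the code above): predict returns indices, a
    # Ragged array with one (start, end) row per suggested span, and scores,
    # a single flat (n_suggested_spans, n_labels) array covering the whole
    # batch. set_annotations below uses the Ragged lengths to slice the flat
    # scores back into per-doc segments.
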
    def set_candidates(
        self, docs: Iterable[Doc], *, candidates_key: str = "candidates"
    ) -> None:
        """Use the spancat suggester to add a list of span candidates to a list of docs.
        This method is intended to be used for debugging purposes.

        docs (Iterable[Doc]): The documents to modify.
        candidates_key (str): Key of the Doc.spans dict to save the candidate spans under.

        DOCS: https://spacy.io/api/spancategorizer#set_candidates
        """
        suggester_output = self.suggester(docs, ops=self.model.ops)

        for candidates, doc in zip(suggester_output, docs):  # type: ignore
            doc.spans[candidates_key] = []
            for index in candidates.dataXd:
                doc.spans[candidates_key].append(doc[index[0] : index[1]])

    def set_annotations(self, docs: Iterable[Doc], indices_scores) -> None:
        """Modify a batch of Doc objects, using pre-computed scores.

        docs (Iterable[Doc]): The documents to modify.
        indices_scores: The (indices, scores) tuple produced by
            SpanCategorizer.predict.

        DOCS: https://spacy.io/api/spancategorizer#set_annotations
        """
        indices, scores = indices_scores
        offset = 0
        for i, doc in enumerate(docs):
            indices_i = indices[i].dataXd
            allow_overlap = cast(bool, self.cfg["allow_overlap"])
            if self.cfg["max_positive"] == 1:
                doc.spans[self.key] = self._make_span_group_singlelabel(
                    doc,
                    indices_i,
                    scores[offset : offset + indices.lengths[i]],
                    allow_overlap,
                )
            else:
                doc.spans[self.key] = self._make_span_group_multilabel(
                    doc,
                    indices_i,
                    scores[offset : offset + indices.lengths[i]],
                )
            offset += indices.lengths[i]

    def update(
        self,
        examples: Iterable[Example],
        *,
        drop: float = 0.0,
        sgd: Optional[Optimizer] = None,
        losses: Optional[Dict[str, float]] = None,
    ) -> Dict[str, float]:
        """Learn from a batch of documents and gold-standard information,
        updating the pipe's model. Delegates to predict and get_loss.

        examples (Iterable[Example]): A batch of Example objects.
        drop (float): The dropout rate.
        sgd (thinc.api.Optimizer): The optimizer.
        losses (Dict[str, float]): Optional record of the loss during training.
            Updated using the component name as the key.
        RETURNS (Dict[str, float]): The updated losses dictionary.

        DOCS: https://spacy.io/api/spancategorizer#update
        """
        if losses is None:
            losses = {}
        losses.setdefault(self.name, 0.0)
        validate_examples(examples, "SpanCategorizer.update")
        self._validate_categories(examples)
        if not any(len(eg.predicted) if eg.predicted else 0 for eg in examples):
            # Handle cases where there are no tokens in any docs.
            return losses
        docs = [eg.predicted for eg in examples]
        spans = self.suggester(docs, ops=self.model.ops)
        if spans.lengths.sum() == 0:
            return losses
        set_dropout_rate(self.model, drop)
        scores, backprop_scores = self.model.begin_update((docs, spans))
        loss, d_scores = self.get_loss(examples, (spans, scores))
        backprop_scores(d_scores)  # type: ignore
        if sgd is not None:
            self.finish_update(sgd)
        losses[self.name] += loss
        return losses

    def get_loss(
        self, examples: Iterable[Example], spans_scores: Tuple[Ragged, Floats2d]
    ) -> Tuple[float, float]:
        """Find the loss and gradient of loss for the batch of documents and
        their predicted scores.

        examples (Iterable[Examples]): The batch of examples.
        spans_scores: Scores representing the model's predictions.
        RETURNS (Tuple[float, float]): The loss and the gradient.

        DOCS: https://spacy.io/api/spancategorizer#get_loss
        """
        spans, scores = spans_scores
        spans = Ragged(
            self.model.ops.to_numpy(spans.data), self.model.ops.to_numpy(spans.lengths)
        )
        target = numpy.zeros(scores.shape, dtype=scores.dtype)
        if self.add_negative_label:
            negative_spans = numpy.ones((scores.shape[0]))
        offset = 0
        label_map = self._label_map
        for i, eg in enumerate(examples):
            # Map (start, end) offset of spans to the row in the d_scores array,
            # so that we can adjust the gradient for predictions that were
            # in the gold standard.
            spans_index = {}
            spans_i = spans[i].dataXd
            for j in range(spans.lengths[i]):
                start = int(spans_i[j, 0])  # type: ignore
                end = int(spans_i[j, 1])  # type: ignore
                spans_index[(start, end)] = offset + j
            for gold_span in self._get_aligned_spans(eg):
                key = (gold_span.start, gold_span.end)
                if key in spans_index:
                    row = spans_index[key]
                    k = label_map[gold_span.label_]
                    target[row, k] = 1.0
                    if self.add_negative_label:
                        # Delete negative label target.
                        negative_spans[row] = 0.0
            # The target is a flat array for all docs. Track the position
            # we're at within the flat array.
            offset += spans.lengths[i]
        target = self.model.ops.asarray(target, dtype="f")  # type: ignore
        if self.add_negative_label:
            negative_samples = numpy.nonzero(negative_spans)[0]
            target[negative_samples, self._negative_label_i] = 1.0  # type: ignore
        # The target will have the values 0 (for untrue predictions) or 1
        # (for true predictions).
        # The scores should be in the range [0, 1].
        # If the prediction is 0.9 and it's true, the gradient
        # will be -0.1 (0.9 - 1.0).
        # If the prediction is 0.9 and it's false, the gradient will be
        # 0.9 (0.9 - 0.0).
        d_scores = scores - target
        if self.add_negative_label:
            neg_weight = cast(float, self.cfg["negative_weight"])
            if neg_weight != 1.0:
                d_scores[negative_samples] *= neg_weight
        loss = float((d_scores**2).sum())
        return loss, d_scores

    def initialize(
        self,
        get_examples: Callable[[], Iterable[Example]],
        *,
        nlp: Optional[Language] = None,
        labels: Optional[List[str]] = None,
    ) -> None:
        """Initialize the pipe for training, using a representative set
        of data examples.

        get_examples (Callable[[], Iterable[Example]]): Function that
            returns a representative sample of gold-standard Example objects.
        nlp (Optional[Language]): The current nlp object the component is part of.
        labels (Optional[List[str]]): The labels to add to the component, typically
            generated by the `init labels` command. If no labels are provided, the
            get_examples callback is used to extract the labels from the data.

        DOCS: https://spacy.io/api/spancategorizer#initialize
        """
        subbatch: List[Example] = []
        if labels is not None:
            for label in labels:
                self.add_label(label)
        for eg in get_examples():
            if labels is None:
                for span in eg.reference.spans.get(self.key, []):
                    self.add_label(span.label_)
            if len(subbatch) < 10:
                subbatch.append(eg)
        self._require_labels()
        if subbatch:
            docs = [eg.x for eg in subbatch]
            spans = build_ngram_suggester(sizes=[1])(docs)
            Y = self.model.ops.alloc2f(spans.dataXd.shape[0], self._n_labels)
            self.model.initialize(X=(docs, spans), Y=Y)
        else:
            self.model.initialize()

    def _validate_categories(self, examples: Iterable[Example]):
        # TODO
        pass

    def _get_aligned_spans(self, eg: Example):
        return eg.get_aligned_spans_y2x(
            eg.reference.spans.get(self.key, []), allow_overlap=True
        )

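    # In the multilabel case below, every label scoring at or above the
    # threshold is kept; when max_positive is set, an argsort over the scores
    # additionally masks out all but the top max_positive labels per span.
    # For example, with per-span scores (0.9, 0.7, 0.2), threshold=0.5 and
    # max_positive=1, only the first label survives.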
    def _make_span_group_multilabel(
        self,
        doc: Doc,
        indices: Ints2d,
        scores: Floats2d,
    ) -> SpanGroup:
        """Find the top-k labels for each span (k=max_positive)."""
        spans = SpanGroup(doc, name=self.key)
        if scores.size == 0:
            return spans
        scores = self.model.ops.to_numpy(scores)
        indices = self.model.ops.to_numpy(indices)
        threshold = self.cfg["threshold"]
        max_positive = self.cfg["max_positive"]

        keeps = scores >= threshold
        if max_positive is not None:
            assert isinstance(max_positive, int)
            if self.add_negative_label:
                negative_scores = numpy.copy(scores[:, self._negative_label_i])
                scores[:, self._negative_label_i] = -numpy.inf
                ranked = (scores * -1).argsort()  # type: ignore
                scores[:, self._negative_label_i] = negative_scores
            else:
                ranked = (scores * -1).argsort()  # type: ignore
            span_filter = ranked[:, max_positive:]
            for i, row in enumerate(span_filter):
                keeps[i, row] = False

        attrs_scores = []
        for i in range(indices.shape[0]):
            start = indices[i, 0]
            end = indices[i, 1]
            for j, keep in enumerate(keeps[i]):
                if keep:
                    if j != self._negative_label_i:
                        spans.append(Span(doc, start, end, label=self.labels[j]))
                        attrs_scores.append(scores[i, j])
        spans.attrs["scores"] = numpy.array(attrs_scores)
        return spans

    def _make_span_group_singlelabel(
        self,
        doc: Doc,
        indices: Ints2d,
        scores: Floats2d,
        allow_overlap: bool = True,
    ) -> SpanGroup:
        """Find the argmax label for each span."""
        # Handle cases when there are zero suggestions
        if scores.size == 0:
            return SpanGroup(doc, name=self.key)
        scores = self.model.ops.to_numpy(scores)
        indices = self.model.ops.to_numpy(indices)
        predicted = scores.argmax(axis=1)
        argmax_scores = numpy.take_along_axis(
            scores, numpy.expand_dims(predicted, 1), axis=1
        )
        keeps = numpy.ones(predicted.shape, dtype=bool)
        # Remove samples where the negative label is the argmax.
        if self.add_negative_label:
            keeps = numpy.logical_and(keeps, predicted != self._negative_label_i)
        # Filter samples according to threshold.
        threshold = self.cfg["threshold"]
        if threshold is not None:
            keeps = numpy.logical_and(keeps, (argmax_scores >= threshold).squeeze())
        # Sort spans according to argmax probability
        if not allow_overlap:
            # Get the probabilities
            sort_idx = (argmax_scores.squeeze() * -1).argsort()
            argmax_scores = argmax_scores[sort_idx]
            predicted = predicted[sort_idx]
            indices = indices[sort_idx]
            keeps = keeps[sort_idx]
        seen = _Intervals()
        spans = SpanGroup(doc, name=self.key)
        attrs_scores = []
        for i in range(indices.shape[0]):
            if not keeps[i]:
                continue

            label = predicted[i]
            start = indices[i, 0]
            end = indices[i, 1]

            if not allow_overlap:
                if (start, end) in seen:
                    continue
                else:
                    seen.add(start, end)
            attrs_scores.append(argmax_scores[i])
            spans.append(Span(doc, start, end, label=self.labels[label]))

        spans.attrs["scores"] = numpy.array(attrs_scores)
        return spans


# Set up backwards compatibility hook for factories
def __getattr__(name):
    if name == "make_spancat":
        module = importlib.import_module("spacy.pipeline.factories")
        return module.make_spancat
    elif name == "make_spancat_singlelabel":
        module = importlib.import_module("spacy.pipeline.factories")
        return module.make_spancat_singlelabel
    raise AttributeError(f"module {__name__} has no attribute {name}")
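# With the hook above, existing imports such as
#     from spacy.pipeline.spancat import make_spancat
# keep working even though the factory implementations now live in
# spacy.pipeline.factories, as described in the commit message at the top.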