mirror of
https://github.com/explosion/spaCy.git
synced 2025-01-10 01:06:33 +03:00
c003aac29a
* span finder integrated into spacy from experimental * black * isort * black * default spankey constant * black * Update spacy/pipeline/spancat.py Co-authored-by: Adriane Boyd <adrianeboyd@gmail.com> * rename * rename * max_length and min_length as Optional[int] and strict checking * black * mypy fix for integer type infinity * revert line order * implement all comparison operators for inf int * avoid two for loops over all docs by not precomputing * interleave thresholding with span creation * black * revert to not interleaving (relized its faster) * black * Update spacy/errors.py Co-authored-by: Adriane Boyd <adrianeboyd@gmail.com> * update dosctring * enforce that the gold and predicted documents have the same text * new error for ensuring reference and predicted texts are the same * remove todo * adjust test * black * handle misaligned tokenization * return correct variable * failing overfit test * only use a single spans_key like in spancat * black * remove debug lines * typo * remove comment * remove near duplicate reduntant method * use the 'spans_key' variable name everywhere * Update spacy/pipeline/span_finder.py Co-authored-by: Adriane Boyd <adrianeboyd@gmail.com> * flaky test fix suggestion, hand set bias terms * only test suggester and test result exhaustively * make it clear that the span_finder_suggester is more general (not specific to span_finder) * Update spacy/tests/pipeline/test_span_finder.py Co-authored-by: Adriane Boyd <adrianeboyd@gmail.com> * Apply suggestions from code review * remove question comment * move preset_spans_suggester test to spancat tests * Add docs and unify default configs for spancat and span finder * Add `allow_overlap=True` to span finder scorer * Fix offset bug in set_annotations * Ignore labels in span finder scorer * Format * Add span_finder to quickstart template * Move settings to self.cfg, store min/max unset as None * Remove debugging * Update docstrings and docs * Update spacy/pipeline/span_finder.py Co-authored-by: Sofie Van Landeghem <svlandeg@users.noreply.github.com> * Fix imports --------- Co-authored-by: Adriane Boyd <adrianeboyd@gmail.com> Co-authored-by: Sofie Van Landeghem <svlandeg@users.noreply.github.com>
788 lines
29 KiB
Python
788 lines
29 KiB
Python
from dataclasses import dataclass
|
|
from functools import partial
|
|
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union, cast
|
|
|
|
import numpy
|
|
from thinc.api import Config, Model, Ops, Optimizer, get_current_ops, set_dropout_rate
|
|
from thinc.types import Floats2d, Ints1d, Ints2d, Ragged
|
|
|
|
from ..compat import Protocol, runtime_checkable
|
|
from ..errors import Errors
|
|
from ..language import Language
|
|
from ..scorer import Scorer
|
|
from ..tokens import Doc, Span, SpanGroup
|
|
from ..training import Example, validate_examples
|
|
from ..util import registry
|
|
from ..vocab import Vocab
|
|
from .trainable_pipe import TrainablePipe
|
|
|
|
spancat_default_config = """
|
|
[model]
|
|
@architectures = "spacy.SpanCategorizer.v1"
|
|
scorer = {"@layers": "spacy.LinearLogistic.v1"}
|
|
|
|
[model.reducer]
|
|
@layers = spacy.mean_max_reducer.v1
|
|
hidden_size = 128
|
|
|
|
[model.tok2vec]
|
|
@architectures = "spacy.Tok2Vec.v2"
|
|
|
|
[model.tok2vec.embed]
|
|
@architectures = "spacy.MultiHashEmbed.v2"
|
|
width = 96
|
|
rows = [5000, 1000, 2500, 1000]
|
|
attrs = ["NORM", "PREFIX", "SUFFIX", "SHAPE"]
|
|
include_static_vectors = false
|
|
|
|
[model.tok2vec.encode]
|
|
@architectures = "spacy.MaxoutWindowEncoder.v2"
|
|
width = ${model.tok2vec.embed.width}
|
|
window_size = 1
|
|
maxout_pieces = 3
|
|
depth = 4
|
|
"""
|
|
|
|
spancat_singlelabel_default_config = """
|
|
[model]
|
|
@architectures = "spacy.SpanCategorizer.v1"
|
|
scorer = {"@layers": "Softmax.v2"}
|
|
|
|
[model.reducer]
|
|
@layers = spacy.mean_max_reducer.v1
|
|
hidden_size = 128
|
|
|
|
[model.tok2vec]
|
|
@architectures = "spacy.Tok2Vec.v2"
|
|
[model.tok2vec.embed]
|
|
@architectures = "spacy.MultiHashEmbed.v1"
|
|
width = 96
|
|
rows = [5000, 1000, 2500, 1000]
|
|
attrs = ["NORM", "PREFIX", "SUFFIX", "SHAPE"]
|
|
include_static_vectors = false
|
|
|
|
[model.tok2vec.encode]
|
|
@architectures = "spacy.MaxoutWindowEncoder.v2"
|
|
width = ${model.tok2vec.embed.width}
|
|
window_size = 1
|
|
maxout_pieces = 3
|
|
depth = 4
|
|
"""
|
|
|
|
DEFAULT_SPANS_KEY = "sc"
|
|
DEFAULT_SPANCAT_MODEL = Config().from_str(spancat_default_config)["model"]
|
|
DEFAULT_SPANCAT_SINGLELABEL_MODEL = Config().from_str(
|
|
spancat_singlelabel_default_config
|
|
)["model"]
|
|
|
|
|
|
@runtime_checkable
|
|
class Suggester(Protocol):
|
|
def __call__(self, docs: Iterable[Doc], *, ops: Optional[Ops] = None) -> Ragged:
|
|
...
|
|
|
|
|
|
def ngram_suggester(
|
|
docs: Iterable[Doc], sizes: List[int], *, ops: Optional[Ops] = None
|
|
) -> Ragged:
|
|
if ops is None:
|
|
ops = get_current_ops()
|
|
spans = []
|
|
lengths = []
|
|
for doc in docs:
|
|
starts = ops.xp.arange(len(doc), dtype="i")
|
|
starts = starts.reshape((-1, 1))
|
|
length = 0
|
|
for size in sizes:
|
|
if size <= len(doc):
|
|
starts_size = starts[: len(doc) - (size - 1)]
|
|
spans.append(ops.xp.hstack((starts_size, starts_size + size)))
|
|
length += spans[-1].shape[0]
|
|
if spans:
|
|
assert spans[-1].ndim == 2, spans[-1].shape
|
|
lengths.append(length)
|
|
lengths_array = ops.asarray1i(lengths)
|
|
if len(spans) > 0:
|
|
output = Ragged(ops.xp.vstack(spans), lengths_array)
|
|
else:
|
|
output = Ragged(ops.xp.zeros((0, 0), dtype="i"), lengths_array)
|
|
|
|
assert output.dataXd.ndim == 2
|
|
return output
|
|
|
|
|
|
def preset_spans_suggester(
|
|
docs: Iterable[Doc], spans_key: str, *, ops: Optional[Ops] = None
|
|
) -> Ragged:
|
|
if ops is None:
|
|
ops = get_current_ops()
|
|
spans = []
|
|
lengths = []
|
|
for doc in docs:
|
|
length = 0
|
|
if doc.spans[spans_key]:
|
|
for span in doc.spans[spans_key]:
|
|
spans.append([span.start, span.end])
|
|
length += 1
|
|
|
|
lengths.append(length)
|
|
lengths_array = cast(Ints1d, ops.asarray(lengths, dtype="i"))
|
|
if len(spans) > 0:
|
|
output = Ragged(ops.asarray(spans, dtype="i"), lengths_array)
|
|
else:
|
|
output = Ragged(ops.xp.zeros((0, 0), dtype="i"), lengths_array)
|
|
return output
|
|
|
|
|
|
@registry.misc("spacy.ngram_suggester.v1")
|
|
def build_ngram_suggester(sizes: List[int]) -> Suggester:
|
|
"""Suggest all spans of the given lengths. Spans are returned as a ragged
|
|
array of integers. The array has two columns, indicating the start and end
|
|
position."""
|
|
|
|
return partial(ngram_suggester, sizes=sizes)
|
|
|
|
|
|
@registry.misc("spacy.ngram_range_suggester.v1")
|
|
def build_ngram_range_suggester(min_size: int, max_size: int) -> Suggester:
|
|
"""Suggest all spans of the given lengths between a given min and max value - both inclusive.
|
|
Spans are returned as a ragged array of integers. The array has two columns,
|
|
indicating the start and end position."""
|
|
sizes = list(range(min_size, max_size + 1))
|
|
return build_ngram_suggester(sizes)
|
|
|
|
|
|
@registry.misc("spacy.preset_spans_suggester.v1")
|
|
def build_preset_spans_suggester(spans_key: str) -> Suggester:
|
|
"""Suggest all spans that are already stored in doc.spans[spans_key].
|
|
This is useful when an upstream component is used to set the spans
|
|
on the Doc such as a SpanRuler or SpanFinder."""
|
|
return partial(preset_spans_suggester, spans_key=spans_key)
|
|
|
|
|
|
@Language.factory(
|
|
"spancat",
|
|
assigns=["doc.spans"],
|
|
default_config={
|
|
"threshold": 0.5,
|
|
"spans_key": DEFAULT_SPANS_KEY,
|
|
"max_positive": None,
|
|
"model": DEFAULT_SPANCAT_MODEL,
|
|
"suggester": {"@misc": "spacy.ngram_suggester.v1", "sizes": [1, 2, 3]},
|
|
"scorer": {"@scorers": "spacy.spancat_scorer.v1"},
|
|
},
|
|
default_score_weights={"spans_sc_f": 1.0, "spans_sc_p": 0.0, "spans_sc_r": 0.0},
|
|
)
|
|
def make_spancat(
|
|
nlp: Language,
|
|
name: str,
|
|
suggester: Suggester,
|
|
model: Model[Tuple[List[Doc], Ragged], Floats2d],
|
|
spans_key: str,
|
|
scorer: Optional[Callable],
|
|
threshold: float,
|
|
max_positive: Optional[int],
|
|
) -> "SpanCategorizer":
|
|
"""Create a SpanCategorizer component and configure it for multi-label
|
|
classification to be able to assign multiple labels for each span.
|
|
The span categorizer consists of two
|
|
parts: a suggester function that proposes candidate spans, and a labeller
|
|
model that predicts one or more labels for each span.
|
|
|
|
name (str): The component instance name, used to add entries to the
|
|
losses during training.
|
|
suggester (Callable[[Iterable[Doc], Optional[Ops]], Ragged]): A function that suggests spans.
|
|
Spans are returned as a ragged array with two integer columns, for the
|
|
start and end positions.
|
|
model (Model[Tuple[List[Doc], Ragged], Floats2d]): A model instance that
|
|
is given a list of documents and (start, end) indices representing
|
|
candidate span offsets. The model predicts a probability for each category
|
|
for each span.
|
|
spans_key (str): Key of the doc.spans dict to save the spans under. During
|
|
initialization and training, the component will look for spans on the
|
|
reference document under the same key.
|
|
scorer (Optional[Callable]): The scoring method. Defaults to
|
|
Scorer.score_spans for the Doc.spans[spans_key] with overlapping
|
|
spans allowed.
|
|
threshold (float): Minimum probability to consider a prediction positive.
|
|
Spans with a positive prediction will be saved on the Doc. Defaults to
|
|
0.5.
|
|
max_positive (Optional[int]): Maximum number of labels to consider positive
|
|
per span. Defaults to None, indicating no limit.
|
|
"""
|
|
return SpanCategorizer(
|
|
nlp.vocab,
|
|
model=model,
|
|
suggester=suggester,
|
|
name=name,
|
|
spans_key=spans_key,
|
|
negative_weight=None,
|
|
allow_overlap=True,
|
|
max_positive=max_positive,
|
|
threshold=threshold,
|
|
scorer=scorer,
|
|
add_negative_label=False,
|
|
)
|
|
|
|
|
|
@Language.factory(
|
|
"spancat_singlelabel",
|
|
assigns=["doc.spans"],
|
|
default_config={
|
|
"spans_key": DEFAULT_SPANS_KEY,
|
|
"model": DEFAULT_SPANCAT_SINGLELABEL_MODEL,
|
|
"negative_weight": 1.0,
|
|
"suggester": {"@misc": "spacy.ngram_suggester.v1", "sizes": [1, 2, 3]},
|
|
"scorer": {"@scorers": "spacy.spancat_scorer.v1"},
|
|
"allow_overlap": True,
|
|
},
|
|
default_score_weights={"spans_sc_f": 1.0, "spans_sc_p": 0.0, "spans_sc_r": 0.0},
|
|
)
|
|
def make_spancat_singlelabel(
|
|
nlp: Language,
|
|
name: str,
|
|
suggester: Suggester,
|
|
model: Model[Tuple[List[Doc], Ragged], Floats2d],
|
|
spans_key: str,
|
|
negative_weight: float,
|
|
allow_overlap: bool,
|
|
scorer: Optional[Callable],
|
|
) -> "SpanCategorizer":
|
|
"""Create a SpanCategorizer component and configure it for multi-class
|
|
classification. With this configuration each span can get at most one
|
|
label. The span categorizer consists of two
|
|
parts: a suggester function that proposes candidate spans, and a labeller
|
|
model that predicts one or more labels for each span.
|
|
|
|
name (str): The component instance name, used to add entries to the
|
|
losses during training.
|
|
suggester (Callable[[Iterable[Doc], Optional[Ops]], Ragged]): A function that suggests spans.
|
|
Spans are returned as a ragged array with two integer columns, for the
|
|
start and end positions.
|
|
model (Model[Tuple[List[Doc], Ragged], Floats2d]): A model instance that
|
|
is given a list of documents and (start, end) indices representing
|
|
candidate span offsets. The model predicts a probability for each category
|
|
for each span.
|
|
spans_key (str): Key of the doc.spans dict to save the spans under. During
|
|
initialization and training, the component will look for spans on the
|
|
reference document under the same key.
|
|
scorer (Optional[Callable]): The scoring method. Defaults to
|
|
Scorer.score_spans for the Doc.spans[spans_key] with overlapping
|
|
spans allowed.
|
|
negative_weight (float): Multiplier for the loss terms.
|
|
Can be used to downweight the negative samples if there are too many.
|
|
allow_overlap (bool): If True the data is assumed to contain overlapping spans.
|
|
Otherwise it produces non-overlapping spans greedily prioritizing
|
|
higher assigned label scores.
|
|
"""
|
|
return SpanCategorizer(
|
|
nlp.vocab,
|
|
model=model,
|
|
suggester=suggester,
|
|
name=name,
|
|
spans_key=spans_key,
|
|
negative_weight=negative_weight,
|
|
allow_overlap=allow_overlap,
|
|
max_positive=1,
|
|
add_negative_label=True,
|
|
threshold=None,
|
|
scorer=scorer,
|
|
)
|
|
|
|
|
|
def spancat_score(examples: Iterable[Example], **kwargs) -> Dict[str, Any]:
|
|
kwargs = dict(kwargs)
|
|
attr_prefix = "spans_"
|
|
key = kwargs["spans_key"]
|
|
kwargs.setdefault("attr", f"{attr_prefix}{key}")
|
|
kwargs.setdefault("allow_overlap", True)
|
|
kwargs.setdefault(
|
|
"getter", lambda doc, key: doc.spans.get(key[len(attr_prefix) :], [])
|
|
)
|
|
kwargs.setdefault("has_annotation", lambda doc: key in doc.spans)
|
|
return Scorer.score_spans(examples, **kwargs)
|
|
|
|
|
|
@registry.scorers("spacy.spancat_scorer.v1")
|
|
def make_spancat_scorer():
|
|
return spancat_score
|
|
|
|
|
|
@dataclass
|
|
class _Intervals:
|
|
"""
|
|
Helper class to avoid storing overlapping spans.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.ranges = set()
|
|
|
|
def add(self, i, j):
|
|
for e in range(i, j):
|
|
self.ranges.add(e)
|
|
|
|
def __contains__(self, rang):
|
|
i, j = rang
|
|
for e in range(i, j):
|
|
if e in self.ranges:
|
|
return True
|
|
return False
|
|
|
|
|
|
class SpanCategorizer(TrainablePipe):
|
|
"""Pipeline component to label spans of text.
|
|
|
|
DOCS: https://spacy.io/api/spancategorizer
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
vocab: Vocab,
|
|
model: Model[Tuple[List[Doc], Ragged], Floats2d],
|
|
suggester: Suggester,
|
|
name: str = "spancat",
|
|
*,
|
|
add_negative_label: bool = False,
|
|
spans_key: str = "spans",
|
|
negative_weight: Optional[float] = 1.0,
|
|
allow_overlap: Optional[bool] = True,
|
|
max_positive: Optional[int] = None,
|
|
threshold: Optional[float] = 0.5,
|
|
scorer: Optional[Callable] = spancat_score,
|
|
) -> None:
|
|
"""Initialize the multi-label or multi-class span categorizer.
|
|
|
|
vocab (Vocab): The shared vocabulary.
|
|
model (thinc.api.Model): The Thinc Model powering the pipeline component.
|
|
For multi-class classification (single label per span) we recommend
|
|
using a Softmax classifier as a the final layer, while for multi-label
|
|
classification (multiple possible labels per span) we recommend Logistic.
|
|
suggester (Callable[[Iterable[Doc], Optional[Ops]], Ragged]): A function that suggests spans.
|
|
Spans are returned as a ragged array with two integer columns, for the
|
|
start and end positions.
|
|
name (str): The component instance name, used to add entries to the
|
|
losses during training.
|
|
spans_key (str): Key of the Doc.spans dict to save the spans under.
|
|
During initialization and training, the component will look for
|
|
spans on the reference document under the same key. Defaults to
|
|
`"spans"`.
|
|
add_negative_label (bool): Learn to predict a special 'negative_label'
|
|
when a Span is not annotated.
|
|
threshold (Optional[float]): Minimum probability to consider a prediction
|
|
positive. Defaults to 0.5. Spans with a positive prediction will be saved
|
|
on the Doc.
|
|
max_positive (Optional[int]): Maximum number of labels to consider
|
|
positive per span. Defaults to None, indicating no limit.
|
|
negative_weight (float): Multiplier for the loss terms.
|
|
Can be used to downweight the negative samples if there are too many
|
|
when add_negative_label is True. Otherwise its unused.
|
|
allow_overlap (bool): If True the data is assumed to contain overlapping spans.
|
|
Otherwise it produces non-overlapping spans greedily prioritizing
|
|
higher assigned label scores. Only used when max_positive is 1.
|
|
scorer (Optional[Callable]): The scoring method. Defaults to
|
|
Scorer.score_spans for the Doc.spans[spans_key] with overlapping
|
|
spans allowed.
|
|
|
|
DOCS: https://spacy.io/api/spancategorizer#init
|
|
"""
|
|
self.cfg = {
|
|
"labels": [],
|
|
"spans_key": spans_key,
|
|
"threshold": threshold,
|
|
"max_positive": max_positive,
|
|
"negative_weight": negative_weight,
|
|
"allow_overlap": allow_overlap,
|
|
}
|
|
self.vocab = vocab
|
|
self.suggester = suggester
|
|
self.model = model
|
|
self.name = name
|
|
self.scorer = scorer
|
|
self.add_negative_label = add_negative_label
|
|
if not allow_overlap and max_positive is not None and max_positive > 1:
|
|
raise ValueError(Errors.E1051.format(max_positive=max_positive))
|
|
|
|
@property
|
|
def key(self) -> str:
|
|
"""Key of the doc.spans dict to save the spans under. During
|
|
initialization and training, the component will look for spans on the
|
|
reference document under the same key.
|
|
"""
|
|
return str(self.cfg["spans_key"])
|
|
|
|
def _allow_extra_label(self) -> None:
|
|
"""Raise an error if the component can not add any more labels."""
|
|
nO = None
|
|
if self.model.has_dim("nO"):
|
|
nO = self.model.get_dim("nO")
|
|
elif self.model.has_ref("output_layer") and self.model.get_ref(
|
|
"output_layer"
|
|
).has_dim("nO"):
|
|
nO = self.model.get_ref("output_layer").get_dim("nO")
|
|
if nO is not None and nO == self._n_labels:
|
|
if not self.is_resizable:
|
|
raise ValueError(
|
|
Errors.E922.format(name=self.name, nO=self.model.get_dim("nO"))
|
|
)
|
|
|
|
def add_label(self, label: str) -> int:
|
|
"""Add a new label to the pipe.
|
|
|
|
label (str): The label to add.
|
|
RETURNS (int): 0 if label is already present, otherwise 1.
|
|
|
|
DOCS: https://spacy.io/api/spancategorizer#add_label
|
|
"""
|
|
if not isinstance(label, str):
|
|
raise ValueError(Errors.E187)
|
|
if label in self.labels:
|
|
return 0
|
|
self._allow_extra_label()
|
|
self.cfg["labels"].append(label) # type: ignore
|
|
self.vocab.strings.add(label)
|
|
return 1
|
|
|
|
@property
|
|
def labels(self) -> Tuple[str]:
|
|
"""RETURNS (Tuple[str]): The labels currently added to the component.
|
|
|
|
DOCS: https://spacy.io/api/spancategorizer#labels
|
|
"""
|
|
return tuple(self.cfg["labels"]) # type: ignore
|
|
|
|
@property
|
|
def label_data(self) -> List[str]:
|
|
"""RETURNS (List[str]): Information about the component's labels.
|
|
|
|
DOCS: https://spacy.io/api/spancategorizer#label_data
|
|
"""
|
|
return list(self.labels)
|
|
|
|
@property
|
|
def _label_map(self) -> Dict[str, int]:
|
|
"""RETURNS (Dict[str, int]): The label map."""
|
|
return {label: i for i, label in enumerate(self.labels)}
|
|
|
|
@property
|
|
def _n_labels(self) -> int:
|
|
"""RETURNS (int): Number of labels."""
|
|
if self.add_negative_label:
|
|
return len(self.labels) + 1
|
|
else:
|
|
return len(self.labels)
|
|
|
|
@property
|
|
def _negative_label_i(self) -> Union[int, None]:
|
|
"""RETURNS (Union[int, None]): Index of the negative label."""
|
|
if self.add_negative_label:
|
|
return len(self.label_data)
|
|
else:
|
|
return None
|
|
|
|
def predict(self, docs: Iterable[Doc]):
|
|
"""Apply the pipeline's model to a batch of docs, without modifying them.
|
|
|
|
docs (Iterable[Doc]): The documents to predict.
|
|
RETURNS: The models prediction for each document.
|
|
|
|
DOCS: https://spacy.io/api/spancategorizer#predict
|
|
"""
|
|
indices = self.suggester(docs, ops=self.model.ops)
|
|
if indices.lengths.sum() == 0:
|
|
scores = self.model.ops.alloc2f(0, 0)
|
|
else:
|
|
scores = self.model.predict((docs, indices)) # type: ignore
|
|
return indices, scores
|
|
|
|
def set_candidates(
|
|
self, docs: Iterable[Doc], *, candidates_key: str = "candidates"
|
|
) -> None:
|
|
"""Use the spancat suggester to add a list of span candidates to a list of docs.
|
|
This method is intended to be used for debugging purposes.
|
|
|
|
docs (Iterable[Doc]): The documents to modify.
|
|
candidates_key (str): Key of the Doc.spans dict to save the candidate spans under.
|
|
|
|
DOCS: https://spacy.io/api/spancategorizer#set_candidates
|
|
"""
|
|
suggester_output = self.suggester(docs, ops=self.model.ops)
|
|
|
|
for candidates, doc in zip(suggester_output, docs): # type: ignore
|
|
doc.spans[candidates_key] = []
|
|
for index in candidates.dataXd:
|
|
doc.spans[candidates_key].append(doc[index[0] : index[1]])
|
|
|
|
def set_annotations(self, docs: Iterable[Doc], indices_scores) -> None:
|
|
"""Modify a batch of Doc objects, using pre-computed scores.
|
|
|
|
docs (Iterable[Doc]): The documents to modify.
|
|
scores: The scores to set, produced by SpanCategorizer.predict.
|
|
|
|
DOCS: https://spacy.io/api/spancategorizer#set_annotations
|
|
"""
|
|
indices, scores = indices_scores
|
|
offset = 0
|
|
for i, doc in enumerate(docs):
|
|
indices_i = indices[i].dataXd
|
|
allow_overlap = cast(bool, self.cfg["allow_overlap"])
|
|
if self.cfg["max_positive"] == 1:
|
|
doc.spans[self.key] = self._make_span_group_singlelabel(
|
|
doc,
|
|
indices_i,
|
|
scores[offset : offset + indices.lengths[i]],
|
|
allow_overlap,
|
|
)
|
|
else:
|
|
doc.spans[self.key] = self._make_span_group_multilabel(
|
|
doc,
|
|
indices_i,
|
|
scores[offset : offset + indices.lengths[i]],
|
|
)
|
|
offset += indices.lengths[i]
|
|
|
|
def update(
|
|
self,
|
|
examples: Iterable[Example],
|
|
*,
|
|
drop: float = 0.0,
|
|
sgd: Optional[Optimizer] = None,
|
|
losses: Optional[Dict[str, float]] = None,
|
|
) -> Dict[str, float]:
|
|
"""Learn from a batch of documents and gold-standard information,
|
|
updating the pipe's model. Delegates to predict and get_loss.
|
|
|
|
examples (Iterable[Example]): A batch of Example objects.
|
|
drop (float): The dropout rate.
|
|
sgd (thinc.api.Optimizer): The optimizer.
|
|
losses (Dict[str, float]): Optional record of the loss during training.
|
|
Updated using the component name as the key.
|
|
RETURNS (Dict[str, float]): The updated losses dictionary.
|
|
|
|
DOCS: https://spacy.io/api/spancategorizer#update
|
|
"""
|
|
if losses is None:
|
|
losses = {}
|
|
losses.setdefault(self.name, 0.0)
|
|
validate_examples(examples, "SpanCategorizer.update")
|
|
self._validate_categories(examples)
|
|
if not any(len(eg.predicted) if eg.predicted else 0 for eg in examples):
|
|
# Handle cases where there are no tokens in any docs.
|
|
return losses
|
|
docs = [eg.predicted for eg in examples]
|
|
spans = self.suggester(docs, ops=self.model.ops)
|
|
if spans.lengths.sum() == 0:
|
|
return losses
|
|
set_dropout_rate(self.model, drop)
|
|
scores, backprop_scores = self.model.begin_update((docs, spans))
|
|
loss, d_scores = self.get_loss(examples, (spans, scores))
|
|
backprop_scores(d_scores) # type: ignore
|
|
if sgd is not None:
|
|
self.finish_update(sgd)
|
|
losses[self.name] += loss
|
|
return losses
|
|
|
|
def get_loss(
|
|
self, examples: Iterable[Example], spans_scores: Tuple[Ragged, Floats2d]
|
|
) -> Tuple[float, float]:
|
|
"""Find the loss and gradient of loss for the batch of documents and
|
|
their predicted scores.
|
|
|
|
examples (Iterable[Examples]): The batch of examples.
|
|
spans_scores: Scores representing the model's predictions.
|
|
RETURNS (Tuple[float, float]): The loss and the gradient.
|
|
|
|
DOCS: https://spacy.io/api/spancategorizer#get_loss
|
|
"""
|
|
spans, scores = spans_scores
|
|
spans = Ragged(
|
|
self.model.ops.to_numpy(spans.data), self.model.ops.to_numpy(spans.lengths)
|
|
)
|
|
target = numpy.zeros(scores.shape, dtype=scores.dtype)
|
|
if self.add_negative_label:
|
|
negative_spans = numpy.ones((scores.shape[0]))
|
|
offset = 0
|
|
label_map = self._label_map
|
|
for i, eg in enumerate(examples):
|
|
# Map (start, end) offset of spans to the row in the d_scores array,
|
|
# so that we can adjust the gradient for predictions that were
|
|
# in the gold standard.
|
|
spans_index = {}
|
|
spans_i = spans[i].dataXd
|
|
for j in range(spans.lengths[i]):
|
|
start = int(spans_i[j, 0]) # type: ignore
|
|
end = int(spans_i[j, 1]) # type: ignore
|
|
spans_index[(start, end)] = offset + j
|
|
for gold_span in self._get_aligned_spans(eg):
|
|
key = (gold_span.start, gold_span.end)
|
|
if key in spans_index:
|
|
row = spans_index[key]
|
|
k = label_map[gold_span.label_]
|
|
target[row, k] = 1.0
|
|
if self.add_negative_label:
|
|
# delete negative label target.
|
|
negative_spans[row] = 0.0
|
|
# The target is a flat array for all docs. Track the position
|
|
# we're at within the flat array.
|
|
offset += spans.lengths[i]
|
|
target = self.model.ops.asarray(target, dtype="f") # type: ignore
|
|
if self.add_negative_label:
|
|
negative_samples = numpy.nonzero(negative_spans)[0]
|
|
target[negative_samples, self._negative_label_i] = 1.0 # type: ignore
|
|
# The target will have the values 0 (for untrue predictions) or 1
|
|
# (for true predictions).
|
|
# The scores should be in the range [0, 1].
|
|
# If the prediction is 0.9 and it's true, the gradient
|
|
# will be -0.1 (0.9 - 1.0).
|
|
# If the prediction is 0.9 and it's false, the gradient will be
|
|
# 0.9 (0.9 - 0.0)
|
|
d_scores = scores - target
|
|
if self.add_negative_label:
|
|
neg_weight = cast(float, self.cfg["negative_weight"])
|
|
if neg_weight != 1.0:
|
|
d_scores[negative_samples] *= neg_weight
|
|
loss = float((d_scores**2).sum())
|
|
return loss, d_scores
|
|
|
|
def initialize(
|
|
self,
|
|
get_examples: Callable[[], Iterable[Example]],
|
|
*,
|
|
nlp: Optional[Language] = None,
|
|
labels: Optional[List[str]] = None,
|
|
) -> None:
|
|
"""Initialize the pipe for training, using a representative set
|
|
of data examples.
|
|
|
|
get_examples (Callable[[], Iterable[Example]]): Function that
|
|
returns a representative sample of gold-standard Example objects.
|
|
nlp (Optional[Language]): The current nlp object the component is part of.
|
|
labels (Optional[List[str]]): The labels to add to the component, typically generated by the
|
|
`init labels` command. If no labels are provided, the get_examples
|
|
callback is used to extract the labels from the data.
|
|
|
|
DOCS: https://spacy.io/api/spancategorizer#initialize
|
|
"""
|
|
subbatch: List[Example] = []
|
|
if labels is not None:
|
|
for label in labels:
|
|
self.add_label(label)
|
|
for eg in get_examples():
|
|
if labels is None:
|
|
for span in eg.reference.spans.get(self.key, []):
|
|
self.add_label(span.label_)
|
|
if len(subbatch) < 10:
|
|
subbatch.append(eg)
|
|
self._require_labels()
|
|
if subbatch:
|
|
docs = [eg.x for eg in subbatch]
|
|
spans = build_ngram_suggester(sizes=[1])(docs)
|
|
Y = self.model.ops.alloc2f(spans.dataXd.shape[0], self._n_labels)
|
|
self.model.initialize(X=(docs, spans), Y=Y)
|
|
else:
|
|
self.model.initialize()
|
|
|
|
def _validate_categories(self, examples: Iterable[Example]):
|
|
# TODO
|
|
pass
|
|
|
|
def _get_aligned_spans(self, eg: Example):
|
|
return eg.get_aligned_spans_y2x(
|
|
eg.reference.spans.get(self.key, []), allow_overlap=True
|
|
)
|
|
|
|
def _make_span_group_multilabel(
|
|
self,
|
|
doc: Doc,
|
|
indices: Ints2d,
|
|
scores: Floats2d,
|
|
) -> SpanGroup:
|
|
"""Find the top-k labels for each span (k=max_positive)."""
|
|
spans = SpanGroup(doc, name=self.key)
|
|
if scores.size == 0:
|
|
return spans
|
|
scores = self.model.ops.to_numpy(scores)
|
|
indices = self.model.ops.to_numpy(indices)
|
|
threshold = self.cfg["threshold"]
|
|
max_positive = self.cfg["max_positive"]
|
|
|
|
keeps = scores >= threshold
|
|
if max_positive is not None:
|
|
assert isinstance(max_positive, int)
|
|
if self.add_negative_label:
|
|
negative_scores = numpy.copy(scores[:, self._negative_label_i])
|
|
scores[:, self._negative_label_i] = -numpy.inf
|
|
ranked = (scores * -1).argsort() # type: ignore
|
|
scores[:, self._negative_label_i] = negative_scores
|
|
else:
|
|
ranked = (scores * -1).argsort() # type: ignore
|
|
span_filter = ranked[:, max_positive:]
|
|
for i, row in enumerate(span_filter):
|
|
keeps[i, row] = False
|
|
|
|
attrs_scores = []
|
|
for i in range(indices.shape[0]):
|
|
start = indices[i, 0]
|
|
end = indices[i, 1]
|
|
for j, keep in enumerate(keeps[i]):
|
|
if keep:
|
|
if j != self._negative_label_i:
|
|
spans.append(Span(doc, start, end, label=self.labels[j]))
|
|
attrs_scores.append(scores[i, j])
|
|
spans.attrs["scores"] = numpy.array(attrs_scores)
|
|
return spans
|
|
|
|
def _make_span_group_singlelabel(
|
|
self,
|
|
doc: Doc,
|
|
indices: Ints2d,
|
|
scores: Floats2d,
|
|
allow_overlap: bool = True,
|
|
) -> SpanGroup:
|
|
"""Find the argmax label for each span."""
|
|
# Handle cases when there are zero suggestions
|
|
if scores.size == 0:
|
|
return SpanGroup(doc, name=self.key)
|
|
scores = self.model.ops.to_numpy(scores)
|
|
indices = self.model.ops.to_numpy(indices)
|
|
predicted = scores.argmax(axis=1)
|
|
argmax_scores = numpy.take_along_axis(
|
|
scores, numpy.expand_dims(predicted, 1), axis=1
|
|
)
|
|
keeps = numpy.ones(predicted.shape, dtype=bool)
|
|
# Remove samples where the negative label is the argmax.
|
|
if self.add_negative_label:
|
|
keeps = numpy.logical_and(keeps, predicted != self._negative_label_i)
|
|
# Filter samples according to threshold.
|
|
threshold = self.cfg["threshold"]
|
|
if threshold is not None:
|
|
keeps = numpy.logical_and(keeps, (argmax_scores >= threshold).squeeze())
|
|
# Sort spans according to argmax probability
|
|
if not allow_overlap:
|
|
# Get the probabilities
|
|
sort_idx = (argmax_scores.squeeze() * -1).argsort()
|
|
argmax_scores = argmax_scores[sort_idx]
|
|
predicted = predicted[sort_idx]
|
|
indices = indices[sort_idx]
|
|
keeps = keeps[sort_idx]
|
|
seen = _Intervals()
|
|
spans = SpanGroup(doc, name=self.key)
|
|
attrs_scores = []
|
|
for i in range(indices.shape[0]):
|
|
if not keeps[i]:
|
|
continue
|
|
|
|
label = predicted[i]
|
|
start = indices[i, 0]
|
|
end = indices[i, 1]
|
|
|
|
if not allow_overlap:
|
|
if (start, end) in seen:
|
|
continue
|
|
else:
|
|
seen.add(start, end)
|
|
attrs_scores.append(argmax_scores[i])
|
|
spans.append(Span(doc, start, end, label=self.labels[label]))
|
|
|
|
spans.attrs["scores"] = numpy.array(attrs_scores)
|
|
return spans
|