mirror of
https://github.com/explosion/spaCy.git
synced 2025-04-25 03:13:41 +03:00
Add initial port
This commit is contained in:
parent
e7e845b5ed
commit
6f08d83731
|
@ -1,19 +1,20 @@
|
|||
from typing import List, Dict, Callable, Tuple, Optional, Iterable, Any, cast
|
||||
from thinc.api import Config, Model, get_current_ops, set_dropout_rate, Ops
|
||||
from thinc.api import Optimizer, Softmax_v2
|
||||
from thinc.types import Ragged, Ints2d, Floats2d, Ints1d
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, cast
|
||||
|
||||
import numpy
|
||||
from thinc.api import Config, Model, Ops, Optimizer, Softmax_v2
|
||||
from thinc.api import get_current_ops, set_dropout_rate
|
||||
from thinc.types import Floats2d, Ints1d, Ints2d, Ragged
|
||||
|
||||
from ..compat import Protocol, runtime_checkable
|
||||
from ..scorer import Scorer
|
||||
from ..language import Language
|
||||
from .trainable_pipe import TrainablePipe
|
||||
from ..tokens import Doc, SpanGroup, Span
|
||||
from ..vocab import Vocab
|
||||
from ..training import Example, validate_examples
|
||||
from ..errors import Errors
|
||||
from ..language import Language
|
||||
from ..tokens import Doc, Span, SpanGroup
|
||||
from ..training import Example, validate_examples
|
||||
from ..util import registry
|
||||
from ..vocab import Vocab
|
||||
from .spancat import spancat_score
|
||||
from .trainable_pipe import TrainablePipe
|
||||
|
||||
|
||||
@registry.layers("spacy.Softmax.v1")
|
||||
|
@ -68,11 +69,386 @@ class Suggester(Protocol):
|
|||
"suggester": {"@misc": "spacy.ngram_suggester.v1", "sizes": [1, 2, 3]},
|
||||
"scorer": {"@scorers": "spacy.spancat_scorer.v1"},
|
||||
},
|
||||
default_score_weights={"spans_sc_f": 1.0, "spans_sc_p": 0.0, "spans_sc_r": 0.0},
|
||||
)
|
||||
def make_spancat(
|
||||
nlp: Language,
|
||||
name: str,
|
||||
suggester: Suggester,
|
||||
model: Model[Tuple[List[Doc], Ragged], Floats2d],
|
||||
):
|
||||
pass
|
||||
spans_key: str,
|
||||
scorer: Optional[Callable],
|
||||
negative_weight: Optional[float] = 1.0,
|
||||
allow_overlap: Optional[bool] = True,
|
||||
) -> "SpanCategorizerExclusive":
|
||||
"""Create a SpanCategorizer component. The span categorizer consists of two
|
||||
parts: a suggester function that proposes candidate spans, and a labeller
|
||||
model that predicts one or more labels for each span.
|
||||
|
||||
suggester (Callable[[Iterable[Doc], Optional[Ops]], Ragged]): A function that suggests spans.
|
||||
Spans are returned as a ragged array with two integer columns, for the
|
||||
start and end positions.
|
||||
model (Model[Tuple[List[Doc], Ragged], Floats2d]): A model instance that
|
||||
is given a list of documents and (start, end) indices representing
|
||||
candidate span offsets. The model predicts a probability for each category
|
||||
for each span.
|
||||
spans_key (str): Key of the doc.spans dict to save the spans under. During
|
||||
initialization and training, the component will look for spans on the
|
||||
reference document under the same key.
|
||||
negative_weight (optional[float]): Multiplier for the loss terms.
|
||||
Can be used to down weigh the negative samples if there are too many.
|
||||
allow_overlap (Optional[bool]): If True the data is assumed to
|
||||
contain overlapping spans.
|
||||
"""
|
||||
return SpanCategorizerExclusive(
|
||||
nlp.vocab,
|
||||
suggester=suggester,
|
||||
model=model,
|
||||
spans_key=spans_key,
|
||||
negative_weight=negative_weight,
|
||||
name=name,
|
||||
scorer=scorer,
|
||||
allow_overlap=allow_overlap,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Ranges:
|
||||
"""
|
||||
Helper class help avoid storing overlapping span.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.ranges = set()
|
||||
|
||||
def add(self, i, j):
|
||||
for e in range(i, j):
|
||||
self.ranges.add(e)
|
||||
|
||||
def __contains__(self, rang):
|
||||
i, j = rang
|
||||
for e in range(i, j):
|
||||
if e in self.ranges:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class SpanCategorizerExclusive(TrainablePipe):
|
||||
"""Pipeline component to label spans of text.
|
||||
DOCS: https://spacy.io/api/spancategorizer
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
vocab: Vocab,
|
||||
model: Model[Tuple[List[Doc], Ragged], Floats2d],
|
||||
suggester: Suggester,
|
||||
name: str = "spancat_exclusive",
|
||||
*,
|
||||
spans_key: str = "spans",
|
||||
negative_weight: Optional[float],
|
||||
scorer: Optional[Callable] = spancat_score,
|
||||
allow_overlap: Optional[bool] = True,
|
||||
) -> None:
|
||||
"""Initialize the span categorizer.
|
||||
vocab (Vocab): The shared vocabulary.
|
||||
model (thinc.api.Model): The Thinc Model powering the pipeline component.
|
||||
name (str): The component instance name, used to add entries to the
|
||||
losses during training.
|
||||
spans_key (str): Key of the Doc.spans dict to save the spans under.
|
||||
During initialization and training, the component will look for
|
||||
spans on the reference document under the same key. Defaults to
|
||||
`"spans"`.
|
||||
negative_weight (optional[float]): Multiplier for the loss terms.
|
||||
Can be used to down weigh the negative samples if there are too many.
|
||||
scorer (Optional[Callable]): The scoring method. Defaults to
|
||||
allow_overlap (Optional[bool]): If True the data is assumed to
|
||||
contains overlapping spans.
|
||||
Scorer.score_spans for the Doc.spans[spans_key] with overlapping
|
||||
spans allowed.
|
||||
DOCS: https://spacy.io/api/spancategorizer#init
|
||||
"""
|
||||
self.cfg = {
|
||||
"labels": [],
|
||||
"spans_key": spans_key,
|
||||
"negative_weight": negative_weight,
|
||||
"allow_overlap": allow_overlap,
|
||||
}
|
||||
self.vocab = vocab
|
||||
self.suggester = suggester
|
||||
self.model = model
|
||||
self.name = name
|
||||
self.scorer = scorer
|
||||
|
||||
@property
|
||||
def key(self) -> str:
|
||||
"""Key of the doc.spans dict to save the spans under. During
|
||||
initialization and training, the component will look for spans on the
|
||||
reference document under the same key.
|
||||
"""
|
||||
return str(self.cfg["spans_key"])
|
||||
|
||||
def add_label(self, label: str) -> int:
|
||||
"""Add a new label to the pipe.
|
||||
label (str): The label to add.
|
||||
RETURNS (int): 0 if label is already present, otherwise 1.
|
||||
DOCS: https://spacy.io/api/spancategorizer#add_label
|
||||
"""
|
||||
if not isinstance(label, str):
|
||||
raise ValueError(Errors.E187)
|
||||
if label in self.labels:
|
||||
return 0
|
||||
self._allow_extra_label()
|
||||
self.cfg["labels"].append(label) # type: ignore
|
||||
self.vocab.strings.add(label)
|
||||
return 1
|
||||
|
||||
@property
|
||||
def labels(self) -> Tuple[str]:
|
||||
"""RETURNS (Tuple[str]): The labels currently added to the component.
|
||||
DOCS: https://spacy.io/api/spancategorizer#labels
|
||||
"""
|
||||
return tuple(self.cfg["labels"]) # type: ignore
|
||||
|
||||
@property
|
||||
def label_data(self) -> List[str]:
|
||||
"""RETURNS (List[str]): Information about the component's labels.
|
||||
DOCS: https://spacy.io/api/spancategorizer#label_data
|
||||
"""
|
||||
return list(self.labels)
|
||||
|
||||
@property
|
||||
def _negative_label(self):
|
||||
"""
|
||||
Index of the negative label.
|
||||
"""
|
||||
return len(self.label_data)
|
||||
|
||||
@property
|
||||
def _n_labels(self):
|
||||
"""
|
||||
Number of labels including the negative label.
|
||||
"""
|
||||
return len(self.label_data) + 1
|
||||
|
||||
def predict(self, docs: Iterable[Doc]):
|
||||
"""Apply the pipeline's model to a batch of docs, without modifying them.
|
||||
docs (Iterable[Doc]): The documents to predict.
|
||||
RETURNS: The models prediction for each document.
|
||||
DOCS: https://spacy.io/api/spancategorizer#predict
|
||||
"""
|
||||
indices = self.suggester(docs, ops=self.model.ops)
|
||||
scores = self.model.predict((docs, indices)) # type: ignore
|
||||
return indices, scores
|
||||
|
||||
def set_candidates(
|
||||
self, docs: Iterable[Doc], *, candidates_key: str = "candidates"
|
||||
) -> None:
|
||||
"""Use the spancat suggester to add a list of span candidates to a
|
||||
list of docs. Intended to be used for debugging purposes.
|
||||
docs (Iterable[Doc]): The documents to modify.
|
||||
candidates_key (str): Key of the Doc.spans dict to save the
|
||||
candidate spans under.
|
||||
DOCS: https://spacy.io/api/spancategorizer#set_candidates
|
||||
"""
|
||||
suggester_output = self.suggester(docs, ops=self.model.ops)
|
||||
|
||||
for candidates, doc in zip(suggester_output, docs): # type: ignore
|
||||
doc.spans[candidates_key] = []
|
||||
for index in candidates.dataXd:
|
||||
doc.spans[candidates_key].append(doc[index[0] : index[1]])
|
||||
|
||||
def set_annotations(self, docs: Iterable[Doc], indices_scores) -> None:
|
||||
"""Modify a batch of Doc objects, using pre-computed scores.
|
||||
docs (Iterable[Doc]): The documents to modify.
|
||||
scores: The scores to set, produced by SpanCategorizer.predict.
|
||||
DOCS: https://spacy.io/api/spancategorizer#set_annotations
|
||||
"""
|
||||
allow_overlap = self.cfg["allow_overlap"]
|
||||
labels = self.labels
|
||||
indices, scores = indices_scores
|
||||
offset = 0
|
||||
for i, doc in enumerate(docs):
|
||||
indices_i = indices[i].dataXd
|
||||
doc.spans[self.key] = self._make_span_group(
|
||||
doc,
|
||||
indices_i,
|
||||
scores[offset : offset + indices.lengths[i]],
|
||||
labels,
|
||||
allow_overlap,
|
||||
) # type: ignore[arg-type]
|
||||
offset += indices.lengths[i]
|
||||
|
||||
def update(
|
||||
self,
|
||||
examples: Iterable[Example],
|
||||
*,
|
||||
drop: float = 0.0,
|
||||
sgd: Optional[Optimizer] = None,
|
||||
losses: Optional[Dict[str, float]] = None,
|
||||
) -> Dict[str, float]:
|
||||
"""Learn from a batch of documents and gold-standard information,
|
||||
updating the pipe's model. Delegates to predict and get_loss.
|
||||
examples (Iterable[Example]): A batch of Example objects.
|
||||
drop (float): The dropout rate.
|
||||
sgd (thinc.api.Optimizer): The optimizer.
|
||||
losses (Dict[str, float]): Optional record of the loss during training.
|
||||
Updated using the component name as the key.
|
||||
RETURNS (Dict[str, float]): The updated losses dictionary.
|
||||
DOCS: https://spacy.io/api/spancategorizer#update
|
||||
"""
|
||||
if losses is None:
|
||||
losses = {}
|
||||
losses.setdefault(self.name, 0.0)
|
||||
validate_examples(examples, "SpanCategorizer.update")
|
||||
self._validate_categories(examples)
|
||||
if not any(len(eg.predicted) if eg.predicted else 0 for eg in examples):
|
||||
# Handle cases where there are no tokens in any docs.
|
||||
return losses
|
||||
docs = [eg.predicted for eg in examples]
|
||||
spans = self.suggester(docs, ops=self.model.ops)
|
||||
if spans.lengths.sum() == 0:
|
||||
return losses
|
||||
set_dropout_rate(self.model, drop)
|
||||
scores, backprop_scores = self.model.begin_update((docs, spans))
|
||||
loss, d_scores = self.get_loss(examples, (spans, scores))
|
||||
backprop_scores(d_scores) # type: ignore
|
||||
if sgd is not None:
|
||||
self.finish_update(sgd)
|
||||
losses[self.name] += loss
|
||||
return losses
|
||||
|
||||
def get_loss(
|
||||
self, examples: Iterable[Example], spans_scores: Tuple[Ragged, Floats2d]
|
||||
) -> Tuple[float, float]:
|
||||
"""Find the loss and gradient of loss for the batch of documents and
|
||||
their predicted scores.
|
||||
examples (Iterable[Examples]): The batch of examples.
|
||||
spans_scores: Scores representing the model's predictions.
|
||||
RETURNS (Tuple[float, float]): The loss and the gradient.
|
||||
DOCS: https://spacy.io/api/spancategorizer#get_loss
|
||||
"""
|
||||
spans, scores = spans_scores
|
||||
spans = Ragged(
|
||||
self.model.ops.to_numpy(spans.data), self.model.ops.to_numpy(spans.lengths)
|
||||
)
|
||||
label_map = {label: i for i, label in enumerate(self.labels)}
|
||||
target = numpy.zeros(scores.shape, dtype=scores.dtype)
|
||||
# Set negative class as target initially for all samples.
|
||||
negative_spans = numpy.ones((scores.shape[0]))
|
||||
offset = 0
|
||||
for i, eg in enumerate(examples):
|
||||
# Map (start, end) offset of spans to the row in the
|
||||
# d_scores array, so that we can adjust the gradient
|
||||
# for predictions that were in the gold standard.
|
||||
spans_index = {}
|
||||
spans_i = spans[i].dataXd
|
||||
for j in range(spans.lengths[i]):
|
||||
start = int(spans_i[j, 0]) # type: ignore
|
||||
end = int(spans_i[j, 1]) # type: ignore
|
||||
spans_index[(start, end)] = offset + j
|
||||
for gold_span in self._get_aligned_spans(eg):
|
||||
key = (gold_span.start, gold_span.end)
|
||||
if key in spans_index:
|
||||
row = spans_index[key]
|
||||
k = label_map[gold_span.label_]
|
||||
target[row, k] = 1.0
|
||||
# delete negative label target.
|
||||
negative_spans[row] = 0.0
|
||||
# The target is a flat array for all docs. Track the position
|
||||
# we're at within the flat array.
|
||||
offset += spans.lengths[i]
|
||||
target = self.model.ops.asarray(target, dtype="f") # type: ignore
|
||||
negative_samples = numpy.nonzero(negative_spans)[0]
|
||||
target[negative_samples, self._negative_label] = 1.0
|
||||
d_scores = scores - target
|
||||
neg_weight = self.cfg["negative_weight"]
|
||||
if neg_weight != 1.0:
|
||||
d_scores[negative_samples] *= neg_weight
|
||||
loss = float((d_scores**2).sum())
|
||||
return loss, d_scores
|
||||
|
||||
def initialize(
|
||||
self,
|
||||
get_examples: Callable[[], Iterable[Example]],
|
||||
*,
|
||||
nlp: Optional[Language] = None,
|
||||
labels: Optional[List[str]] = None,
|
||||
) -> None:
|
||||
"""Initialize the pipe for training, using a representative set
|
||||
of data examples.
|
||||
get_examples (Callable[[], Iterable[Example]]): Function that
|
||||
returns a representative sample of gold-standard Example objects.
|
||||
nlp (Optional[Language]): The current nlp object the component is part of.
|
||||
labels (Optional[List[str]]): The labels to add to the component, typically generated by the
|
||||
`init labels` command. If no labels are provided, the get_examples
|
||||
callback is used to extract the labels from the data.
|
||||
DOCS: https://spacy.io/api/spancategorizer#initialize
|
||||
"""
|
||||
subbatch: List[Example] = []
|
||||
if labels is not None:
|
||||
for label in labels:
|
||||
self.add_label(label)
|
||||
for eg in get_examples():
|
||||
if labels is None:
|
||||
for span in eg.reference.spans.get(self.key, []):
|
||||
self.add_label(span.label_)
|
||||
if len(subbatch) < 10:
|
||||
subbatch.append(eg)
|
||||
self._require_labels()
|
||||
if subbatch:
|
||||
docs = [eg.x for eg in subbatch]
|
||||
spans = build_ngram_suggester(sizes=[1])(docs)
|
||||
# + 1 for the "no-label" category
|
||||
Y = self.model.ops.alloc2f(spans.dataXd.shape[0], self._n_labels)
|
||||
self.model.initialize(X=(docs, spans), Y=Y)
|
||||
# FIXME I think this branch is broken
|
||||
else:
|
||||
raise ValueError("Cannot initialize without examples.")
|
||||
|
||||
def _validate_categories(self, examples: Iterable[Example]):
|
||||
# TODO
|
||||
pass
|
||||
|
||||
def _get_aligned_spans(self, eg: Example):
|
||||
return eg.get_aligned_spans_y2x(
|
||||
eg.reference.spans.get(self.key, []), allow_overlap=True
|
||||
)
|
||||
|
||||
def _make_span_group(
|
||||
self,
|
||||
doc: Doc,
|
||||
indices: Ints2d,
|
||||
scores: Floats2d,
|
||||
labels: List[str],
|
||||
allow_overlap: bool = True,
|
||||
) -> SpanGroup:
|
||||
scores = self.model.ops.to_numpy(scores)
|
||||
indices = self.model.ops.to_numpy(indices)
|
||||
predicted = scores.argmax(axis=1)
|
||||
# Remove samples where the negative label is the argmax
|
||||
positive = numpy.where(predicted != self._negative_label)
|
||||
predicted = predicted[positive[0]]
|
||||
indices = indices[positive[0]]
|
||||
# Sort spans according to argmax probability
|
||||
if not allow_overlap:
|
||||
argmax_probs = numpy.take_along_axis(
|
||||
scores[positive[0]], numpy.expand_dims(predicted, 1), axis=1
|
||||
)
|
||||
argmax_probs = argmax_probs.squeeze()
|
||||
sort_idx = (argmax_probs * -1).argsort()
|
||||
predicted = predicted[sort_idx]
|
||||
indices = indices[sort_idx]
|
||||
seen = Ranges()
|
||||
spans = SpanGroup(doc, name=self.key)
|
||||
for i in range(len(predicted)):
|
||||
label = predicted[i]
|
||||
start = indices[i, 0]
|
||||
end = indices[i, 1]
|
||||
if not allow_overlap:
|
||||
if (start, end) in seen:
|
||||
continue
|
||||
else:
|
||||
seen.add(start, end)
|
||||
spans.append(Span(doc, start, end, label=labels[label]))
|
||||
return spans
|
||||
|
|
Loading…
Reference in New Issue
Block a user