From 3f6fd410cc43d9fa63b0ab2f9ff13b2a06be9718 Mon Sep 17 00:00:00 2001 From: kadarakos Date: Tue, 31 Jan 2023 16:04:11 +0000 Subject: [PATCH] merge multilabel and singlelabel spancat --- spacy/pipeline/spancat.py | 210 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 203 insertions(+), 7 deletions(-) diff --git a/spacy/pipeline/spancat.py b/spacy/pipeline/spancat.py index fba618b92..28a527104 100644 --- a/spacy/pipeline/spancat.py +++ b/spacy/pipeline/spancat.py @@ -1,4 +1,5 @@ -from typing import List, Dict, Callable, Tuple, Optional, Iterable, Any +from typing import List, Dict, Callable, Tuple, Optional, Iterable, Any, cast +from dataclasses import dataclass from thinc.api import Config, Model, get_current_ops, set_dropout_rate, Ops from thinc.api import Optimizer from thinc.types import Ragged, Ints2d, Floats2d @@ -43,7 +44,34 @@ maxout_pieces = 3 depth = 4 """ +spancat_singlelabel_default_config = """ +[model] +@architectures = "spacy.SpanCategorizer.v1" +scorer = {"@layers": "Softmax.v2"} + +[model.reducer] +@layers = spacy.mean_max_reducer.v1 +hidden_size = 128 + +[model.tok2vec] +@architectures = "spacy.Tok2Vec.v2" +[model.tok2vec.embed] +@architectures = "spacy.MultiHashEmbed.v1" +width = 96 +rows = [5000, 1000, 2500, 1000] +attrs = ["NORM", "PREFIX", "SUFFIX", "SHAPE"] +include_static_vectors = false + +[model.tok2vec.encode] +@architectures = "spacy.MaxoutWindowEncoder.v2" +width = ${model.tok2vec.embed.width} +window_size = 1 +maxout_pieces = 3 +depth = 4 +""" + DEFAULT_SPANCAT_MODEL = Config().from_str(spancat_default_config)["model"] +DEFAULT_SPANCAT_SINGLELABEL_MODEL = Config().from_str(spancat_singlelabel_default_config)["model"] @runtime_checkable @@ -154,6 +182,64 @@ def make_spancat( ) +@Language.factory( + "spancat_singlelabel", + assigns=["doc.spans"], + default_config={ + "spans_key": "sc", + "model": DEFAULT_SPANCAT_SINGLELABEL_MODEL, + "negative_weight": 1.0, + "suggester": {"@misc": "spacy.ngram_suggester.v1", "sizes": [1, 2, 3]}, + "scorer": {"@scorers": "spacy.spancat_scorer.v1"}, + "allow_overlap": True + }, + default_score_weights={"spans_sc_f": 1.0, "spans_sc_p": 0.0, "spans_sc_r": 0.0}, +) +def make_spancat_singlelabel( + nlp: Language, + name: str, + suggester: Suggester, + model: Model[Tuple[List[Doc], Ragged], Floats2d], + spans_key: str, + negative_weight: float, + allow_overlap: bool, + scorer: Optional[Callable], +) -> "SpanCategorizer": + """Create a SpanCategorizer component. The span categorizer consists of two + parts: a suggester function that proposes candidate spans, and a labeller + model that predicts one or more labels for each span. + + suggester (Callable[[Iterable[Doc], Optional[Ops]], Ragged]): A function that suggests spans. + Spans are returned as a ragged array with two integer columns, for the + start and end positions. + model (Model[Tuple[List[Doc], Ragged], Floats2d]): A model instance that + is given a list of documents and (start, end) indices representing + candidate span offsets. The model predicts a probability for each category + for each span. + spans_key (str): Key of the doc.spans dict to save the spans under. During + initialization and training, the component will look for spans on the + reference document under the same key. + scorer (Optional[Callable]): The scoring method. Defaults to + Scorer.score_spans for the Doc.spans[spans_key] with overlapping + spans allowed. + threshold (float): Minimum probability to consider a prediction positive. + Spans with a positive prediction will be saved on the Doc. Defaults to + 0.5. + max_positive (Optional[int]): Maximum number of labels to consider positive + per span. Defaults to None, indicating no limit. + """ + return SpanCategorizer( + nlp.vocab, + suggester=suggester, + model=model, + spans_key=spans_key, + negative_weight=negative_weight, + allow_overlap=allow_overlap, + name=name, + scorer=scorer, + ) + + def spancat_score(examples: Iterable[Example], **kwargs) -> Dict[str, Any]: kwargs = dict(kwargs) attr_prefix = "spans_" @@ -172,6 +258,27 @@ def make_spancat_scorer(): return spancat_score +@dataclass +class Ranges: + """ + Helper class to avoid storing overlapping spans. + """ + + def __init__(self): + self.ranges = set() + + def add(self, i, j): + for e in range(i, j): + self.ranges.add(e) + + def __contains__(self, rang): + i, j = rang + for e in range(i, j): + if e in self.ranges: + return True + return False + + class SpanCategorizer(TrainablePipe): """Pipeline component to label spans of text. @@ -183,11 +290,16 @@ class SpanCategorizer(TrainablePipe): vocab: Vocab, model: Model[Tuple[List[Doc], Ragged], Floats2d], suggester: Suggester, + # XXX Not sure what's the best default name when it can both be spancat + # and spancat_singlelabel name: str = "spancat", *, + single_label: bool = False, spans_key: str = "spans", - threshold: float = 0.5, + negative_weight: Optional[float] = None, + allow_overlap: Optional[bool] = None, max_positive: Optional[int] = None, + threshold: Optional[float] = None, scorer: Optional[Callable] = spancat_score, ) -> None: """Initialize the span categorizer. @@ -218,12 +330,15 @@ class SpanCategorizer(TrainablePipe): "spans_key": spans_key, "threshold": threshold, "max_positive": max_positive, + "negative_weight": negative_weight, + "allow_overlap": allow_overlap } self.vocab = vocab self.suggester = suggester self.model = model self.name = name self.scorer = scorer + self.single_label = single_label @property def key(self) -> str: @@ -281,10 +396,26 @@ class SpanCategorizer(TrainablePipe): """ return list(self.labels) + @property + def label_map(self) -> Dict[str, int]: + """RETURNS (Dict[str, int]): The label map.""" + return {label: i for i, label in enumerate(self.labels)} + @property def _n_labels(self) -> int: """RETURNS (int): Number of labels.""" - return len(self.labels) + if self.single_label: + return len(self.labels) + 1 + else: + return len(self.labels) + + @property + def _negative_label(self) -> int: + """RETURNS (int): Index of the negative label.""" + if self.single_label: + return -1 + else: + return len(self.label_data) def predict(self, docs: Iterable[Doc]): """Apply the pipeline's model to a batch of docs, without modifying them. @@ -394,8 +525,9 @@ class SpanCategorizer(TrainablePipe): spans = Ragged( self.model.ops.to_numpy(spans.data), self.model.ops.to_numpy(spans.lengths) ) - label_map = {label: i for i, label in enumerate(self.labels)} target = numpy.zeros(scores.shape, dtype=scores.dtype) + if self.single_label: + negative_spans = numpy.ones((scores.shape[0])) offset = 0 for i, eg in enumerate(examples): # Map (start, end) offset of spans to the row in the d_scores array, @@ -411,12 +543,18 @@ class SpanCategorizer(TrainablePipe): key = (gold_span.start, gold_span.end) if key in spans_index: row = spans_index[key] - k = label_map[gold_span.label_] + k = self.label_map[gold_span.label_] target[row, k] = 1.0 + if self.single_label: + # delete negative label target. + negative_spans[row] = 0.0 # The target is a flat array for all docs. Track the position # we're at within the flat array. offset += spans.lengths[i] target = self.model.ops.asarray(target, dtype="f") # type: ignore + if self.single_label: + negative_samples = numpy.nonzero(negative_spans)[0] + target[negative_samples, self._negative_label] = 1.0 # type: ignore # The target will have the values 0 (for untrue predictions) or 1 # (for true predictions). # The scores should be in the range [0, 1]. @@ -425,6 +563,10 @@ class SpanCategorizer(TrainablePipe): # If the prediction is 0.9 and it's false, the gradient will be # 0.9 (0.9 - 0.0) d_scores = scores - target + if self.single_label: + neg_weight = cast(float, self.cfg["negative_weight"]) + if neg_weight != 1.0: + d_scores[negative_samples] *= neg_weight loss = float((d_scores**2).sum()) return loss, d_scores @@ -475,8 +617,13 @@ class SpanCategorizer(TrainablePipe): eg.reference.spans.get(self.key, []), allow_overlap=True ) - def _make_span_group( - self, doc: Doc, indices: Ints2d, scores: Floats2d, labels: List[str] + def _make_span_group_multilabel( + self, + doc: Doc, + indices: Ints2d, + scores: Floats2d, + labels: List[str], + allow_overlap: bool = True ) -> SpanGroup: spans = SpanGroup(doc, name=self.key) max_positive = self.cfg["max_positive"] @@ -489,6 +636,7 @@ class SpanCategorizer(TrainablePipe): span_filter = ranked[:, max_positive:] for i, row in enumerate(span_filter): keeps[i, row] = False + spans.attrs["scores"] = scores[keeps].flatten() indices = self.model.ops.to_numpy(indices) @@ -503,3 +651,51 @@ class SpanCategorizer(TrainablePipe): spans.append(Span(doc, start, end, label=labels[j])) return spans + + def _make_span_group_singlelabel( + self, + doc: Doc, + indices: Ints2d, + scores: Floats2d, + labels: List[str], + allow_overlap: bool = True, + ) -> SpanGroup: + scores = self.model.ops.to_numpy(scores) + indices = self.model.ops.to_numpy(indices) + # Handle cases when there are zero suggestions + if scores.size == 0: + return SpanGroup(doc, name=self.key) + + predicted = scores.argmax(axis=1) + # Remove samples where the negative label is the argmax + positive = numpy.where(predicted != self._negative_label)[0] + predicted = predicted[positive] + indices = indices[positive] + + # Sort spans according to argmax probability + if not allow_overlap and predicted.size != 0: + # Get the probabilities + argmax_probs = numpy.take_along_axis( + scores[positive], numpy.expand_dims(predicted, 1), axis=1 + ) + argmax_probs = argmax_probs.squeeze() + sort_idx = (argmax_probs * -1).argsort() + predicted = predicted[sort_idx] + indices = indices[sort_idx] + + seen = Ranges() + spans = SpanGroup(doc, name=self.key) + for i in range(len(predicted)): + label = predicted[i] + start = indices[i, 0] + end = indices[i, 1] + + if not allow_overlap: + if (start, end) in seen: + continue + else: + seen.add(start, end) + + spans.append(Span(doc, start, end, label=labels[label])) + + return spans