merge multilabel and singlelabel spancat

This commit is contained in:
kadarakos 2023-01-31 16:04:11 +00:00
parent 330a452f5e
commit 3f6fd410cc

View File

@ -1,4 +1,5 @@
from typing import List, Dict, Callable, Tuple, Optional, Iterable, Any from typing import List, Dict, Callable, Tuple, Optional, Iterable, Any, cast
from dataclasses import dataclass
from thinc.api import Config, Model, get_current_ops, set_dropout_rate, Ops from thinc.api import Config, Model, get_current_ops, set_dropout_rate, Ops
from thinc.api import Optimizer from thinc.api import Optimizer
from thinc.types import Ragged, Ints2d, Floats2d from thinc.types import Ragged, Ints2d, Floats2d
@ -43,7 +44,34 @@ maxout_pieces = 3
depth = 4 depth = 4
""" """
spancat_singlelabel_default_config = """
[model]
@architectures = "spacy.SpanCategorizer.v1"
scorer = {"@layers": "Softmax.v2"}
[model.reducer]
@layers = spacy.mean_max_reducer.v1
hidden_size = 128
[model.tok2vec]
@architectures = "spacy.Tok2Vec.v2"
[model.tok2vec.embed]
@architectures = "spacy.MultiHashEmbed.v1"
width = 96
rows = [5000, 1000, 2500, 1000]
attrs = ["NORM", "PREFIX", "SUFFIX", "SHAPE"]
include_static_vectors = false
[model.tok2vec.encode]
@architectures = "spacy.MaxoutWindowEncoder.v2"
width = ${model.tok2vec.embed.width}
window_size = 1
maxout_pieces = 3
depth = 4
"""
DEFAULT_SPANCAT_MODEL = Config().from_str(spancat_default_config)["model"] DEFAULT_SPANCAT_MODEL = Config().from_str(spancat_default_config)["model"]
DEFAULT_SPANCAT_SINGLELABEL_MODEL = Config().from_str(spancat_singlelabel_default_config)["model"]
@runtime_checkable @runtime_checkable
@ -154,6 +182,64 @@ def make_spancat(
) )
@Language.factory(
"spancat_singlelabel",
assigns=["doc.spans"],
default_config={
"spans_key": "sc",
"model": DEFAULT_SPANCAT_SINGLELABEL_MODEL,
"negative_weight": 1.0,
"suggester": {"@misc": "spacy.ngram_suggester.v1", "sizes": [1, 2, 3]},
"scorer": {"@scorers": "spacy.spancat_scorer.v1"},
"allow_overlap": True
},
default_score_weights={"spans_sc_f": 1.0, "spans_sc_p": 0.0, "spans_sc_r": 0.0},
)
def make_spancat_singlelabel(
nlp: Language,
name: str,
suggester: Suggester,
model: Model[Tuple[List[Doc], Ragged], Floats2d],
spans_key: str,
negative_weight: float,
allow_overlap: bool,
scorer: Optional[Callable],
) -> "SpanCategorizer":
"""Create a SpanCategorizer component. The span categorizer consists of two
parts: a suggester function that proposes candidate spans, and a labeller
model that predicts one or more labels for each span.
suggester (Callable[[Iterable[Doc], Optional[Ops]], Ragged]): A function that suggests spans.
Spans are returned as a ragged array with two integer columns, for the
start and end positions.
model (Model[Tuple[List[Doc], Ragged], Floats2d]): A model instance that
is given a list of documents and (start, end) indices representing
candidate span offsets. The model predicts a probability for each category
for each span.
spans_key (str): Key of the doc.spans dict to save the spans under. During
initialization and training, the component will look for spans on the
reference document under the same key.
scorer (Optional[Callable]): The scoring method. Defaults to
Scorer.score_spans for the Doc.spans[spans_key] with overlapping
spans allowed.
threshold (float): Minimum probability to consider a prediction positive.
Spans with a positive prediction will be saved on the Doc. Defaults to
0.5.
max_positive (Optional[int]): Maximum number of labels to consider positive
per span. Defaults to None, indicating no limit.
"""
return SpanCategorizer(
nlp.vocab,
suggester=suggester,
model=model,
spans_key=spans_key,
negative_weight=negative_weight,
allow_overlap=allow_overlap,
name=name,
scorer=scorer,
)
def spancat_score(examples: Iterable[Example], **kwargs) -> Dict[str, Any]: def spancat_score(examples: Iterable[Example], **kwargs) -> Dict[str, Any]:
kwargs = dict(kwargs) kwargs = dict(kwargs)
attr_prefix = "spans_" attr_prefix = "spans_"
@ -172,6 +258,27 @@ def make_spancat_scorer():
return spancat_score return spancat_score
@dataclass
class Ranges:
"""
Helper class to avoid storing overlapping spans.
"""
def __init__(self):
self.ranges = set()
def add(self, i, j):
for e in range(i, j):
self.ranges.add(e)
def __contains__(self, rang):
i, j = rang
for e in range(i, j):
if e in self.ranges:
return True
return False
class SpanCategorizer(TrainablePipe): class SpanCategorizer(TrainablePipe):
"""Pipeline component to label spans of text. """Pipeline component to label spans of text.
@ -183,11 +290,16 @@ class SpanCategorizer(TrainablePipe):
vocab: Vocab, vocab: Vocab,
model: Model[Tuple[List[Doc], Ragged], Floats2d], model: Model[Tuple[List[Doc], Ragged], Floats2d],
suggester: Suggester, suggester: Suggester,
# XXX Not sure what's the best default name when it can both be spancat
# and spancat_singlelabel
name: str = "spancat", name: str = "spancat",
*, *,
single_label: bool = False,
spans_key: str = "spans", spans_key: str = "spans",
threshold: float = 0.5, negative_weight: Optional[float] = None,
allow_overlap: Optional[bool] = None,
max_positive: Optional[int] = None, max_positive: Optional[int] = None,
threshold: Optional[float] = None,
scorer: Optional[Callable] = spancat_score, scorer: Optional[Callable] = spancat_score,
) -> None: ) -> None:
"""Initialize the span categorizer. """Initialize the span categorizer.
@ -218,12 +330,15 @@ class SpanCategorizer(TrainablePipe):
"spans_key": spans_key, "spans_key": spans_key,
"threshold": threshold, "threshold": threshold,
"max_positive": max_positive, "max_positive": max_positive,
"negative_weight": negative_weight,
"allow_overlap": allow_overlap
} }
self.vocab = vocab self.vocab = vocab
self.suggester = suggester self.suggester = suggester
self.model = model self.model = model
self.name = name self.name = name
self.scorer = scorer self.scorer = scorer
self.single_label = single_label
@property @property
def key(self) -> str: def key(self) -> str:
@ -281,11 +396,27 @@ class SpanCategorizer(TrainablePipe):
""" """
return list(self.labels) return list(self.labels)
@property
def label_map(self) -> Dict[str, int]:
"""RETURNS (Dict[str, int]): The label map."""
return {label: i for i, label in enumerate(self.labels)}
@property @property
def _n_labels(self) -> int: def _n_labels(self) -> int:
"""RETURNS (int): Number of labels.""" """RETURNS (int): Number of labels."""
if self.single_label:
return len(self.labels) + 1
else:
return len(self.labels) return len(self.labels)
@property
def _negative_label(self) -> int:
"""RETURNS (int): Index of the negative label."""
if self.single_label:
return -1
else:
return len(self.label_data)
def predict(self, docs: Iterable[Doc]): def predict(self, docs: Iterable[Doc]):
"""Apply the pipeline's model to a batch of docs, without modifying them. """Apply the pipeline's model to a batch of docs, without modifying them.
@ -394,8 +525,9 @@ class SpanCategorizer(TrainablePipe):
spans = Ragged( spans = Ragged(
self.model.ops.to_numpy(spans.data), self.model.ops.to_numpy(spans.lengths) self.model.ops.to_numpy(spans.data), self.model.ops.to_numpy(spans.lengths)
) )
label_map = {label: i for i, label in enumerate(self.labels)}
target = numpy.zeros(scores.shape, dtype=scores.dtype) target = numpy.zeros(scores.shape, dtype=scores.dtype)
if self.single_label:
negative_spans = numpy.ones((scores.shape[0]))
offset = 0 offset = 0
for i, eg in enumerate(examples): for i, eg in enumerate(examples):
# Map (start, end) offset of spans to the row in the d_scores array, # Map (start, end) offset of spans to the row in the d_scores array,
@ -411,12 +543,18 @@ class SpanCategorizer(TrainablePipe):
key = (gold_span.start, gold_span.end) key = (gold_span.start, gold_span.end)
if key in spans_index: if key in spans_index:
row = spans_index[key] row = spans_index[key]
k = label_map[gold_span.label_] k = self.label_map[gold_span.label_]
target[row, k] = 1.0 target[row, k] = 1.0
if self.single_label:
# delete negative label target.
negative_spans[row] = 0.0
# The target is a flat array for all docs. Track the position # The target is a flat array for all docs. Track the position
# we're at within the flat array. # we're at within the flat array.
offset += spans.lengths[i] offset += spans.lengths[i]
target = self.model.ops.asarray(target, dtype="f") # type: ignore target = self.model.ops.asarray(target, dtype="f") # type: ignore
if self.single_label:
negative_samples = numpy.nonzero(negative_spans)[0]
target[negative_samples, self._negative_label] = 1.0 # type: ignore
# The target will have the values 0 (for untrue predictions) or 1 # The target will have the values 0 (for untrue predictions) or 1
# (for true predictions). # (for true predictions).
# The scores should be in the range [0, 1]. # The scores should be in the range [0, 1].
@ -425,6 +563,10 @@ class SpanCategorizer(TrainablePipe):
# If the prediction is 0.9 and it's false, the gradient will be # If the prediction is 0.9 and it's false, the gradient will be
# 0.9 (0.9 - 0.0) # 0.9 (0.9 - 0.0)
d_scores = scores - target d_scores = scores - target
if self.single_label:
neg_weight = cast(float, self.cfg["negative_weight"])
if neg_weight != 1.0:
d_scores[negative_samples] *= neg_weight
loss = float((d_scores**2).sum()) loss = float((d_scores**2).sum())
return loss, d_scores return loss, d_scores
@ -475,8 +617,13 @@ class SpanCategorizer(TrainablePipe):
eg.reference.spans.get(self.key, []), allow_overlap=True eg.reference.spans.get(self.key, []), allow_overlap=True
) )
def _make_span_group( def _make_span_group_multilabel(
self, doc: Doc, indices: Ints2d, scores: Floats2d, labels: List[str] self,
doc: Doc,
indices: Ints2d,
scores: Floats2d,
labels: List[str],
allow_overlap: bool = True
) -> SpanGroup: ) -> SpanGroup:
spans = SpanGroup(doc, name=self.key) spans = SpanGroup(doc, name=self.key)
max_positive = self.cfg["max_positive"] max_positive = self.cfg["max_positive"]
@ -489,6 +636,7 @@ class SpanCategorizer(TrainablePipe):
span_filter = ranked[:, max_positive:] span_filter = ranked[:, max_positive:]
for i, row in enumerate(span_filter): for i, row in enumerate(span_filter):
keeps[i, row] = False keeps[i, row] = False
spans.attrs["scores"] = scores[keeps].flatten() spans.attrs["scores"] = scores[keeps].flatten()
indices = self.model.ops.to_numpy(indices) indices = self.model.ops.to_numpy(indices)
@ -503,3 +651,51 @@ class SpanCategorizer(TrainablePipe):
spans.append(Span(doc, start, end, label=labels[j])) spans.append(Span(doc, start, end, label=labels[j]))
return spans return spans
def _make_span_group_singlelabel(
self,
doc: Doc,
indices: Ints2d,
scores: Floats2d,
labels: List[str],
allow_overlap: bool = True,
) -> SpanGroup:
scores = self.model.ops.to_numpy(scores)
indices = self.model.ops.to_numpy(indices)
# Handle cases when there are zero suggestions
if scores.size == 0:
return SpanGroup(doc, name=self.key)
predicted = scores.argmax(axis=1)
# Remove samples where the negative label is the argmax
positive = numpy.where(predicted != self._negative_label)[0]
predicted = predicted[positive]
indices = indices[positive]
# Sort spans according to argmax probability
if not allow_overlap and predicted.size != 0:
# Get the probabilities
argmax_probs = numpy.take_along_axis(
scores[positive], numpy.expand_dims(predicted, 1), axis=1
)
argmax_probs = argmax_probs.squeeze()
sort_idx = (argmax_probs * -1).argsort()
predicted = predicted[sort_idx]
indices = indices[sort_idx]
seen = Ranges()
spans = SpanGroup(doc, name=self.key)
for i in range(len(predicted)):
label = predicted[i]
start = indices[i, 0]
end = indices[i, 1]
if not allow_overlap:
if (start, end) in seen:
continue
else:
seen.add(start, end)
spans.append(Span(doc, start, end, label=labels[label]))
return spans