mirror of
https://github.com/explosion/spaCy.git
synced 2025-11-11 21:35:47 +03:00
In order to support Python 3.13, we had to migrate to Cython 3.0. This caused some tricky interaction with our Pydantic usage, because Cython 3 uses the from __future__ import annotations semantics, which causes type annotations to be saved as strings. The end result is that we can't have Language.factory decorated functions in Cython modules anymore, as the Language.factory decorator expects to inspect the signature of the functions and build a Pydantic model. If the function is implemented in Cython, an error is raised because the type is not resolved. To address this I've moved the factory functions into a new module, spacy.pipeline.factories. I've added __getattr__ importlib hooks to the previous locations, in case anyone was importing these functions directly. The change should have no backwards compatibility implications. Along the way I've also refactored the registration of functions for the config. Previously these ran as import-time side-effects, using the registry decorator. I've created instead a new module spacy.registrations. When the registry is accessed it calls a function ensure_populated(), which cases the registrations to occur. I've made a similar change to the Language.factory registrations in the new spacy.pipeline.factories module. I want to remove these import-time side-effects so that we can speed up the loading time of the library, which can be especially painful on the CLI. I also find that I'm often working to track down the implementations of functions referenced by strings in the config. Having the registrations all happen in one place will make this easier. With these changes I've fortunately avoided the need to migrate to Pydantic v2 properly --- we're still using the v1 compatibility shim. We might not be able to hold out forever though: Pydantic (reasonably) aren't actively supporting the v1 shims. I put a lot of work into v2 migration when investigating the 3.13 support, and it's definitely challenging. In any case, it's a relief that we don't have to do the v2 migration at the same time as the Cython 3.0/Python 3.13 support.
509 lines
18 KiB
Python
509 lines
18 KiB
Python
import importlib
|
|
import sys
|
|
import warnings
|
|
from functools import partial
|
|
from pathlib import Path
|
|
from typing import (
|
|
Any,
|
|
Callable,
|
|
Dict,
|
|
Iterable,
|
|
List,
|
|
Optional,
|
|
Sequence,
|
|
Set,
|
|
Tuple,
|
|
Union,
|
|
cast,
|
|
)
|
|
|
|
import srsly
|
|
|
|
from .. import util
|
|
from ..errors import Errors, Warnings
|
|
from ..language import Language
|
|
from ..matcher import Matcher, PhraseMatcher
|
|
from ..matcher.levenshtein import levenshtein_compare
|
|
from ..scorer import Scorer
|
|
from ..tokens import Doc, Span
|
|
from ..training import Example
|
|
from ..util import SimpleFrozenList, ensure_path, registry
|
|
from .pipe import Pipe
|
|
|
|
PatternType = Dict[str, Union[str, List[Dict[str, Any]]]]
|
|
DEFAULT_SPANS_KEY = "ruler"
|
|
|
|
|
|
def prioritize_new_ents_filter(
|
|
entities: Iterable[Span], spans: Iterable[Span]
|
|
) -> List[Span]:
|
|
"""Merge entities and spans into one list without overlaps by allowing
|
|
spans to overwrite any entities that they overlap with. Intended to
|
|
replicate the overwrite_ents=True behavior from the EntityRuler.
|
|
|
|
entities (Iterable[Span]): The entities, already filtered for overlaps.
|
|
spans (Iterable[Span]): The spans to merge, may contain overlaps.
|
|
RETURNS (List[Span]): Filtered list of non-overlapping spans.
|
|
"""
|
|
get_sort_key = lambda span: (span.end - span.start, -span.start)
|
|
spans = sorted(spans, key=get_sort_key, reverse=True)
|
|
entities = list(entities)
|
|
new_entities = []
|
|
seen_tokens: Set[int] = set()
|
|
for span in spans:
|
|
start = span.start
|
|
end = span.end
|
|
if all(token.i not in seen_tokens for token in span):
|
|
new_entities.append(span)
|
|
entities = [e for e in entities if not (e.start < end and e.end > start)]
|
|
seen_tokens.update(range(start, end))
|
|
return entities + new_entities
|
|
|
|
|
|
def make_prioritize_new_ents_filter():
|
|
return prioritize_new_ents_filter
|
|
|
|
|
|
def prioritize_existing_ents_filter(
|
|
entities: Iterable[Span], spans: Iterable[Span]
|
|
) -> List[Span]:
|
|
"""Merge entities and spans into one list without overlaps by prioritizing
|
|
existing entities. Intended to replicate the overwrite_ents=False behavior
|
|
from the EntityRuler.
|
|
|
|
entities (Iterable[Span]): The entities, already filtered for overlaps.
|
|
spans (Iterable[Span]): The spans to merge, may contain overlaps.
|
|
RETURNS (List[Span]): Filtered list of non-overlapping spans.
|
|
"""
|
|
get_sort_key = lambda span: (span.end - span.start, -span.start)
|
|
spans = sorted(spans, key=get_sort_key, reverse=True)
|
|
entities = list(entities)
|
|
new_entities = []
|
|
seen_tokens: Set[int] = set()
|
|
seen_tokens.update(*(range(ent.start, ent.end) for ent in entities))
|
|
for span in spans:
|
|
start = span.start
|
|
end = span.end
|
|
if all(token.i not in seen_tokens for token in span):
|
|
new_entities.append(span)
|
|
seen_tokens.update(range(start, end))
|
|
return entities + new_entities
|
|
|
|
|
|
def make_preserve_existing_ents_filter():
|
|
return prioritize_existing_ents_filter
|
|
|
|
|
|
def overlapping_labeled_spans_score(
|
|
examples: Iterable[Example], *, spans_key=DEFAULT_SPANS_KEY, **kwargs
|
|
) -> Dict[str, Any]:
|
|
kwargs = dict(kwargs)
|
|
attr_prefix = f"spans_"
|
|
kwargs.setdefault("attr", f"{attr_prefix}{spans_key}")
|
|
kwargs.setdefault("allow_overlap", True)
|
|
kwargs.setdefault("labeled", True)
|
|
kwargs.setdefault(
|
|
"getter", lambda doc, key: doc.spans.get(key[len(attr_prefix) :], [])
|
|
)
|
|
kwargs.setdefault("has_annotation", lambda doc: spans_key in doc.spans)
|
|
return Scorer.score_spans(examples, **kwargs)
|
|
|
|
|
|
def make_overlapping_labeled_spans_scorer(spans_key: str = DEFAULT_SPANS_KEY):
|
|
return partial(overlapping_labeled_spans_score, spans_key=spans_key)
|
|
|
|
|
|
class SpanRuler(Pipe):
|
|
"""The SpanRuler lets you add spans to the `Doc.spans` using token-based
|
|
rules or exact phrase matches.
|
|
|
|
DOCS: https://spacy.io/api/spanruler
|
|
USAGE: https://spacy.io/usage/rule-based-matching#spanruler
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
nlp: Language,
|
|
name: str = "span_ruler",
|
|
*,
|
|
spans_key: Optional[str] = DEFAULT_SPANS_KEY,
|
|
spans_filter: Optional[
|
|
Callable[[Iterable[Span], Iterable[Span]], Iterable[Span]]
|
|
] = None,
|
|
annotate_ents: bool = False,
|
|
ents_filter: Callable[
|
|
[Iterable[Span], Iterable[Span]], Iterable[Span]
|
|
] = util.filter_chain_spans,
|
|
phrase_matcher_attr: Optional[Union[int, str]] = None,
|
|
matcher_fuzzy_compare: Callable = levenshtein_compare,
|
|
validate: bool = False,
|
|
overwrite: bool = False,
|
|
scorer: Optional[Callable] = partial(
|
|
overlapping_labeled_spans_score, spans_key=DEFAULT_SPANS_KEY
|
|
),
|
|
) -> None:
|
|
"""Initialize the span ruler. If patterns are supplied here, they
|
|
need to be a list of dictionaries with a `"label"` and `"pattern"`
|
|
key. A pattern can either be a token pattern (list) or a phrase pattern
|
|
(string). For example: `{'label': 'ORG', 'pattern': 'Apple'}`.
|
|
|
|
nlp (Language): The shared nlp object to pass the vocab to the matchers
|
|
and process phrase patterns.
|
|
name (str): Instance name of the current pipeline component. Typically
|
|
passed in automatically from the factory when the component is
|
|
added. Used to disable the current span ruler while creating
|
|
phrase patterns with the nlp object.
|
|
spans_key (Optional[str]): The spans key to save the spans under. If
|
|
`None`, no spans are saved. Defaults to "ruler".
|
|
spans_filter (Optional[Callable[[Iterable[Span], Iterable[Span]], List[Span]]):
|
|
The optional method to filter spans before they are assigned to
|
|
doc.spans. Defaults to `None`.
|
|
annotate_ents (bool): Whether to save spans to doc.ents. Defaults to
|
|
`False`.
|
|
ents_filter (Callable[[Iterable[Span], Iterable[Span]], List[Span]]):
|
|
The method to filter spans before they are assigned to doc.ents.
|
|
Defaults to `util.filter_chain_spans`.
|
|
phrase_matcher_attr (Optional[Union[int, str]]): Token attribute to
|
|
match on, passed to the internal PhraseMatcher as `attr`. Defaults
|
|
to `None`.
|
|
matcher_fuzzy_compare (Callable): The fuzzy comparison method for the
|
|
internal Matcher. Defaults to
|
|
spacy.matcher.levenshtein.levenshtein_compare.
|
|
validate (bool): Whether patterns should be validated, passed to
|
|
Matcher and PhraseMatcher as `validate`.
|
|
overwrite (bool): Whether to remove any existing spans under this spans
|
|
key if `spans_key` is set, and/or to remove any ents under `doc.ents` if
|
|
`annotate_ents` is set. Defaults to `True`.
|
|
scorer (Optional[Callable]): The scoring method. Defaults to
|
|
spacy.pipeline.span_ruler.overlapping_labeled_spans_score.
|
|
|
|
DOCS: https://spacy.io/api/spanruler#init
|
|
"""
|
|
self.nlp = nlp
|
|
self.name = name
|
|
self.spans_key = spans_key
|
|
self.annotate_ents = annotate_ents
|
|
self.phrase_matcher_attr = phrase_matcher_attr
|
|
self.validate = validate
|
|
self.overwrite = overwrite
|
|
self.spans_filter = spans_filter
|
|
self.ents_filter = ents_filter
|
|
self.scorer = scorer
|
|
self.matcher_fuzzy_compare = matcher_fuzzy_compare
|
|
self._match_label_id_map: Dict[int, Dict[str, str]] = {}
|
|
self.clear()
|
|
|
|
def __len__(self) -> int:
|
|
"""The number of all labels added to the span ruler."""
|
|
return len(self._patterns)
|
|
|
|
def __contains__(self, label: str) -> bool:
|
|
"""Whether a label is present in the patterns."""
|
|
for label_id in self._match_label_id_map.values():
|
|
if label_id["label"] == label:
|
|
return True
|
|
return False
|
|
|
|
@property
|
|
def key(self) -> Optional[str]:
|
|
"""Key of the doc.spans dict to save the spans under."""
|
|
return self.spans_key
|
|
|
|
def __call__(self, doc: Doc) -> Doc:
|
|
"""Find matches in document and add them as entities.
|
|
|
|
doc (Doc): The Doc object in the pipeline.
|
|
RETURNS (Doc): The Doc with added entities, if available.
|
|
|
|
DOCS: https://spacy.io/api/spanruler#call
|
|
"""
|
|
error_handler = self.get_error_handler()
|
|
try:
|
|
matches = self.match(doc)
|
|
self.set_annotations(doc, matches)
|
|
return doc
|
|
except Exception as e:
|
|
return error_handler(self.name, self, [doc], e)
|
|
|
|
def match(self, doc: Doc):
|
|
self._require_patterns()
|
|
with warnings.catch_warnings():
|
|
warnings.filterwarnings("ignore", message="\\[W036")
|
|
matches = cast(
|
|
List[Tuple[int, int, int]],
|
|
list(self.matcher(doc)) + list(self.phrase_matcher(doc)),
|
|
)
|
|
deduplicated_matches = set(
|
|
Span(
|
|
doc,
|
|
start,
|
|
end,
|
|
label=self._match_label_id_map[m_id]["label"],
|
|
span_id=self._match_label_id_map[m_id]["id"],
|
|
)
|
|
for m_id, start, end in matches
|
|
if start != end
|
|
)
|
|
return sorted(list(deduplicated_matches))
|
|
|
|
def set_annotations(self, doc, matches):
|
|
"""Modify the document in place"""
|
|
# set doc.spans if spans_key is set
|
|
if self.key:
|
|
spans = []
|
|
if self.key in doc.spans and not self.overwrite:
|
|
spans = doc.spans[self.key]
|
|
spans.extend(
|
|
self.spans_filter(spans, matches) if self.spans_filter else matches
|
|
)
|
|
doc.spans[self.key] = spans
|
|
# set doc.ents if annotate_ents is set
|
|
if self.annotate_ents:
|
|
spans = []
|
|
if not self.overwrite:
|
|
spans = list(doc.ents)
|
|
spans = self.ents_filter(spans, matches)
|
|
try:
|
|
doc.ents = sorted(spans)
|
|
except ValueError:
|
|
raise ValueError(Errors.E854)
|
|
|
|
@property
|
|
def labels(self) -> Tuple[str, ...]:
|
|
"""All labels present in the match patterns.
|
|
|
|
RETURNS (set): The string labels.
|
|
|
|
DOCS: https://spacy.io/api/spanruler#labels
|
|
"""
|
|
return tuple(sorted(set([cast(str, p["label"]) for p in self._patterns])))
|
|
|
|
@property
|
|
def ids(self) -> Tuple[str, ...]:
|
|
"""All IDs present in the match patterns.
|
|
|
|
RETURNS (set): The string IDs.
|
|
|
|
DOCS: https://spacy.io/api/spanruler#ids
|
|
"""
|
|
return tuple(
|
|
sorted(set([cast(str, p.get("id")) for p in self._patterns]) - set([None]))
|
|
)
|
|
|
|
def initialize(
|
|
self,
|
|
get_examples: Callable[[], Iterable[Example]],
|
|
*,
|
|
nlp: Optional[Language] = None,
|
|
patterns: Optional[Sequence[PatternType]] = None,
|
|
):
|
|
"""Initialize the pipe for training.
|
|
|
|
get_examples (Callable[[], Iterable[Example]]): Function that
|
|
returns a representative sample of gold-standard Example objects.
|
|
nlp (Language): The current nlp object the component is part of.
|
|
patterns (Optional[Iterable[PatternType]]): The list of patterns.
|
|
|
|
DOCS: https://spacy.io/api/spanruler#initialize
|
|
"""
|
|
self.clear()
|
|
if patterns:
|
|
self.add_patterns(patterns) # type: ignore[arg-type]
|
|
|
|
@property
|
|
def patterns(self) -> List[PatternType]:
|
|
"""Get all patterns that were added to the span ruler.
|
|
|
|
RETURNS (list): The original patterns, one dictionary per pattern.
|
|
|
|
DOCS: https://spacy.io/api/spanruler#patterns
|
|
"""
|
|
return self._patterns
|
|
|
|
def add_patterns(self, patterns: List[PatternType]) -> None:
|
|
"""Add patterns to the span ruler. A pattern can either be a token
|
|
pattern (list of dicts) or a phrase pattern (string). For example:
|
|
{'label': 'ORG', 'pattern': 'Apple'}
|
|
{'label': 'ORG', 'pattern': 'Apple', 'id': 'apple'}
|
|
{'label': 'GPE', 'pattern': [{'lower': 'san'}, {'lower': 'francisco'}]}
|
|
|
|
patterns (list): The patterns to add.
|
|
|
|
DOCS: https://spacy.io/api/spanruler#add_patterns
|
|
"""
|
|
|
|
# disable the nlp components after this one in case they haven't been
|
|
# initialized / deserialized yet
|
|
try:
|
|
current_index = -1
|
|
for i, (name, pipe) in enumerate(self.nlp.pipeline):
|
|
if self == pipe:
|
|
current_index = i
|
|
break
|
|
subsequent_pipes = [pipe for pipe in self.nlp.pipe_names[current_index:]]
|
|
except ValueError:
|
|
subsequent_pipes = []
|
|
with self.nlp.select_pipes(disable=subsequent_pipes):
|
|
phrase_pattern_labels = []
|
|
phrase_pattern_texts = []
|
|
for entry in patterns:
|
|
p_label = cast(str, entry["label"])
|
|
p_id = cast(str, entry.get("id", ""))
|
|
label = repr((p_label, p_id))
|
|
self._match_label_id_map[self.nlp.vocab.strings.as_int(label)] = {
|
|
"label": p_label,
|
|
"id": p_id,
|
|
}
|
|
if isinstance(entry["pattern"], str):
|
|
phrase_pattern_labels.append(label)
|
|
phrase_pattern_texts.append(entry["pattern"])
|
|
elif isinstance(entry["pattern"], list):
|
|
self.matcher.add(label, [entry["pattern"]])
|
|
else:
|
|
raise ValueError(Errors.E097.format(pattern=entry["pattern"]))
|
|
self._patterns.append(entry)
|
|
for label, pattern in zip(
|
|
phrase_pattern_labels,
|
|
self.nlp.pipe(phrase_pattern_texts),
|
|
):
|
|
self.phrase_matcher.add(label, [pattern])
|
|
|
|
def clear(self) -> None:
|
|
"""Reset all patterns.
|
|
|
|
RETURNS: None
|
|
DOCS: https://spacy.io/api/spanruler#clear
|
|
"""
|
|
self._patterns: List[PatternType] = []
|
|
self.matcher: Matcher = Matcher(
|
|
self.nlp.vocab,
|
|
validate=self.validate,
|
|
fuzzy_compare=self.matcher_fuzzy_compare,
|
|
)
|
|
self.phrase_matcher: PhraseMatcher = PhraseMatcher(
|
|
self.nlp.vocab,
|
|
attr=self.phrase_matcher_attr,
|
|
validate=self.validate,
|
|
)
|
|
|
|
def remove(self, label: str) -> None:
|
|
"""Remove a pattern by its label.
|
|
|
|
label (str): Label of the pattern to be removed.
|
|
RETURNS: None
|
|
DOCS: https://spacy.io/api/spanruler#remove
|
|
"""
|
|
if label not in self:
|
|
raise ValueError(
|
|
Errors.E1024.format(attr_type="label", label=label, component=self.name)
|
|
)
|
|
self._patterns = [p for p in self._patterns if p["label"] != label]
|
|
for m_label in self._match_label_id_map:
|
|
if self._match_label_id_map[m_label]["label"] == label:
|
|
m_label_str = self.nlp.vocab.strings.as_string(m_label)
|
|
if m_label_str in self.phrase_matcher:
|
|
self.phrase_matcher.remove(m_label_str)
|
|
if m_label_str in self.matcher:
|
|
self.matcher.remove(m_label_str)
|
|
|
|
def remove_by_id(self, pattern_id: str) -> None:
|
|
"""Remove a pattern by its pattern ID.
|
|
|
|
pattern_id (str): ID of the pattern to be removed.
|
|
RETURNS: None
|
|
DOCS: https://spacy.io/api/spanruler#remove_by_id
|
|
"""
|
|
orig_len = len(self)
|
|
self._patterns = [p for p in self._patterns if p.get("id") != pattern_id]
|
|
if orig_len == len(self):
|
|
raise ValueError(
|
|
Errors.E1024.format(
|
|
attr_type="ID", label=pattern_id, component=self.name
|
|
)
|
|
)
|
|
for m_label in self._match_label_id_map:
|
|
if self._match_label_id_map[m_label]["id"] == pattern_id:
|
|
m_label_str = self.nlp.vocab.strings.as_string(m_label)
|
|
if m_label_str in self.phrase_matcher:
|
|
self.phrase_matcher.remove(m_label_str)
|
|
if m_label_str in self.matcher:
|
|
self.matcher.remove(m_label_str)
|
|
|
|
def _require_patterns(self) -> None:
|
|
"""Raise a warning if this component has no patterns defined."""
|
|
if len(self) == 0:
|
|
warnings.warn(Warnings.W036.format(name=self.name))
|
|
|
|
def from_bytes(
|
|
self, bytes_data: bytes, *, exclude: Iterable[str] = SimpleFrozenList()
|
|
) -> "SpanRuler":
|
|
"""Load the span ruler from a bytestring.
|
|
|
|
bytes_data (bytes): The bytestring to load.
|
|
RETURNS (SpanRuler): The loaded span ruler.
|
|
|
|
DOCS: https://spacy.io/api/spanruler#from_bytes
|
|
"""
|
|
self.clear()
|
|
deserializers = {
|
|
"patterns": lambda b: self.add_patterns(srsly.json_loads(b)),
|
|
}
|
|
util.from_bytes(bytes_data, deserializers, exclude)
|
|
return self
|
|
|
|
def to_bytes(self, *, exclude: Iterable[str] = SimpleFrozenList()) -> bytes:
|
|
"""Serialize the span ruler to a bytestring.
|
|
|
|
RETURNS (bytes): The serialized patterns.
|
|
|
|
DOCS: https://spacy.io/api/spanruler#to_bytes
|
|
"""
|
|
serializers = {
|
|
"patterns": lambda: srsly.json_dumps(self.patterns),
|
|
}
|
|
return util.to_bytes(serializers, exclude)
|
|
|
|
def from_disk(
|
|
self, path: Union[str, Path], *, exclude: Iterable[str] = SimpleFrozenList()
|
|
) -> "SpanRuler":
|
|
"""Load the span ruler from a directory.
|
|
|
|
path (Union[str, Path]): A path to a directory.
|
|
RETURNS (SpanRuler): The loaded span ruler.
|
|
|
|
DOCS: https://spacy.io/api/spanruler#from_disk
|
|
"""
|
|
self.clear()
|
|
path = ensure_path(path)
|
|
deserializers = {
|
|
"patterns": lambda p: self.add_patterns(srsly.read_jsonl(p)),
|
|
}
|
|
util.from_disk(path, deserializers, {})
|
|
return self
|
|
|
|
def to_disk(
|
|
self, path: Union[str, Path], *, exclude: Iterable[str] = SimpleFrozenList()
|
|
) -> None:
|
|
"""Save the span ruler patterns to a directory.
|
|
|
|
path (Union[str, Path]): A path to a directory.
|
|
|
|
DOCS: https://spacy.io/api/spanruler#to_disk
|
|
"""
|
|
path = ensure_path(path)
|
|
serializers = {
|
|
"patterns": lambda p: srsly.write_jsonl(p, self.patterns),
|
|
}
|
|
util.to_disk(path, serializers, {})
|
|
|
|
|
|
# Setup backwards compatibility hook for factories
|
|
def __getattr__(name):
|
|
if name == "make_span_ruler":
|
|
module = importlib.import_module("spacy.pipeline.factories")
|
|
return module.make_span_ruler
|
|
elif name == "make_entity_ruler":
|
|
module = importlib.import_module("spacy.pipeline.factories")
|
|
return module.make_future_entity_ruler
|
|
raise AttributeError(f"module {__name__} has no attribute {name}")
|