mirror of
https://github.com/explosion/spaCy.git
synced 2025-01-13 18:56:36 +03:00
Add AttributeRuler for token attribute exceptions
Add the `AttributeRuler` to handle exceptions for token-level attributes. The `AttributeRuler` uses `Matcher` patterns to identify target spans and applies the specified attributes to the token at the provided index in the matched span. A negative index can be used to index from the end of the matched span. The retokenizer is used to "merge" the individual tokens and assign them the provided attributes. Helper functions can import existing tag maps and morph rules to the corresponding `Matcher` patterns. There is an additional minor bug fix for `MORPH` attributes in the retokenizer to correctly normalize the values and to handle `MORPH` alongside `_` in an attrs dict.
This commit is contained in:
parent
256b24b720
commit
63f5951f8b
|
@ -604,6 +604,8 @@ class Errors:
|
|||
"initializing the pipeline:\n"
|
||||
'cfg = {"tokenizer": {"segmenter": "pkuseg", "pkuseg_model": name_or_path}}\n'
|
||||
'nlp = Chinese(config=cfg)')
|
||||
E1001 = ("Target token outside of matched span for match with tokens "
|
||||
"'{span}' and offset '{offset}' matched by patterns '{patterns}'.")
|
||||
|
||||
|
||||
@add_codes
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
from .attributeruler import AttributeRuler
|
||||
from .dep_parser import DependencyParser
|
||||
from .entity_linker import EntityLinker
|
||||
from .ner import EntityRecognizer
|
||||
|
@ -13,6 +14,7 @@ from .tok2vec import Tok2Vec
|
|||
from .functions import merge_entities, merge_noun_chunks, merge_subtokens
|
||||
|
||||
__all__ = [
|
||||
"AttributeRuler",
|
||||
"DependencyParser",
|
||||
"EntityLinker",
|
||||
"EntityRecognizer",
|
||||
|
|
229
spacy/pipeline/attributeruler.py
Normal file
229
spacy/pipeline/attributeruler.py
Normal file
|
@ -0,0 +1,229 @@
|
|||
import srsly
|
||||
from typing import List, Dict, Union, Iterable
|
||||
from pathlib import Path
|
||||
|
||||
from .pipe import Pipe
|
||||
from ..errors import Errors
|
||||
from ..language import Language
|
||||
from ..matcher import Matcher
|
||||
from ..symbols import IDS
|
||||
from ..tokens import Doc
|
||||
from ..vocab import Vocab
|
||||
from .. import util
|
||||
|
||||
|
||||
@Language.factory(
|
||||
"attribute_ruler",
|
||||
assigns=[],
|
||||
default_config={},
|
||||
scores=[],
|
||||
default_score_weights={},
|
||||
)
|
||||
def make_attribute_ruler(
|
||||
nlp: Language, name: str,
|
||||
):
|
||||
return AttributeRuler(nlp.vocab, name)
|
||||
|
||||
|
||||
class AttributeRuler(Pipe):
|
||||
"""Set token-level attributes for tokens matched by Matcher patterns.
|
||||
Additionally supports importing patterns from tag maps and morph rules.
|
||||
|
||||
DOCS: https://spacy.io/api/attributeruler
|
||||
"""
|
||||
|
||||
def __init__(self, vocab: Vocab, name: str = "attributeruler") -> None:
|
||||
"""Initialize the attributeruler.
|
||||
|
||||
RETURNS (AttributeRuler): The attributeruler component.
|
||||
|
||||
DOCS: https://spacy.io/api/attributeruler#init
|
||||
"""
|
||||
self.name = name
|
||||
self.vocab = vocab
|
||||
self.matcher = Matcher(self.vocab)
|
||||
self.attrs = []
|
||||
self.indices = []
|
||||
|
||||
def __call__(self, doc: Doc) -> Doc:
|
||||
"""Apply the attributeruler to a Doc and set all attribute exceptions.
|
||||
|
||||
doc (Doc): The document to process.
|
||||
RETURNS (Doc): The processed Doc.
|
||||
|
||||
DOCS: https://spacy.io/api/attributeruler#call
|
||||
"""
|
||||
matches = self.matcher(doc)
|
||||
with doc.retokenize() as retokenizer:
|
||||
for match_id, start, end in matches:
|
||||
attrs = self.attrs[match_id]
|
||||
index = self.indices[match_id]
|
||||
token = doc[start:end][index]
|
||||
if start <= token.i < end:
|
||||
retokenizer.merge(doc[token.i : token.i + 1], attrs)
|
||||
else:
|
||||
raise ValueError(
|
||||
Errors.E1001.format(
|
||||
patterns=self.matcher.get(match_id),
|
||||
span=[t.text for t in doc[start:end]],
|
||||
index=index,
|
||||
)
|
||||
)
|
||||
return doc
|
||||
|
||||
def load_from_tag_map(
|
||||
self, tag_map: Dict[str, Dict[Union[int, str], Union[int, str]]]
|
||||
) -> None:
|
||||
for tag, attrs in tag_map.items():
|
||||
pattern = [{"TAG": tag}]
|
||||
attrs, morph_attrs = _split_morph_attrs(attrs)
|
||||
morph = self.vocab.morphology.add(morph_attrs)
|
||||
attrs["MORPH"] = self.vocab.strings[morph]
|
||||
self.add([pattern], attrs)
|
||||
|
||||
def load_from_morph_rules(
|
||||
self, morph_rules: Dict[str, Dict[str, Dict[Union[int, str], Union[int, str]]]]
|
||||
) -> None:
|
||||
for tag in morph_rules:
|
||||
for word in morph_rules[tag]:
|
||||
pattern = [{"ORTH": word, "TAG": tag}]
|
||||
attrs = morph_rules[tag][word]
|
||||
attrs, morph_attrs = _split_morph_attrs(attrs)
|
||||
morph = self.vocab.morphology.add(morph_attrs)
|
||||
attrs["MORPH"] = self.vocab.strings[morph]
|
||||
self.add([pattern], attrs)
|
||||
|
||||
def add(self, patterns: List[List[Dict]], attrs: Dict, index: int = 0) -> None:
|
||||
"""Add Matcher patterns for tokens that should be modified with the
|
||||
provided attributes. The token at the specified index within the
|
||||
matched span will be assigned the attributes.
|
||||
|
||||
pattern (List[List[Dict]]): A list of Matcher patterns.
|
||||
attrs (Dict): The attributes to assign to the target token in the
|
||||
matched span.
|
||||
index (int): The index of the token in the matched span to modify. May
|
||||
be negative to index from the end of the span. Defaults to 0.
|
||||
|
||||
DOCS: https://spacy.io/api/attributeruler#add
|
||||
"""
|
||||
self.matcher.add(len(self.attrs), patterns)
|
||||
self.attrs.append(attrs)
|
||||
self.indices.append(index)
|
||||
|
||||
def to_bytes(self, exclude: Iterable[str] = tuple()) -> bytes:
|
||||
"""Serialize the attributeruler to a bytestring.
|
||||
|
||||
exclude (Iterable[str]): String names of serialization fields to exclude.
|
||||
RETURNS (bytes): The serialized object.
|
||||
|
||||
DOCS: https://spacy.io/api/attributeruler#to_bytes
|
||||
"""
|
||||
serialize = {}
|
||||
serialize["vocab"] = self.vocab.to_bytes
|
||||
patterns = {k: self.matcher.get(k)[1] for k in range(len(self.attrs))}
|
||||
serialize["patterns"] = lambda: srsly.msgpack_dumps(patterns)
|
||||
serialize["attrs"] = lambda: srsly.msgpack_dumps(self.attrs)
|
||||
serialize["indices"] = lambda: srsly.msgpack_dumps(self.indices)
|
||||
return util.to_bytes(serialize, exclude)
|
||||
|
||||
def from_bytes(self, bytes_data: bytes, exclude: Iterable[str] = tuple()):
|
||||
"""Load the attributeruler from a bytestring.
|
||||
|
||||
bytes_data (bytes): The data to load.
|
||||
exclude (Iterable[str]): String names of serialization fields to exclude.
|
||||
returns (AttributeRuler): The loaded object.
|
||||
|
||||
DOCS: https://spacy.io/api/attributeruler#from_bytes
|
||||
"""
|
||||
data = {"patterns": b""}
|
||||
|
||||
def load_patterns(b):
|
||||
data["patterns"] = srsly.msgpack_loads(b)
|
||||
|
||||
def load_attrs(b):
|
||||
self.attrs = srsly.msgpack_loads(b)
|
||||
|
||||
def load_indices(b):
|
||||
self.indices = srsly.msgpack_loads(b)
|
||||
|
||||
deserialize = {
|
||||
"vocab": lambda b: self.vocab.from_bytes(b),
|
||||
"patterns": load_patterns,
|
||||
"attrs": load_attrs,
|
||||
"indices": load_indices,
|
||||
}
|
||||
util.from_bytes(bytes_data, deserialize, exclude)
|
||||
|
||||
if data["patterns"]:
|
||||
for key, pattern in data["patterns"].items():
|
||||
self.matcher.add(key, pattern)
|
||||
assert len(self.attrs) == len(data["patterns"])
|
||||
assert len(self.indices) == len(data["patterns"])
|
||||
|
||||
return self
|
||||
|
||||
def to_disk(self, path: Union[Path, str], exclude: Iterable[str] = tuple()) -> None:
|
||||
"""Serialize the attributeruler to disk.
|
||||
|
||||
path (Union[Path, str]): A path to a directory.
|
||||
exclude (Iterable[str]): String names of serialization fields to exclude.
|
||||
DOCS: https://spacy.io/api/attributeruler#to_disk
|
||||
"""
|
||||
patterns = {k: self.matcher.get(k)[1] for k in range(len(self.attrs))}
|
||||
serialize = {
|
||||
"vocab": lambda p: self.vocab.to_disk(p),
|
||||
"patterns": lambda p: srsly.write_msgpack(p, patterns),
|
||||
"attrs": lambda p: srsly.write_msgpack(p, self.attrs),
|
||||
"indices": lambda p: srsly.write_msgpack(p, self.indices),
|
||||
}
|
||||
util.to_disk(path, serialize, exclude)
|
||||
|
||||
def from_disk(
|
||||
self, path: Union[Path, str], exclude: Iterable[str] = tuple()
|
||||
) -> None:
|
||||
"""Load the attributeruler from disk.
|
||||
|
||||
path (Union[Path, str]): A path to a directory.
|
||||
exclude (Iterable[str]): String names of serialization fields to exclude.
|
||||
DOCS: https://spacy.io/api/attributeruler#from_disk
|
||||
"""
|
||||
data = {"patterns": b""}
|
||||
|
||||
def load_patterns(p):
|
||||
data["patterns"] = srsly.read_msgpack(p)
|
||||
|
||||
def load_attrs(p):
|
||||
self.attrs = srsly.read_msgpack(p)
|
||||
|
||||
def load_indices(p):
|
||||
self.indices = srsly.read_msgpack(p)
|
||||
|
||||
deserialize = {
|
||||
"vocab": lambda p: self.vocab.from_disk(p),
|
||||
"patterns": load_patterns,
|
||||
"attrs": load_attrs,
|
||||
"indices": load_indices,
|
||||
}
|
||||
util.from_disk(path, deserialize, exclude)
|
||||
|
||||
if data["patterns"]:
|
||||
for key, pattern in data["patterns"].items():
|
||||
self.matcher.add(key, pattern)
|
||||
assert len(self.attrs) == len(data["patterns"])
|
||||
assert len(self.indices) == len(data["patterns"])
|
||||
|
||||
return self
|
||||
|
||||
|
||||
def _split_morph_attrs(attrs):
|
||||
"""Split entries from a tag map or morph rules dict into to two dicts, one
|
||||
with the token-level features (POS, LEMMA) and one with the remaining
|
||||
features, which are presumed to be individual MORPH features."""
|
||||
other_attrs = {}
|
||||
morph_attrs = {}
|
||||
for k, v in attrs.items():
|
||||
if k in "_" or k in IDS.keys() or k in IDS.values():
|
||||
other_attrs[k] = v
|
||||
else:
|
||||
morph_attrs[k] = v
|
||||
return other_attrs, morph_attrs
|
122
spacy/tests/pipeline/test_attributeruler.py
Normal file
122
spacy/tests/pipeline/test_attributeruler.py
Normal file
|
@ -0,0 +1,122 @@
|
|||
import pytest
|
||||
import numpy
|
||||
from spacy.lang.en import English
|
||||
from spacy.pipeline import AttributeRuler
|
||||
from spacy import util
|
||||
|
||||
from ..util import get_doc, make_tempdir
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def nlp():
|
||||
return English()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tag_map():
|
||||
return {
|
||||
".": {"POS": "PUNCT", "PunctType": "peri"},
|
||||
",": {"POS": "PUNCT", "PunctType": "comm"},
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def morph_rules():
|
||||
return {"DT": {"the": {"POS": "DET", "LEMMA": "a", "Case": "Nom"}}}
|
||||
|
||||
|
||||
def test_attributeruler_init(nlp):
|
||||
a = AttributeRuler(nlp.vocab)
|
||||
|
||||
a = nlp.add_pipe("attribute_ruler")
|
||||
a.add([[{"ORTH": "a"}]], {"LEMMA": "the", "MORPH": "Case=Nom|Number=Plur"})
|
||||
a.add([[{"ORTH": "test"}]], {"LEMMA": "cat", "MORPH": "Number=Sing|Case=Nom"})
|
||||
a.add([[{"ORTH": "test"}]], {"LEMMA": "dog"})
|
||||
|
||||
doc = nlp("This is a test.")
|
||||
assert doc[2].lemma_ == "the"
|
||||
assert doc[2].morph_ == "Case=Nom|Number=Plur"
|
||||
assert doc[3].lemma_ == "cat"
|
||||
assert doc[3].morph_ == "Case=Nom|Number=Sing"
|
||||
|
||||
|
||||
def test_attributeruler_tag_map(nlp, tag_map):
|
||||
a = AttributeRuler(nlp.vocab)
|
||||
a.load_from_tag_map(tag_map)
|
||||
doc = get_doc(nlp.vocab, words=["This", "is", "a", "test", "."], tags=["DT", "VBZ", "DT", "NN", "."])
|
||||
doc = a(doc)
|
||||
|
||||
for i in range(len(doc)):
|
||||
if i == 4:
|
||||
assert doc[i].pos_ == "PUNCT"
|
||||
assert doc[i].morph_ == "PunctType=peri"
|
||||
else:
|
||||
assert doc[i].pos_ == ""
|
||||
assert doc[i].morph_ == ""
|
||||
|
||||
|
||||
def test_attributeruler_morph_rules(nlp, morph_rules):
|
||||
a = AttributeRuler(nlp.vocab)
|
||||
a.load_from_morph_rules(morph_rules)
|
||||
doc = get_doc(nlp.vocab, words=["This", "is", "the", "test", "."], tags=["DT", "VBZ", "DT", "NN", "."])
|
||||
doc = a(doc)
|
||||
|
||||
for i in range(len(doc)):
|
||||
if i != 2:
|
||||
assert doc[i].pos_ == ""
|
||||
assert doc[i].morph_ == ""
|
||||
else:
|
||||
assert doc[2].pos_ == "DET"
|
||||
assert doc[2].lemma_ == "a"
|
||||
assert doc[2].morph_ == "Case=Nom"
|
||||
|
||||
|
||||
def test_attributeruler_indices(nlp):
|
||||
a = nlp.add_pipe("attribute_ruler")
|
||||
a.add([[{"ORTH": "a"}, {"ORTH": "test"}]], {"LEMMA": "the", "MORPH": "Case=Nom|Number=Plur"}, index=0)
|
||||
a.add([[{"ORTH": "This"}, {"ORTH": "is"}]], {"LEMMA": "was", "MORPH": "Case=Nom|Number=Sing"}, index=1)
|
||||
a.add([[{"ORTH": "a"}, {"ORTH": "test"}]], {"LEMMA": "cat"}, index=-1)
|
||||
|
||||
text = "This is a test."
|
||||
doc = nlp(text)
|
||||
|
||||
for i in range(len(doc)):
|
||||
if i == 1:
|
||||
assert doc[i].lemma_ == "was"
|
||||
assert doc[i].morph_ == "Case=Nom|Number=Sing"
|
||||
elif i == 2:
|
||||
assert doc[i].lemma_ == "the"
|
||||
assert doc[i].morph_ == "Case=Nom|Number=Plur"
|
||||
elif i == 3:
|
||||
assert doc[i].lemma_ == "cat"
|
||||
else:
|
||||
assert doc[i].morph_ == ""
|
||||
|
||||
# raises an error when trying to modify a token outside of the match
|
||||
a.add([[{"ORTH": "a"}, {"ORTH": "test"}]], {"LEMMA": "cat"}, index=2)
|
||||
with pytest.raises(ValueError):
|
||||
doc = nlp(text)
|
||||
|
||||
def test_attributeruler_serialize(nlp):
|
||||
a = nlp.add_pipe("attribute_ruler")
|
||||
a.add([[{"ORTH": "a"}, {"ORTH": "test"}]], {"LEMMA": "the", "MORPH": "Case=Nom|Number=Plur"}, index=0)
|
||||
a.add([[{"ORTH": "This"}, {"ORTH": "is"}]], {"LEMMA": "was", "MORPH": "Case=Nom|Number=Sing"}, index=1)
|
||||
a.add([[{"ORTH": "a"}, {"ORTH": "test"}]], {"LEMMA": "cat"}, index=-1)
|
||||
|
||||
text = "This is a test."
|
||||
attrs = ["ORTH", "LEMMA", "MORPH"]
|
||||
doc = nlp(text)
|
||||
|
||||
# bytes roundtrip
|
||||
a_reloaded = AttributeRuler(nlp.vocab).from_bytes(a.to_bytes())
|
||||
assert a.to_bytes() == a_reloaded.to_bytes()
|
||||
doc1 = a_reloaded(nlp.make_doc(text))
|
||||
numpy.array_equal(doc.to_array(attrs), doc1.to_array(attrs))
|
||||
|
||||
# disk roundtrip
|
||||
with make_tempdir() as tmp_dir:
|
||||
nlp.to_disk(tmp_dir)
|
||||
nlp2 = util.load_model_from_path(tmp_dir)
|
||||
doc2 = nlp2(text)
|
||||
assert nlp2.get_pipe("attribute_ruler").to_bytes() == a.to_bytes()
|
||||
assert numpy.array_equal(doc.to_array(attrs), doc2.to_array(attrs))
|
|
@ -66,7 +66,9 @@ cdef class Retokenizer:
|
|||
else:
|
||||
attrs = intify_attrs(attrs, strings_map=self.doc.vocab.strings)
|
||||
if MORPH in attrs:
|
||||
self.doc.vocab.morphology.add(self.doc.vocab.strings.as_string(attrs[MORPH]))
|
||||
# add and set to normalized value
|
||||
morph = self.doc.vocab.morphology.add(self.doc.vocab.strings.as_string(attrs[MORPH]))
|
||||
attrs[MORPH] = morph
|
||||
self.merges.append((span, attrs))
|
||||
|
||||
def split(self, Token token, orths, heads, attrs=SimpleFrozenDict()):
|
||||
|
@ -99,8 +101,10 @@ cdef class Retokenizer:
|
|||
# will only "intify" the keys, not the values
|
||||
attrs = intify_attrs(attrs, strings_map=self.doc.vocab.strings)
|
||||
if MORPH in attrs:
|
||||
for morph in attrs[MORPH]:
|
||||
self.doc.vocab.morphology.add(self.doc.vocab.strings.as_string(morph))
|
||||
for i, morph in enumerate(attrs[MORPH]):
|
||||
# add and set to normalized value
|
||||
morph = self.doc.vocab.morphology.add(self.doc.vocab.strings.as_string(morph))
|
||||
attrs[MORPH][i] = morph
|
||||
head_offsets = []
|
||||
for head in heads:
|
||||
if isinstance(head, Token):
|
||||
|
|
Loading…
Reference in New Issue
Block a user