mirror of
				https://github.com/explosion/spaCy.git
				synced 2025-10-30 23:47:31 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			337 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			337 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from typing import List, Dict, Union, Iterable, Any, Optional, Callable, Iterator
 | |
| from typing import Tuple
 | |
| import srsly
 | |
| from pathlib import Path
 | |
| 
 | |
| from .pipe import Pipe
 | |
| from ..errors import Errors
 | |
| from ..training import validate_examples, Example
 | |
| from ..language import Language
 | |
| from ..matcher import Matcher
 | |
| from ..scorer import Scorer
 | |
| from ..symbols import IDS, TAG, POS, MORPH, LEMMA
 | |
| from ..tokens import Doc, Span
 | |
| from ..tokens._retokenize import normalize_token_attrs, set_token_attrs
 | |
| from ..vocab import Vocab
 | |
| from ..util import SimpleFrozenList
 | |
| from .. import util
 | |
| 
 | |
| 
 | |
| MatcherPatternType = List[Dict[Union[int, str], Any]]
 | |
| AttributeRulerPatternType = Dict[str, Union[MatcherPatternType, Dict, int]]
 | |
| TagMapType = Dict[str, Dict[Union[int, str], Union[int, str]]]
 | |
| MorphRulesType = Dict[str, Dict[str, Dict[Union[int, str], Union[int, str]]]]
 | |
| 
 | |
| 
 | |
| @Language.factory("attribute_ruler", default_config={"validate": False})
 | |
| def make_attribute_ruler(nlp: Language, name: str, validate: bool):
 | |
|     return AttributeRuler(nlp.vocab, name, validate=validate)
 | |
| 
 | |
| 
 | |
| class AttributeRuler(Pipe):
 | |
|     """Set token-level attributes for tokens matched by Matcher patterns.
 | |
|     Additionally supports importing patterns from tag maps and morph rules.
 | |
| 
 | |
|     DOCS: https://nightly.spacy.io/api/attributeruler
 | |
|     """
 | |
| 
 | |
|     def __init__(
 | |
|         self, vocab: Vocab, name: str = "attribute_ruler", *, validate: bool = False
 | |
|     ) -> None:
 | |
|         """Create the AttributeRuler. After creation, you can add patterns
 | |
|         with the `.initialize()` or `.add_patterns()` methods, or load patterns
 | |
|         with `.from_bytes()` or `.from_disk()`. Loading patterns will remove
 | |
|         any patterns you've added previously.
 | |
| 
 | |
|         vocab (Vocab): The vocab.
 | |
|         name (str): The pipe name. Defaults to "attribute_ruler".
 | |
| 
 | |
|         RETURNS (AttributeRuler): The AttributeRuler component.
 | |
| 
 | |
|         DOCS: https://nightly.spacy.io/api/attributeruler#init
 | |
|         """
 | |
|         self.name = name
 | |
|         self.vocab = vocab
 | |
|         self.matcher = Matcher(self.vocab, validate=validate)
 | |
|         self.attrs = []
 | |
|         self._attrs_unnormed = []  # store for reference
 | |
|         self.indices = []
 | |
| 
 | |
|     def initialize(
 | |
|         self,
 | |
|         get_examples: Optional[Callable[[], Iterable[Example]]],
 | |
|         *,
 | |
|         nlp: Optional[Language] = None,
 | |
|         patterns: Optional[Iterable[AttributeRulerPatternType]] = None,
 | |
|         tag_map: Optional[TagMapType] = None,
 | |
|         morph_rules: Optional[MorphRulesType] = None,
 | |
|     ):
 | |
|         """Initialize the attribute ruler by adding zero or more patterns.
 | |
| 
 | |
|         Rules can be specified as a sequence of dicts using the `patterns`
 | |
|         keyword argument. You can also provide rules using the "tag map" or
 | |
|         "morph rules" formats supported by spaCy prior to v3.
 | |
|         """
 | |
|         if patterns:
 | |
|             self.add_patterns(patterns)
 | |
|         if tag_map:
 | |
|             self.load_from_tag_map(tag_map)
 | |
|         if morph_rules:
 | |
|             self.load_from_morph_rules(morph_rules)
 | |
| 
 | |
|     def __call__(self, doc: Doc) -> Doc:
 | |
|         """Apply the AttributeRuler to a Doc and set all attribute exceptions.
 | |
| 
 | |
|         doc (Doc): The document to process.
 | |
|         RETURNS (Doc): The processed Doc.
 | |
| 
 | |
|         DOCS: https://nightly.spacy.io/api/attributeruler#call
 | |
|         """
 | |
|         matches = self.matcher(doc, allow_missing=True)
 | |
|         # Sort by the attribute ID, so that later rules have precendence
 | |
|         matches = [
 | |
|             (int(self.vocab.strings[m_id]), m_id, s, e) for m_id, s, e in matches
 | |
|         ]
 | |
|         matches.sort()
 | |
|         for attr_id, match_id, start, end in matches:
 | |
|             span = Span(doc, start, end, label=match_id)
 | |
|             attrs = self.attrs[attr_id]
 | |
|             index = self.indices[attr_id]
 | |
|             try:
 | |
|                 # The index can be negative, which makes it annoying to do
 | |
|                 # the boundscheck. Let Span do it instead.
 | |
|                 token = span[index]  # noqa: F841
 | |
|             except IndexError:
 | |
|                 # The original exception is just our conditional logic, so we
 | |
|                 # raise from.
 | |
|                 raise ValueError(
 | |
|                     Errors.E1001.format(
 | |
|                         patterns=self.matcher.get(span.label),
 | |
|                         span=[t.text for t in span],
 | |
|                         index=index,
 | |
|                     )
 | |
|                 ) from None
 | |
|             set_token_attrs(span[index], attrs)
 | |
|         return doc
 | |
| 
 | |
|     def pipe(self, stream: Iterable[Doc], *, batch_size: int = 128) -> Iterator[Doc]:
 | |
|         """Apply the pipe to a stream of documents. This usually happens under
 | |
|         the hood when the nlp object is called on a text and all components are
 | |
|         applied to the Doc.
 | |
| 
 | |
|         stream (Iterable[Doc]): A stream of documents.
 | |
|         batch_size (int): The number of documents to buffer.
 | |
|         YIELDS (Doc): Processed documents in order.
 | |
| 
 | |
|         DOCS: https://spacy.io/attributeruler/pipe#pipe
 | |
|         """
 | |
|         for doc in stream:
 | |
|             doc = self(doc)
 | |
|             yield doc
 | |
| 
 | |
|     def load_from_tag_map(
 | |
|         self, tag_map: Dict[str, Dict[Union[int, str], Union[int, str]]]
 | |
|     ) -> None:
 | |
|         """Load attribute ruler patterns from a tag map.
 | |
| 
 | |
|         tag_map (dict): The tag map that maps fine-grained tags to
 | |
|             coarse-grained tags and morphological features.
 | |
| 
 | |
|         DOCS: https://nightly.spacy.io/api/attributeruler#load_from_morph_rules
 | |
|         """
 | |
|         for tag, attrs in tag_map.items():
 | |
|             pattern = [{"TAG": tag}]
 | |
|             attrs, morph_attrs = _split_morph_attrs(attrs)
 | |
|             if "MORPH" not in attrs:
 | |
|                 morph = self.vocab.morphology.add(morph_attrs)
 | |
|                 attrs["MORPH"] = self.vocab.strings[morph]
 | |
|             else:
 | |
|                 morph = self.vocab.morphology.add(attrs["MORPH"])
 | |
|                 attrs["MORPH"] = self.vocab.strings[morph]
 | |
|             self.add([pattern], attrs)
 | |
| 
 | |
|     def load_from_morph_rules(
 | |
|         self, morph_rules: Dict[str, Dict[str, Dict[Union[int, str], Union[int, str]]]]
 | |
|     ) -> None:
 | |
|         """Load attribute ruler patterns from morph rules.
 | |
| 
 | |
|         morph_rules (dict): The morph rules that map token text and
 | |
|             fine-grained tags to coarse-grained tags, lemmas and morphological
 | |
|             features.
 | |
| 
 | |
|         DOCS: https://nightly.spacy.io/api/attributeruler#load_from_morph_rules
 | |
|         """
 | |
|         for tag in morph_rules:
 | |
|             for word in morph_rules[tag]:
 | |
|                 pattern = [{"ORTH": word, "TAG": tag}]
 | |
|                 attrs = morph_rules[tag][word]
 | |
|                 attrs, morph_attrs = _split_morph_attrs(attrs)
 | |
|                 if "MORPH" in attrs:
 | |
|                     morph = self.vocab.morphology.add(attrs["MORPH"])
 | |
|                     attrs["MORPH"] = self.vocab.strings[morph]
 | |
|                 elif morph_attrs:
 | |
|                     morph = self.vocab.morphology.add(morph_attrs)
 | |
|                     attrs["MORPH"] = self.vocab.strings[morph]
 | |
|                 self.add([pattern], attrs)
 | |
| 
 | |
|     def add(
 | |
|         self, patterns: Iterable[MatcherPatternType], attrs: Dict, index: int = 0
 | |
|     ) -> None:
 | |
|         """Add Matcher patterns for tokens that should be modified with the
 | |
|         provided attributes. The token at the specified index within the
 | |
|         matched span will be assigned the attributes.
 | |
| 
 | |
|         patterns (Iterable[List[Dict]]): A list of Matcher patterns.
 | |
|         attrs (Dict): The attributes to assign to the target token in the
 | |
|             matched span.
 | |
|         index (int): The index of the token in the matched span to modify. May
 | |
|             be negative to index from the end of the span. Defaults to 0.
 | |
| 
 | |
|         DOCS: https://nightly.spacy.io/api/attributeruler#add
 | |
|         """
 | |
|         # We need to make a string here, because otherwise the ID we pass back
 | |
|         # will be interpreted as the hash of a string, rather than an ordinal.
 | |
|         key = str(len(self.attrs))
 | |
|         self.matcher.add(self.vocab.strings.add(key), patterns)
 | |
|         self._attrs_unnormed.append(attrs)
 | |
|         attrs = normalize_token_attrs(self.vocab, attrs)
 | |
|         self.attrs.append(attrs)
 | |
|         self.indices.append(index)
 | |
| 
 | |
|     def add_patterns(self, patterns: Iterable[AttributeRulerPatternType]) -> None:
 | |
|         """Add patterns from a list of pattern dicts with the keys as the
 | |
|         arguments to AttributeRuler.add.
 | |
|         patterns (Iterable[dict]): A list of pattern dicts with the keys
 | |
|             as the arguments to AttributeRuler.add (patterns/attrs/index) to
 | |
|             add as patterns.
 | |
| 
 | |
|         DOCS: https://nightly.spacy.io/api/attributeruler#add_patterns
 | |
|         """
 | |
|         for p in patterns:
 | |
|             self.add(**p)
 | |
| 
 | |
|     @property
 | |
|     def patterns(self) -> List[AttributeRulerPatternType]:
 | |
|         """All the added patterns."""
 | |
|         all_patterns = []
 | |
|         for i in range(len(self.attrs)):
 | |
|             p = {}
 | |
|             p["patterns"] = self.matcher.get(str(i))[1]
 | |
|             p["attrs"] = self._attrs_unnormed[i]
 | |
|             p["index"] = self.indices[i]
 | |
|             all_patterns.append(p)
 | |
|         return all_patterns
 | |
| 
 | |
|     def score(self, examples: Iterable[Example], **kwargs) -> Dict[str, Any]:
 | |
|         """Score a batch of examples.
 | |
| 
 | |
|         examples (Iterable[Example]): The examples to score.
 | |
|         RETURNS (Dict[str, Any]): The scores, produced by
 | |
|             Scorer.score_token_attr for the attributes "tag", "pos", "morph"
 | |
|             and "lemma" for the target token attributes.
 | |
| 
 | |
|         DOCS: https://nightly.spacy.io/api/tagger#score
 | |
|         """
 | |
|         validate_examples(examples, "AttributeRuler.score")
 | |
|         results = {}
 | |
|         attrs = set()
 | |
|         for token_attrs in self.attrs:
 | |
|             attrs.update(token_attrs)
 | |
|         for attr in attrs:
 | |
|             if attr == TAG:
 | |
|                 results.update(Scorer.score_token_attr(examples, "tag", **kwargs))
 | |
|             elif attr == POS:
 | |
|                 results.update(Scorer.score_token_attr(examples, "pos", **kwargs))
 | |
|             elif attr == MORPH:
 | |
|                 results.update(Scorer.score_token_attr(examples, "morph", **kwargs))
 | |
|             elif attr == LEMMA:
 | |
|                 results.update(Scorer.score_token_attr(examples, "lemma", **kwargs))
 | |
|         return results
 | |
| 
 | |
|     def to_bytes(self, exclude: Iterable[str] = SimpleFrozenList()) -> bytes:
 | |
|         """Serialize the AttributeRuler to a bytestring.
 | |
| 
 | |
|         exclude (Iterable[str]): String names of serialization fields to exclude.
 | |
|         RETURNS (bytes): The serialized object.
 | |
| 
 | |
|         DOCS: https://nightly.spacy.io/api/attributeruler#to_bytes
 | |
|         """
 | |
|         serialize = {}
 | |
|         serialize["vocab"] = self.vocab.to_bytes
 | |
|         serialize["patterns"] = lambda: srsly.msgpack_dumps(self.patterns)
 | |
|         return util.to_bytes(serialize, exclude)
 | |
| 
 | |
|     def from_bytes(
 | |
|         self, bytes_data: bytes, exclude: Iterable[str] = SimpleFrozenList()
 | |
|     ) -> "AttributeRuler":
 | |
|         """Load the AttributeRuler from a bytestring.
 | |
| 
 | |
|         bytes_data (bytes): The data to load.
 | |
|         exclude (Iterable[str]): String names of serialization fields to exclude.
 | |
|         returns (AttributeRuler): The loaded object.
 | |
| 
 | |
|         DOCS: https://nightly.spacy.io/api/attributeruler#from_bytes
 | |
|         """
 | |
| 
 | |
|         def load_patterns(b):
 | |
|             self.add_patterns(srsly.msgpack_loads(b))
 | |
| 
 | |
|         deserialize = {
 | |
|             "vocab": lambda b: self.vocab.from_bytes(b),
 | |
|             "patterns": load_patterns,
 | |
|         }
 | |
|         util.from_bytes(bytes_data, deserialize, exclude)
 | |
|         return self
 | |
| 
 | |
|     def to_disk(
 | |
|         self, path: Union[Path, str], exclude: Iterable[str] = SimpleFrozenList()
 | |
|     ) -> None:
 | |
|         """Serialize the AttributeRuler to disk.
 | |
| 
 | |
|         path (Union[Path, str]): A path to a directory.
 | |
|         exclude (Iterable[str]): String names of serialization fields to exclude.
 | |
| 
 | |
|         DOCS: https://nightly.spacy.io/api/attributeruler#to_disk
 | |
|         """
 | |
|         serialize = {
 | |
|             "vocab": lambda p: self.vocab.to_disk(p),
 | |
|             "patterns": lambda p: srsly.write_msgpack(p, self.patterns),
 | |
|         }
 | |
|         util.to_disk(path, serialize, exclude)
 | |
| 
 | |
|     def from_disk(
 | |
|         self, path: Union[Path, str], exclude: Iterable[str] = SimpleFrozenList()
 | |
|     ) -> "AttributeRuler":
 | |
|         """Load the AttributeRuler from disk.
 | |
| 
 | |
|         path (Union[Path, str]): A path to a directory.
 | |
|         exclude (Iterable[str]): String names of serialization fields to exclude.
 | |
|         RETURNS (AttributeRuler): The loaded object.
 | |
| 
 | |
|         DOCS: https://nightly.spacy.io/api/attributeruler#from_disk
 | |
|         """
 | |
| 
 | |
|         def load_patterns(p):
 | |
|             self.add_patterns(srsly.read_msgpack(p))
 | |
| 
 | |
|         deserialize = {
 | |
|             "vocab": lambda p: self.vocab.from_disk(p),
 | |
|             "patterns": load_patterns,
 | |
|         }
 | |
|         util.from_disk(path, deserialize, exclude)
 | |
|         return self
 | |
| 
 | |
| 
 | |
| def _split_morph_attrs(attrs: dict) -> Tuple[dict, dict]:
 | |
|     """Split entries from a tag map or morph rules dict into to two dicts, one
 | |
|     with the token-level features (POS, LEMMA) and one with the remaining
 | |
|     features, which are presumed to be individual MORPH features."""
 | |
|     other_attrs = {}
 | |
|     morph_attrs = {}
 | |
|     for k, v in attrs.items():
 | |
|         if k in "_" or k in IDS.keys() or k in IDS.values():
 | |
|             other_attrs[k] = v
 | |
|         else:
 | |
|             morph_attrs[k] = v
 | |
|     return other_attrs, morph_attrs
 |