mirror of
				https://github.com/explosion/spaCy.git
				synced 2025-11-04 01:48:04 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			312 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			312 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import srsly
 | 
						|
from typing import List, Dict, Union, Iterable, Any, Optional
 | 
						|
from pathlib import Path
 | 
						|
 | 
						|
from .pipe import Pipe
 | 
						|
from ..errors import Errors
 | 
						|
from ..language import Language
 | 
						|
from ..matcher import Matcher
 | 
						|
from ..symbols import IDS
 | 
						|
from ..tokens import Doc, Span
 | 
						|
from ..tokens._retokenize import normalize_token_attrs, set_token_attrs
 | 
						|
from ..vocab import Vocab
 | 
						|
from .. import util
 | 
						|
 | 
						|
 | 
						|
MatcherPatternType = List[Dict[Union[int, str], Any]]
 | 
						|
AttributeRulerPatternType = Dict[str, Union[MatcherPatternType, Dict, int]]
 | 
						|
 | 
						|
 | 
						|
@Language.factory(
 | 
						|
    "attribute_ruler", default_config={"pattern_dicts": None, "validate": False}
 | 
						|
)
 | 
						|
def make_attribute_ruler(
 | 
						|
    nlp: Language,
 | 
						|
    name: str,
 | 
						|
    pattern_dicts: Optional[Iterable[AttributeRulerPatternType]],
 | 
						|
    validate: bool,
 | 
						|
):
 | 
						|
    return AttributeRuler(
 | 
						|
        nlp.vocab, name, pattern_dicts=pattern_dicts, validate=validate
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
class AttributeRuler(Pipe):
 | 
						|
    """Set token-level attributes for tokens matched by Matcher patterns.
 | 
						|
    Additionally supports importing patterns from tag maps and morph rules.
 | 
						|
 | 
						|
    DOCS: https://spacy.io/api/attributeruler
 | 
						|
    """
 | 
						|
 | 
						|
    def __init__(
 | 
						|
        self,
 | 
						|
        vocab: Vocab,
 | 
						|
        name: str = "attribute_ruler",
 | 
						|
        *,
 | 
						|
        pattern_dicts: Optional[Iterable[AttributeRulerPatternType]] = None,
 | 
						|
        validate: bool = False,
 | 
						|
    ) -> None:
 | 
						|
        """Initialize the AttributeRuler.
 | 
						|
 | 
						|
        vocab (Vocab): The vocab.
 | 
						|
        name (str): The pipe name. Defaults to "attribute_ruler".
 | 
						|
        pattern_dicts (Iterable[Dict]): A list of pattern dicts with the keys as
 | 
						|
        the arguments to AttributeRuler.add (`patterns`/`attrs`/`index`) to add
 | 
						|
        as patterns.
 | 
						|
 | 
						|
        RETURNS (AttributeRuler): The AttributeRuler component.
 | 
						|
 | 
						|
        DOCS: https://spacy.io/api/attributeruler#init
 | 
						|
        """
 | 
						|
        self.name = name
 | 
						|
        self.vocab = vocab
 | 
						|
        self.matcher = Matcher(self.vocab, validate=validate)
 | 
						|
        self.attrs = []
 | 
						|
        self._attrs_unnormed = []  # store for reference
 | 
						|
        self.indices = []
 | 
						|
 | 
						|
        if pattern_dicts:
 | 
						|
            self.add_patterns(pattern_dicts)
 | 
						|
 | 
						|
    def __call__(self, doc: Doc) -> Doc:
 | 
						|
        """Apply the AttributeRuler to a Doc and set all attribute exceptions.
 | 
						|
 | 
						|
        doc (Doc): The document to process.
 | 
						|
        RETURNS (Doc): The processed Doc.
 | 
						|
 | 
						|
        DOCS: https://spacy.io/api/attributeruler#call
 | 
						|
        """
 | 
						|
        matches = self.matcher(doc)
 | 
						|
 | 
						|
        for match_id, start, end in matches:
 | 
						|
            span = Span(doc, start, end, label=match_id)
 | 
						|
            attrs = self.attrs[span.label]
 | 
						|
            index = self.indices[span.label]
 | 
						|
            try:
 | 
						|
                token = span[index]
 | 
						|
            except IndexError:
 | 
						|
                raise ValueError(
 | 
						|
                    Errors.E1001.format(
 | 
						|
                        patterns=self.matcher.get(span.label),
 | 
						|
                        span=[t.text for t in span],
 | 
						|
                        index=index,
 | 
						|
                    )
 | 
						|
                ) from None
 | 
						|
            set_token_attrs(token, attrs)
 | 
						|
        return doc
 | 
						|
 | 
						|
    def pipe(self, stream, *, batch_size=128):
 | 
						|
        """Apply the pipe to a stream of documents. This usually happens under
 | 
						|
        the hood when the nlp object is called on a text and all components are
 | 
						|
        applied to the Doc.
 | 
						|
 | 
						|
        stream (Iterable[Doc]): A stream of documents.
 | 
						|
        batch_size (int): The number of documents to buffer.
 | 
						|
        YIELDS (Doc): Processed documents in order.
 | 
						|
 | 
						|
        DOCS: https://spacy.io/attributeruler/pipe#pipe
 | 
						|
        """
 | 
						|
        for doc in stream:
 | 
						|
            doc = self(doc)
 | 
						|
            yield doc
 | 
						|
 | 
						|
    def load_from_tag_map(
 | 
						|
        self, tag_map: Dict[str, Dict[Union[int, str], Union[int, str]]]
 | 
						|
    ) -> None:
 | 
						|
        """Load attribute ruler patterns from a tag map.
 | 
						|
 | 
						|
        tag_map (dict): The tag map that maps fine-grained tags to
 | 
						|
            coarse-grained tags and morphological features.
 | 
						|
 | 
						|
        DOCS: https://spacy.io/api/attributeruler#load_from_morph_rules
 | 
						|
        """
 | 
						|
        for tag, attrs in tag_map.items():
 | 
						|
            pattern = [{"TAG": tag}]
 | 
						|
            attrs, morph_attrs = _split_morph_attrs(attrs)
 | 
						|
            morph = self.vocab.morphology.add(morph_attrs)
 | 
						|
            attrs["MORPH"] = self.vocab.strings[morph]
 | 
						|
            self.add([pattern], attrs)
 | 
						|
 | 
						|
    def load_from_morph_rules(
 | 
						|
        self, morph_rules: Dict[str, Dict[str, Dict[Union[int, str], Union[int, str]]]]
 | 
						|
    ) -> None:
 | 
						|
        """Load attribute ruler patterns from morph rules.
 | 
						|
 | 
						|
        morph_rules (dict): The morph rules that map token text and
 | 
						|
            fine-grained tags to coarse-grained tags, lemmas and morphological
 | 
						|
            features.
 | 
						|
 | 
						|
        DOCS: https://spacy.io/api/attributeruler#load_from_morph_rules
 | 
						|
        """
 | 
						|
        for tag in morph_rules:
 | 
						|
            for word in morph_rules[tag]:
 | 
						|
                pattern = [{"ORTH": word, "TAG": tag}]
 | 
						|
                attrs = morph_rules[tag][word]
 | 
						|
                attrs, morph_attrs = _split_morph_attrs(attrs)
 | 
						|
                morph = self.vocab.morphology.add(morph_attrs)
 | 
						|
                attrs["MORPH"] = self.vocab.strings[morph]
 | 
						|
                self.add([pattern], attrs)
 | 
						|
 | 
						|
    def add(
 | 
						|
        self, patterns: Iterable[MatcherPatternType], attrs: Dict, index: int = 0
 | 
						|
    ) -> None:
 | 
						|
        """Add Matcher patterns for tokens that should be modified with the
 | 
						|
        provided attributes. The token at the specified index within the
 | 
						|
        matched span will be assigned the attributes.
 | 
						|
 | 
						|
        patterns (Iterable[List[Dict]]): A list of Matcher patterns.
 | 
						|
        attrs (Dict): The attributes to assign to the target token in the
 | 
						|
            matched span.
 | 
						|
        index (int): The index of the token in the matched span to modify. May
 | 
						|
            be negative to index from the end of the span. Defaults to 0.
 | 
						|
 | 
						|
        DOCS: https://spacy.io/api/attributeruler#add
 | 
						|
        """
 | 
						|
        self.matcher.add(len(self.attrs), patterns)
 | 
						|
        self._attrs_unnormed.append(attrs)
 | 
						|
        attrs = normalize_token_attrs(self.vocab, attrs)
 | 
						|
        self.attrs.append(attrs)
 | 
						|
        self.indices.append(index)
 | 
						|
 | 
						|
    def add_patterns(self, pattern_dicts: Iterable[AttributeRulerPatternType]) -> None:
 | 
						|
        """Add patterns from a list of pattern dicts with the keys as the
 | 
						|
        arguments to AttributeRuler.add.
 | 
						|
        pattern_dicts (Iterable[dict]): A list of pattern dicts with the keys
 | 
						|
            as the arguments to AttributeRuler.add (patterns/attrs/index) to
 | 
						|
            add as patterns.
 | 
						|
 | 
						|
        DOCS: https://spacy.io/api/attributeruler#add_patterns
 | 
						|
        """
 | 
						|
        for p in pattern_dicts:
 | 
						|
            self.add(**p)
 | 
						|
 | 
						|
    @property
 | 
						|
    def patterns(self) -> List[AttributeRulerPatternType]:
 | 
						|
        """All the added patterns."""
 | 
						|
        all_patterns = []
 | 
						|
        for i in range(len(self.attrs)):
 | 
						|
            p = {}
 | 
						|
            p["patterns"] = self.matcher.get(i)[1]
 | 
						|
            p["attrs"] = self._attrs_unnormed[i]
 | 
						|
            p["index"] = self.indices[i]
 | 
						|
            all_patterns.append(p)
 | 
						|
        return all_patterns
 | 
						|
 | 
						|
    def to_bytes(self, exclude: Iterable[str] = tuple()) -> bytes:
 | 
						|
        """Serialize the AttributeRuler to a bytestring.
 | 
						|
 | 
						|
        exclude (Iterable[str]): String names of serialization fields to exclude.
 | 
						|
        RETURNS (bytes): The serialized object.
 | 
						|
 | 
						|
        DOCS: https://spacy.io/api/attributeruler#to_bytes
 | 
						|
        """
 | 
						|
        serialize = {}
 | 
						|
        serialize["vocab"] = self.vocab.to_bytes
 | 
						|
        patterns = {k: self.matcher.get(k)[1] for k in range(len(self.attrs))}
 | 
						|
        serialize["patterns"] = lambda: srsly.msgpack_dumps(patterns)
 | 
						|
        serialize["attrs"] = lambda: srsly.msgpack_dumps(self.attrs)
 | 
						|
        serialize["indices"] = lambda: srsly.msgpack_dumps(self.indices)
 | 
						|
        return util.to_bytes(serialize, exclude)
 | 
						|
 | 
						|
    def from_bytes(self, bytes_data: bytes, exclude: Iterable[str] = tuple()):
 | 
						|
        """Load the AttributeRuler from a bytestring.
 | 
						|
 | 
						|
        bytes_data (bytes): The data to load.
 | 
						|
        exclude (Iterable[str]): String names of serialization fields to exclude.
 | 
						|
        returns (AttributeRuler): The loaded object.
 | 
						|
 | 
						|
        DOCS: https://spacy.io/api/attributeruler#from_bytes
 | 
						|
        """
 | 
						|
        data = {"patterns": b""}
 | 
						|
 | 
						|
        def load_patterns(b):
 | 
						|
            data["patterns"] = srsly.msgpack_loads(b)
 | 
						|
 | 
						|
        def load_attrs(b):
 | 
						|
            self.attrs = srsly.msgpack_loads(b)
 | 
						|
 | 
						|
        def load_indices(b):
 | 
						|
            self.indices = srsly.msgpack_loads(b)
 | 
						|
 | 
						|
        deserialize = {
 | 
						|
            "vocab": lambda b: self.vocab.from_bytes(b),
 | 
						|
            "patterns": load_patterns,
 | 
						|
            "attrs": load_attrs,
 | 
						|
            "indices": load_indices,
 | 
						|
        }
 | 
						|
        util.from_bytes(bytes_data, deserialize, exclude)
 | 
						|
 | 
						|
        if data["patterns"]:
 | 
						|
            for key, pattern in data["patterns"].items():
 | 
						|
                self.matcher.add(key, pattern)
 | 
						|
            assert len(self.attrs) == len(data["patterns"])
 | 
						|
            assert len(self.indices) == len(data["patterns"])
 | 
						|
 | 
						|
        return self
 | 
						|
 | 
						|
    def to_disk(self, path: Union[Path, str], exclude: Iterable[str] = tuple()) -> None:
 | 
						|
        """Serialize the AttributeRuler to disk.
 | 
						|
 | 
						|
        path (Union[Path, str]): A path to a directory.
 | 
						|
        exclude (Iterable[str]): String names of serialization fields to exclude.
 | 
						|
        DOCS: https://spacy.io/api/attributeruler#to_disk
 | 
						|
        """
 | 
						|
        patterns = {k: self.matcher.get(k)[1] for k in range(len(self.attrs))}
 | 
						|
        serialize = {
 | 
						|
            "vocab": lambda p: self.vocab.to_disk(p),
 | 
						|
            "patterns": lambda p: srsly.write_msgpack(p, patterns),
 | 
						|
            "attrs": lambda p: srsly.write_msgpack(p, self.attrs),
 | 
						|
            "indices": lambda p: srsly.write_msgpack(p, self.indices),
 | 
						|
        }
 | 
						|
        util.to_disk(path, serialize, exclude)
 | 
						|
 | 
						|
    def from_disk(
 | 
						|
        self, path: Union[Path, str], exclude: Iterable[str] = tuple()
 | 
						|
    ) -> None:
 | 
						|
        """Load the AttributeRuler from disk.
 | 
						|
 | 
						|
        path (Union[Path, str]): A path to a directory.
 | 
						|
        exclude (Iterable[str]): String names of serialization fields to exclude.
 | 
						|
        DOCS: https://spacy.io/api/attributeruler#from_disk
 | 
						|
        """
 | 
						|
        data = {"patterns": b""}
 | 
						|
 | 
						|
        def load_patterns(p):
 | 
						|
            data["patterns"] = srsly.read_msgpack(p)
 | 
						|
 | 
						|
        def load_attrs(p):
 | 
						|
            self.attrs = srsly.read_msgpack(p)
 | 
						|
 | 
						|
        def load_indices(p):
 | 
						|
            self.indices = srsly.read_msgpack(p)
 | 
						|
 | 
						|
        deserialize = {
 | 
						|
            "vocab": lambda p: self.vocab.from_disk(p),
 | 
						|
            "patterns": load_patterns,
 | 
						|
            "attrs": load_attrs,
 | 
						|
            "indices": load_indices,
 | 
						|
        }
 | 
						|
        util.from_disk(path, deserialize, exclude)
 | 
						|
 | 
						|
        if data["patterns"]:
 | 
						|
            for key, pattern in data["patterns"].items():
 | 
						|
                self.matcher.add(key, pattern)
 | 
						|
            assert len(self.attrs) == len(data["patterns"])
 | 
						|
            assert len(self.indices) == len(data["patterns"])
 | 
						|
 | 
						|
        return self
 | 
						|
 | 
						|
 | 
						|
def _split_morph_attrs(attrs):
 | 
						|
    """Split entries from a tag map or morph rules dict into to two dicts, one
 | 
						|
    with the token-level features (POS, LEMMA) and one with the remaining
 | 
						|
    features, which are presumed to be individual MORPH features."""
 | 
						|
    other_attrs = {}
 | 
						|
    morph_attrs = {}
 | 
						|
    for k, v in attrs.items():
 | 
						|
        if k in "_" or k in IDS.keys() or k in IDS.values():
 | 
						|
            other_attrs[k] = v
 | 
						|
        else:
 | 
						|
            morph_attrs[k] = v
 | 
						|
    return other_attrs, morph_attrs
 |