mirror of
				https://github.com/explosion/spaCy.git
				synced 2025-10-26 05:31:15 +03:00 
			
		
		
		
	Use the morph hash rather than the `MorphAnalysis` object in the cache key so that the `Lemmatizer` can be pickled.
		
			
				
	
	
		
			322 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			322 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from typing import Optional, List, Dict, Any, Callable, Iterable, Union, Tuple
 | |
| from thinc.api import Model
 | |
| from pathlib import Path
 | |
| 
 | |
| from .pipe import Pipe
 | |
| from ..errors import Errors, Warnings
 | |
| from ..language import Language
 | |
| from ..training import Example
 | |
| from ..lookups import Lookups, load_lookups
 | |
| from ..scorer import Scorer
 | |
| from ..tokens import Doc, Token
 | |
| from ..vocab import Vocab
 | |
| from ..training import validate_examples
 | |
| from ..util import logger, SimpleFrozenList
 | |
| from .. import util
 | |
| 
 | |
| 
 | |
| @Language.factory(
 | |
|     "lemmatizer",
 | |
|     assigns=["token.lemma"],
 | |
|     default_config={"model": None, "mode": "lookup", "overwrite": False},
 | |
|     default_score_weights={"lemma_acc": 1.0},
 | |
| )
 | |
| def make_lemmatizer(
 | |
|     nlp: Language, model: Optional[Model], name: str, mode: str, overwrite: bool = False
 | |
| ):
 | |
|     return Lemmatizer(nlp.vocab, model, name, mode=mode, overwrite=overwrite)
 | |
| 
 | |
| 
 | |
| class Lemmatizer(Pipe):
 | |
|     """
 | |
|     The Lemmatizer supports simple part-of-speech-sensitive suffix rules and
 | |
|     lookup tables.
 | |
| 
 | |
|     DOCS: https://spacy.io/api/lemmatizer
 | |
|     """
 | |
| 
 | |
|     @classmethod
 | |
|     def get_lookups_config(cls, mode: str) -> Tuple[List[str], List[str]]:
 | |
|         """Returns the lookups configuration settings for a given mode for use
 | |
|         in Lemmatizer.load_lookups.
 | |
| 
 | |
|         mode (str): The lemmatizer mode.
 | |
|         RETURNS (Tuple[List[str], List[str]]): The required and optional
 | |
|             lookup tables for this mode.
 | |
|         """
 | |
|         if mode == "lookup":
 | |
|             return (["lemma_lookup"], [])
 | |
|         elif mode == "rule":
 | |
|             return (["lemma_rules"], ["lemma_exc", "lemma_index"])
 | |
|         return ([], [])
 | |
| 
 | |
|     def __init__(
 | |
|         self,
 | |
|         vocab: Vocab,
 | |
|         model: Optional[Model],
 | |
|         name: str = "lemmatizer",
 | |
|         *,
 | |
|         mode: str = "lookup",
 | |
|         overwrite: bool = False,
 | |
|     ) -> None:
 | |
|         """Initialize a Lemmatizer.
 | |
| 
 | |
|         vocab (Vocab): The vocab.
 | |
|         model (Model): A model (not yet implemented).
 | |
|         name (str): The component name. Defaults to "lemmatizer".
 | |
|         mode (str): The lemmatizer mode: "lookup", "rule". Defaults to "lookup".
 | |
|         overwrite (bool): Whether to overwrite existing lemmas. Defaults to
 | |
|             `False`.
 | |
| 
 | |
|         DOCS: https://spacy.io/api/lemmatizer#init
 | |
|         """
 | |
|         self.vocab = vocab
 | |
|         self.model = model
 | |
|         self.name = name
 | |
|         self._mode = mode
 | |
|         self.lookups = Lookups()
 | |
|         self.overwrite = overwrite
 | |
|         self._validated = False
 | |
|         if self.mode == "lookup":
 | |
|             self.lemmatize = self.lookup_lemmatize
 | |
|         elif self.mode == "rule":
 | |
|             self.lemmatize = self.rule_lemmatize
 | |
|         else:
 | |
|             mode_attr = f"{self.mode}_lemmatize"
 | |
|             if not hasattr(self, mode_attr):
 | |
|                 raise ValueError(Errors.E1003.format(mode=mode))
 | |
|             self.lemmatize = getattr(self, mode_attr)
 | |
|         self.cache = {}
 | |
| 
 | |
|     @property
 | |
|     def mode(self):
 | |
|         return self._mode
 | |
| 
 | |
|     def __call__(self, doc: Doc) -> Doc:
 | |
|         """Apply the lemmatizer to one document.
 | |
| 
 | |
|         doc (Doc): The Doc to process.
 | |
|         RETURNS (Doc): The processed Doc.
 | |
| 
 | |
|         DOCS: https://spacy.io/api/lemmatizer#call
 | |
|         """
 | |
|         if not self._validated:
 | |
|             self._validate_tables(Errors.E1004)
 | |
|         error_handler = self.get_error_handler()
 | |
|         try:
 | |
|             for token in doc:
 | |
|                 if self.overwrite or token.lemma == 0:
 | |
|                     token.lemma_ = self.lemmatize(token)[0]
 | |
|             return doc
 | |
|         except Exception as e:
 | |
|             error_handler(self.name, self, [doc], e)
 | |
| 
 | |
|     def initialize(
 | |
|         self,
 | |
|         get_examples: Optional[Callable[[], Iterable[Example]]] = None,
 | |
|         *,
 | |
|         nlp: Optional[Language] = None,
 | |
|         lookups: Optional[Lookups] = None,
 | |
|     ):
 | |
|         """Initialize the lemmatizer and load in data.
 | |
| 
 | |
|         get_examples (Callable[[], Iterable[Example]]): Function that
 | |
|             returns a representative sample of gold-standard Example objects.
 | |
|         nlp (Language): The current nlp object the component is part of.
 | |
|         lookups (Lookups): The lookups object containing the (optional) tables
 | |
|             such as "lemma_rules", "lemma_index", "lemma_exc" and
 | |
|             "lemma_lookup". Defaults to None.
 | |
|         """
 | |
|         required_tables, optional_tables = self.get_lookups_config(self.mode)
 | |
|         if lookups is None:
 | |
|             logger.debug("Lemmatizer: loading tables from spacy-lookups-data")
 | |
|             lookups = load_lookups(lang=self.vocab.lang, tables=required_tables)
 | |
|             optional_lookups = load_lookups(
 | |
|                 lang=self.vocab.lang, tables=optional_tables, strict=False
 | |
|             )
 | |
|             for table in optional_lookups.tables:
 | |
|                 lookups.set_table(table, optional_lookups.get_table(table))
 | |
|         self.lookups = lookups
 | |
|         self._validate_tables(Errors.E1004)
 | |
| 
 | |
|     def _validate_tables(self, error_message: str = Errors.E912) -> None:
 | |
|         """Check that the lookups are correct for the current mode."""
 | |
|         required_tables, optional_tables = self.get_lookups_config(self.mode)
 | |
|         for table in required_tables:
 | |
|             if table not in self.lookups:
 | |
|                 raise ValueError(
 | |
|                     error_message.format(
 | |
|                         mode=self.mode,
 | |
|                         tables=required_tables,
 | |
|                         found=self.lookups.tables,
 | |
|                     )
 | |
|                 )
 | |
|         self._validated = True
 | |
| 
 | |
|     def lookup_lemmatize(self, token: Token) -> List[str]:
 | |
|         """Lemmatize using a lookup-based approach.
 | |
| 
 | |
|         token (Token): The token to lemmatize.
 | |
|         RETURNS (list): The available lemmas for the string.
 | |
| 
 | |
|         DOCS: https://spacy.io/api/lemmatizer#lookup_lemmatize
 | |
|         """
 | |
|         lookup_table = self.lookups.get_table("lemma_lookup", {})
 | |
|         result = lookup_table.get(token.text, token.text)
 | |
|         if isinstance(result, str):
 | |
|             result = [result]
 | |
|         return result
 | |
| 
 | |
|     def rule_lemmatize(self, token: Token) -> List[str]:
 | |
|         """Lemmatize using a rule-based approach.
 | |
| 
 | |
|         token (Token): The token to lemmatize.
 | |
|         RETURNS (list): The available lemmas for the string.
 | |
| 
 | |
|         DOCS: https://spacy.io/api/lemmatizer#rule_lemmatize
 | |
|         """
 | |
|         cache_key = (token.orth, token.pos, token.morph.key)
 | |
|         if cache_key in self.cache:
 | |
|             return self.cache[cache_key]
 | |
|         string = token.text
 | |
|         univ_pos = token.pos_.lower()
 | |
|         if univ_pos in ("", "eol", "space"):
 | |
|             if univ_pos == "":
 | |
|                 logger.warning(Warnings.W108.format(text=string))
 | |
|             return [string.lower()]
 | |
|         # See Issue #435 for example of where this logic is requied.
 | |
|         if self.is_base_form(token):
 | |
|             return [string.lower()]
 | |
|         index_table = self.lookups.get_table("lemma_index", {})
 | |
|         exc_table = self.lookups.get_table("lemma_exc", {})
 | |
|         rules_table = self.lookups.get_table("lemma_rules", {})
 | |
|         if not any(
 | |
|             (
 | |
|                 index_table.get(univ_pos),
 | |
|                 exc_table.get(univ_pos),
 | |
|                 rules_table.get(univ_pos),
 | |
|             )
 | |
|         ):
 | |
|             if univ_pos == "propn":
 | |
|                 return [string]
 | |
|             else:
 | |
|                 return [string.lower()]
 | |
| 
 | |
|         index = index_table.get(univ_pos, {})
 | |
|         exceptions = exc_table.get(univ_pos, {})
 | |
|         rules = rules_table.get(univ_pos, {})
 | |
|         orig = string
 | |
|         string = string.lower()
 | |
|         forms = []
 | |
|         oov_forms = []
 | |
|         for old, new in rules:
 | |
|             if string.endswith(old):
 | |
|                 form = string[: len(string) - len(old)] + new
 | |
|                 if not form:
 | |
|                     pass
 | |
|                 elif form in index or not form.isalpha():
 | |
|                     forms.append(form)
 | |
|                 else:
 | |
|                     oov_forms.append(form)
 | |
|         # Remove duplicates but preserve the ordering of applied "rules"
 | |
|         forms = list(dict.fromkeys(forms))
 | |
|         # Put exceptions at the front of the list, so they get priority.
 | |
|         # This is a dodgy heuristic -- but it's the best we can do until we get
 | |
|         # frequencies on this. We can at least prune out problematic exceptions,
 | |
|         # if they shadow more frequent analyses.
 | |
|         for form in exceptions.get(string, []):
 | |
|             if form not in forms:
 | |
|                 forms.insert(0, form)
 | |
|         if not forms:
 | |
|             forms.extend(oov_forms)
 | |
|         if not forms:
 | |
|             forms.append(orig)
 | |
|         self.cache[cache_key] = forms
 | |
|         return forms
 | |
| 
 | |
|     def is_base_form(self, token: Token) -> bool:
 | |
|         """Check whether the token is a base form that does not need further
 | |
|         analysis for lemmatization.
 | |
| 
 | |
|         token (Token): The token.
 | |
|         RETURNS (bool): Whether the token is a base form.
 | |
| 
 | |
|         DOCS: https://spacy.io/api/lemmatizer#is_base_form
 | |
|         """
 | |
|         return False
 | |
| 
 | |
|     def score(self, examples: Iterable[Example], **kwargs) -> Dict[str, Any]:
 | |
|         """Score a batch of examples.
 | |
| 
 | |
|         examples (Iterable[Example]): The examples to score.
 | |
|         RETURNS (Dict[str, Any]): The scores.
 | |
| 
 | |
|         DOCS: https://spacy.io/api/lemmatizer#score
 | |
|         """
 | |
|         validate_examples(examples, "Lemmatizer.score")
 | |
|         return Scorer.score_token_attr(examples, "lemma", **kwargs)
 | |
| 
 | |
|     def to_disk(
 | |
|         self, path: Union[str, Path], *, exclude: Iterable[str] = SimpleFrozenList()
 | |
|     ):
 | |
|         """Serialize the pipe to disk.
 | |
| 
 | |
|         path (str / Path): Path to a directory.
 | |
|         exclude (Iterable[str]): String names of serialization fields to exclude.
 | |
| 
 | |
|         DOCS: https://spacy.io/api/lemmatizer#to_disk
 | |
|         """
 | |
|         serialize = {}
 | |
|         serialize["vocab"] = lambda p: self.vocab.to_disk(p)
 | |
|         serialize["lookups"] = lambda p: self.lookups.to_disk(p)
 | |
|         util.to_disk(path, serialize, exclude)
 | |
| 
 | |
|     def from_disk(
 | |
|         self, path: Union[str, Path], *, exclude: Iterable[str] = SimpleFrozenList()
 | |
|     ) -> "Lemmatizer":
 | |
|         """Load the pipe from disk. Modifies the object in place and returns it.
 | |
| 
 | |
|         path (str / Path): Path to a directory.
 | |
|         exclude (Iterable[str]): String names of serialization fields to exclude.
 | |
|         RETURNS (Lemmatizer): The modified Lemmatizer object.
 | |
| 
 | |
|         DOCS: https://spacy.io/api/lemmatizer#from_disk
 | |
|         """
 | |
|         deserialize = {}
 | |
|         deserialize["vocab"] = lambda p: self.vocab.from_disk(p)
 | |
|         deserialize["lookups"] = lambda p: self.lookups.from_disk(p)
 | |
|         util.from_disk(path, deserialize, exclude)
 | |
|         self._validate_tables()
 | |
|         return self
 | |
| 
 | |
|     def to_bytes(self, *, exclude: Iterable[str] = SimpleFrozenList()) -> bytes:
 | |
|         """Serialize the pipe to a bytestring.
 | |
| 
 | |
|         exclude (Iterable[str]): String names of serialization fields to exclude.
 | |
|         RETURNS (bytes): The serialized object.
 | |
| 
 | |
|         DOCS: https://spacy.io/api/lemmatizer#to_bytes
 | |
|         """
 | |
|         serialize = {}
 | |
|         serialize["vocab"] = self.vocab.to_bytes
 | |
|         serialize["lookups"] = self.lookups.to_bytes
 | |
|         return util.to_bytes(serialize, exclude)
 | |
| 
 | |
|     def from_bytes(
 | |
|         self, bytes_data: bytes, *, exclude: Iterable[str] = SimpleFrozenList()
 | |
|     ) -> "Lemmatizer":
 | |
|         """Load the pipe from a bytestring.
 | |
| 
 | |
|         bytes_data (bytes): The serialized pipe.
 | |
|         exclude (Iterable[str]): String names of serialization fields to exclude.
 | |
|         RETURNS (Lemmatizer): The loaded Lemmatizer.
 | |
| 
 | |
|         DOCS: https://spacy.io/api/lemmatizer#from_bytes
 | |
|         """
 | |
|         deserialize = {}
 | |
|         deserialize["vocab"] = lambda b: self.vocab.from_bytes(b)
 | |
|         deserialize["lookups"] = lambda b: self.lookups.from_bytes(b)
 | |
|         util.from_bytes(bytes_data, deserialize, exclude)
 | |
|         self._validate_tables()
 | |
|         return self
 |