mirror of
https://github.com/explosion/spaCy.git
synced 2024-11-11 04:08:09 +03:00
43b960c01b
* Update with WIP * Update with WIP * Update with pipeline serialization * Update types and pipe factories * Add deep merge, tidy up and add tests * Fix pipe creation from config * Don't validate default configs on load * Update spacy/language.py Co-authored-by: Ines Montani <ines@ines.io> * Adjust factory/component meta error * Clean up factory args and remove defaults * Add test for failing empty dict defaults * Update pipeline handling and methods * provide KB as registry function instead of as object * small change in test to make functionality more clear * update example script for EL configuration * Fix typo * Simplify test * Simplify test * splitting pipes.pyx into separate files * moving default configs to each component file * fix batch_size type * removing default values from component constructors where possible (TODO: test 4725) * skip instead of xfail * Add test for config -> nlp with multiple instances * pipeline.pipes -> pipeline.pipe * Tidy up, document, remove kwargs * small cleanup/generalization for Tok2VecListener * use DEFAULT_UPSTREAM field * revert to avoid circular imports * Fix tests * Replace deprecated arg * Make model dirs require config * fix pickling of keyword-only arguments in constructor * WIP: clean up and integrate full config * Add helper to handle function args more reliably Now also includes keyword-only args * Fix config composition and serialization * Improve config debugging and add visual diff * Remove unused defaults and fix type * Remove pipeline and factories from meta * Update spacy/default_config.cfg Co-authored-by: Sofie Van Landeghem <svlandeg@users.noreply.github.com> * Update spacy/default_config.cfg * small UX edits * avoid printing stack trace for debug CLI commands * Add support for language-specific factories * specify the section of the config which holds the model to debug * WIP: add Language.from_config * Update with language data refactor WIP * Auto-format * Add backwards-compat handling for Language.factories * Update morphologizer.pyx * Fix morphologizer * Update and simplify lemmatizers * Fix Japanese tests * Port over tagger changes * Fix Chinese and tests * Update to latest Thinc * WIP: xfail first Russian lemmatizer test * Fix component-specific overrides * fix nO for output layers in debug_model * Fix default value * Fix tests and don't pass objects in config * Fix deep merging * Fix lemma lookup data registry Only load the lookups if an entry is available in the registry (and if spacy-lookups-data is installed) * Add types * Add Vocab.from_config * Fix typo * Fix tests * Make config copying more elegant * Fix pipe analysis * Fix lemmatizers and is_base_form * WIP: move language defaults to config * Fix morphology type * Fix vocab * Remove comment * Update to latest Thinc * Add morph rules to config * Tidy up * Remove set_morphology option from tagger factory * Hack use_gpu * Move [pipeline] to top-level block and make [nlp.pipeline] list Allows separating component blocks from component order – otherwise, ordering the config would mean a changed component order, which is bad. Also allows initial config to define more components and not use all of them * Fix use_gpu and resume in CLI * Auto-format * Remove resume from config * Fix formatting and error * [pipeline] -> [components] * Fix types * Fix tagger test: requires set_morphology? Co-authored-by: Sofie Van Landeghem <svlandeg@users.noreply.github.com> Co-authored-by: svlandeg <sofie.vanlandeghem@gmail.com> Co-authored-by: Matthew Honnibal <honnibal+gh@gmail.com>
1591 lines
65 KiB
Python
1591 lines
65 KiB
Python
from typing import Optional, Any, Dict, Callable, Iterable, Union, List, Pattern
|
||
from typing import Tuple, Iterator
|
||
from dataclasses import dataclass
|
||
import random
|
||
import itertools
|
||
import weakref
|
||
import functools
|
||
from collections import Iterable as IterableInstance
|
||
from contextlib import contextmanager
|
||
from copy import copy, deepcopy
|
||
from pathlib import Path
|
||
import warnings
|
||
from thinc.api import get_current_ops, Config, require_gpu, Optimizer
|
||
import srsly
|
||
import multiprocessing as mp
|
||
from itertools import chain, cycle
|
||
|
||
from .tokens.underscore import Underscore
|
||
from .vocab import Vocab
|
||
from .pipe_analysis import analyze_pipes, analyze_all_pipes, validate_attrs
|
||
from .gold import Example
|
||
from .scorer import Scorer
|
||
from .util import link_vectors_to_models, create_default_optimizer, registry
|
||
from .util import SimpleFrozenDict
|
||
from .lang.punctuation import TOKENIZER_PREFIXES, TOKENIZER_SUFFIXES
|
||
from .lang.punctuation import TOKENIZER_INFIXES
|
||
from .lang.tokenizer_exceptions import TOKEN_MATCH, URL_MATCH
|
||
from .lang.tag_map import TAG_MAP
|
||
from .tokens import Doc, Span
|
||
from .errors import Errors, Warnings
|
||
from .schemas import ConfigSchema
|
||
from .git_info import GIT_VERSION
|
||
from . import util
|
||
from . import about
|
||
|
||
# We also need to import these to make sure the functions are registered
|
||
from .tokenizer import Tokenizer # noqa: F401
|
||
from .lemmatizer import Lemmatizer # noqa: F401
|
||
from .lookups import Lookups # noqa: F401
|
||
|
||
|
||
ENABLE_PIPELINE_ANALYSIS = False
|
||
# This is the base config will all settings (training etc.)
|
||
DEFAULT_CONFIG_PATH = Path(__file__).parent / "default_config.cfg"
|
||
DEFAULT_CONFIG = Config().from_disk(DEFAULT_CONFIG_PATH)
|
||
|
||
|
||
class BaseDefaults:
|
||
token_match: Optional[Pattern] = TOKEN_MATCH
|
||
url_match: Pattern = URL_MATCH
|
||
prefixes: Tuple[Pattern, ...] = tuple(TOKENIZER_PREFIXES)
|
||
suffixes: Tuple[Pattern, ...] = tuple(TOKENIZER_SUFFIXES)
|
||
infixes: Tuple[Pattern, ...] = tuple(TOKENIZER_INFIXES)
|
||
tag_map: Dict[str, dict] = dict(TAG_MAP)
|
||
tokenizer_exceptions: Dict[str, List[dict]] = {}
|
||
morph_rules: Dict[str, Dict[str, dict]] = {}
|
||
syntax_iterators: Dict[str, Callable[[Union[Doc, Span]], Iterator]] = {}
|
||
single_orth_variants: List[Dict[str, List[str]]] = []
|
||
paired_orth_variants: List[Dict[str, Union[List[str], List[Tuple[str, str]]]]] = []
|
||
|
||
|
||
class Language:
|
||
"""A text-processing pipeline. Usually you'll load this once per process,
|
||
and pass the instance around your application.
|
||
|
||
Defaults (class): Settings, data and factory methods for creating the `nlp`
|
||
object and processing pipeline.
|
||
lang (str): Two-letter language ID, i.e. ISO code.
|
||
|
||
DOCS: https://spacy.io/api/language
|
||
"""
|
||
|
||
Defaults = BaseDefaults
|
||
lang: str = None
|
||
default_config = DEFAULT_CONFIG
|
||
factories = SimpleFrozenDict(error=Errors.E957)
|
||
|
||
_factory_meta: Dict[str, "FactoryMeta"] = {} # meta by factory
|
||
|
||
def __init__(
|
||
self,
|
||
vocab: Union[Vocab, bool] = True,
|
||
max_length: int = 10 ** 6,
|
||
meta: Dict[str, Any] = {},
|
||
create_tokenizer: Optional[Callable[["Language"], Callable[[str], Doc]]] = None,
|
||
**kwargs,
|
||
):
|
||
"""Initialise a Language object.
|
||
|
||
vocab (Vocab): A `Vocab` object. If `True`, a vocab is created.
|
||
meta (dict): Custom meta data for the Language class. Is written to by
|
||
models to add model meta data.
|
||
max_length (int) :
|
||
Maximum number of characters in a single text. The current models
|
||
may run out memory on extremely long texts, due to large internal
|
||
allocations. You should segment these texts into meaningful units,
|
||
e.g. paragraphs, subsections etc, before passing them to spaCy.
|
||
Default maximum length is 1,000,000 characters (1mb). As a rule of
|
||
thumb, if all pipeline components are enabled, spaCy's default
|
||
models currently requires roughly 1GB of temporary memory per
|
||
100,000 characters in one text.
|
||
RETURNS (Language): The newly constructed object.
|
||
"""
|
||
# We're only calling this to import all factories provided via entry
|
||
# points. The factory decorator applied to these functions takes care
|
||
# of the rest.
|
||
util.registry._entry_point_factories.get_all()
|
||
|
||
self._config = util.deep_merge_configs(self.default_config, DEFAULT_CONFIG)
|
||
self._meta = dict(meta)
|
||
self._path = None
|
||
self._optimizer = None
|
||
# Component meta and configs are only needed on the instance
|
||
self._pipe_meta: Dict[str, "FactoryMeta"] = {} # meta by component
|
||
self._pipe_configs: Dict[str, Config] = {} # config by component
|
||
|
||
if vocab is True:
|
||
vectors_name = meta.get("vectors", {}).get("name")
|
||
vocab = Vocab.from_config(
|
||
self._config,
|
||
vectors_name=vectors_name,
|
||
# TODO: what should we do with these?
|
||
tag_map=self.Defaults.tag_map,
|
||
morph_rules=self.Defaults.morph_rules,
|
||
)
|
||
else:
|
||
if (self.lang and vocab.lang) and (self.lang != vocab.lang):
|
||
raise ValueError(Errors.E150.format(nlp=self.lang, vocab=vocab.lang))
|
||
self.vocab = vocab
|
||
if self.lang is None:
|
||
self.lang = self.vocab.lang
|
||
self.pipeline = []
|
||
self.max_length = max_length
|
||
self.resolved = {}
|
||
# Create the default tokenizer from the default config
|
||
if not create_tokenizer:
|
||
tokenizer_cfg = {"tokenizer": self._config["nlp"]["tokenizer"]}
|
||
create_tokenizer = registry.make_from_config(tokenizer_cfg)["tokenizer"]
|
||
self.tokenizer = create_tokenizer(self)
|
||
|
||
def __init_subclass__(cls, **kwargs):
|
||
super().__init_subclass__(**kwargs)
|
||
cls.default_config = util.deep_merge_configs(cls.default_config, DEFAULT_CONFIG)
|
||
|
||
@property
|
||
def path(self):
|
||
return self._path
|
||
|
||
@property
|
||
def meta(self) -> Dict[str, Any]:
|
||
spacy_version = util.get_model_version_range(about.__version__)
|
||
if self.vocab.lang:
|
||
self._meta.setdefault("lang", self.vocab.lang)
|
||
else:
|
||
self._meta.setdefault("lang", self.lang)
|
||
self._meta.setdefault("name", "model")
|
||
self._meta.setdefault("version", "0.0.0")
|
||
self._meta.setdefault("spacy_version", spacy_version)
|
||
self._meta.setdefault("description", "")
|
||
self._meta.setdefault("author", "")
|
||
self._meta.setdefault("email", "")
|
||
self._meta.setdefault("url", "")
|
||
self._meta.setdefault("license", "")
|
||
self._meta.setdefault("spacy_git_version", GIT_VERSION)
|
||
self._meta["vectors"] = {
|
||
"width": self.vocab.vectors_length,
|
||
"vectors": len(self.vocab.vectors),
|
||
"keys": self.vocab.vectors.n_keys,
|
||
"name": self.vocab.vectors.name,
|
||
}
|
||
self._meta["labels"] = self.pipe_labels
|
||
return self._meta
|
||
|
||
@meta.setter
|
||
def meta(self, value: Dict[str, Any]) -> None:
|
||
self._meta = value
|
||
|
||
@property
|
||
def config(self) -> Config:
|
||
self._config.setdefault("nlp", {})
|
||
self._config["nlp"]["lang"] = self.lang
|
||
# We're storing the filled config for each pipeline component and so
|
||
# we can populate the config again later
|
||
pipeline = {}
|
||
for pipe_name in self.pipe_names:
|
||
pipe_meta = self.get_pipe_meta(pipe_name)
|
||
pipe_config = self.get_pipe_config(pipe_name)
|
||
pipeline[pipe_name] = {"@factories": pipe_meta.factory, **pipe_config}
|
||
self._config["nlp"]["pipeline"] = self.pipe_names
|
||
self._config["components"] = pipeline
|
||
if not srsly.is_json_serializable(self._config):
|
||
raise ValueError(Errors.E961.format(config=self._config))
|
||
return self._config
|
||
|
||
@config.setter
|
||
def config(self, value: Config) -> None:
|
||
self._config = value
|
||
|
||
@property
|
||
def factory_names(self) -> List[str]:
|
||
"""Get names of all available factories.
|
||
|
||
RETURNS (List[str]): The factory names.
|
||
"""
|
||
return list(self.factories.keys())
|
||
|
||
@property
|
||
def pipe_names(self) -> List[str]:
|
||
"""Get names of available pipeline components.
|
||
|
||
RETURNS (List[str]): List of component name strings, in order.
|
||
"""
|
||
return [pipe_name for pipe_name, _ in self.pipeline]
|
||
|
||
@property
|
||
def pipe_factories(self) -> Dict[str, str]:
|
||
"""Get the component factories for the available pipeline components.
|
||
|
||
RETURNS (Dict[str, str]): Factory names, keyed by component names.
|
||
"""
|
||
factories = {}
|
||
for pipe_name, pipe in self.pipeline:
|
||
factories[pipe_name] = self.get_pipe_meta(pipe_name).factory
|
||
return factories
|
||
|
||
@property
|
||
def pipe_labels(self) -> Dict[str, List[str]]:
|
||
"""Get the labels set by the pipeline components, if available (if
|
||
the component exposes a labels property).
|
||
|
||
RETURNS (Dict[str, List[str]]): Labels keyed by component name.
|
||
"""
|
||
labels = {}
|
||
for name, pipe in self.pipeline:
|
||
if hasattr(pipe, "labels"):
|
||
labels[name] = list(pipe.labels)
|
||
return labels
|
||
|
||
@classmethod
|
||
def has_factory(cls, name: str) -> bool:
|
||
"""RETURNS (bool): Whether a factory of that name is registered."""
|
||
internal_name = cls.get_factory_name(name)
|
||
return name in registry.factories or internal_name in registry.factories
|
||
|
||
@classmethod
|
||
def get_factory_name(cls, name: str) -> str:
|
||
"""Get the internal factory name based on the language subclass.
|
||
|
||
name (str): The factory name.
|
||
RETURNS (str): The internal factory name.
|
||
"""
|
||
if cls.lang is None:
|
||
return name
|
||
return f"{cls.lang}.{name}"
|
||
|
||
@classmethod
|
||
def get_factory_meta(cls, name: str) -> "FactoryMeta":
|
||
"""Get the meta information for a given factory name.
|
||
|
||
name (str): The component factory name.
|
||
RETURNS (FactoryMeta): The meta for the given factory name.
|
||
"""
|
||
internal_name = cls.get_factory_name(name)
|
||
if internal_name in cls._factory_meta:
|
||
return cls._factory_meta[internal_name]
|
||
if name in cls._factory_meta:
|
||
return cls._factory_meta[name]
|
||
raise ValueError(Errors.E967.format(meta="factory", name=name))
|
||
|
||
@classmethod
|
||
def set_factory_meta(cls, name: str, value: "FactoryMeta") -> None:
|
||
"""Set the meta information for a given factory name.
|
||
|
||
name (str): The component factory name.
|
||
value (FactoryMeta): The meta to set.
|
||
"""
|
||
cls._factory_meta[cls.get_factory_name(name)] = value
|
||
|
||
def get_pipe_meta(self, name: str) -> "FactoryMeta":
|
||
"""Get the meta information for a given component name.
|
||
|
||
name (str): The component name.
|
||
RETURNS (FactoryMeta): The meta for the given component name.
|
||
"""
|
||
if name not in self._pipe_meta:
|
||
raise ValueError(Errors.E967.format(meta="component", name=name))
|
||
return self._pipe_meta[name]
|
||
|
||
def get_pipe_config(self, name: str) -> Config:
|
||
"""Get the config used to create a pipeline component.
|
||
|
||
name (str): The component name.
|
||
RETURNS (Config): The config used to create the pipeline component.
|
||
"""
|
||
if name not in self._pipe_configs:
|
||
raise ValueError(Errors.E960.format(name=name))
|
||
pipe_config = self._pipe_configs[name]
|
||
pipe_config.pop("nlp", None)
|
||
pipe_config.pop("name", None)
|
||
return pipe_config
|
||
|
||
@classmethod
|
||
def factory(
|
||
cls,
|
||
name: str,
|
||
*,
|
||
default_config: Dict[str, Any] = SimpleFrozenDict(),
|
||
assigns: Iterable[str] = tuple(),
|
||
requires: Iterable[str] = tuple(),
|
||
retokenizes: bool = False,
|
||
func: Optional[Callable] = None,
|
||
) -> Callable:
|
||
"""Register a new pipeline component factory. Can be used as a decorator
|
||
on a function or classmethod, or called as a function with the factory
|
||
provided as the func keyword argument. To create a component and add
|
||
it to the pipeline, you can use nlp.add_pipe(name).
|
||
|
||
name (str): The name of the component factory.
|
||
default_config (Dict[str, Any]): Default configuration, describing the
|
||
default values of the factory arguments.
|
||
assigns (Iterable[str]): Doc/Token attributes assigned by this component,
|
||
e.g. "token.ent_id". Used for pipeline analyis.
|
||
requires (Iterable[str]): Doc/Token attributes required by this component,
|
||
e.g. "token.ent_id". Used for pipeline analyis.
|
||
retokenizes (bool): Whether the component changes the tokenization.
|
||
Used for pipeline analysis.
|
||
func (Optional[Callable]): Factory function if not used as a decorator.
|
||
"""
|
||
if not isinstance(name, str):
|
||
raise ValueError(Errors.E963.format(decorator="factory"))
|
||
if not isinstance(default_config, dict):
|
||
err = Errors.E962.format(
|
||
style="default config", name=name, cfg_type=type(default_config)
|
||
)
|
||
raise ValueError(err)
|
||
internal_name = cls.get_factory_name(name)
|
||
if internal_name in registry.factories:
|
||
# We only check for the internal name here – it's okay if it's a
|
||
# subclass and the base class has a factory of the same name
|
||
raise ValueError(Errors.E004.format(name=name))
|
||
|
||
def add_factory(factory_func: Callable) -> Callable:
|
||
arg_names = util.get_arg_names(factory_func)
|
||
if "nlp" not in arg_names or "name" not in arg_names:
|
||
raise ValueError(Errors.E964.format(name=name))
|
||
# Officially register the factory so we can later call
|
||
# registry.make_from_config and refer to it in the config as
|
||
# @factories = "spacy.Language.xyz". We use the class name here so
|
||
# different classes can have different factories.
|
||
registry.factories.register(internal_name, func=factory_func)
|
||
factory_meta = FactoryMeta(
|
||
factory=name,
|
||
default_config=default_config,
|
||
assigns=validate_attrs(assigns),
|
||
requires=validate_attrs(requires),
|
||
retokenizes=retokenizes,
|
||
)
|
||
cls.set_factory_meta(name, factory_meta)
|
||
# We're overwriting the class attr with a frozen dict to handle
|
||
# backwards-compat (writing to Language.factories directly). This
|
||
# wouldn't work with an instance property and just produce a
|
||
# confusing error – here we can show a custom error
|
||
cls.factories = SimpleFrozenDict(
|
||
registry.factories.get_all(), error=Errors.E957
|
||
)
|
||
return factory_func
|
||
|
||
if func is not None: # Support non-decorator use cases
|
||
return add_factory(func)
|
||
return add_factory
|
||
|
||
@classmethod
|
||
def component(
|
||
cls,
|
||
name: Optional[str] = None,
|
||
*,
|
||
assigns: Iterable[str] = tuple(),
|
||
requires: Iterable[str] = tuple(),
|
||
retokenizes: bool = False,
|
||
func: Optional[Callable[[Doc], Doc]] = None,
|
||
) -> Callable:
|
||
"""Register a new pipeline component. Can be used for stateless function
|
||
components that don't require a separate factory. Can be used as a
|
||
decorator on a function or classmethod, or called as a function with the
|
||
factory provided as the func keyword argument. To create a component and
|
||
add it to the pipeline, you can use nlp.add_pipe(name).
|
||
|
||
name (str): The name of the component factory.
|
||
assigns (Iterable[str]): Doc/Token attributes assigned by this component,
|
||
e.g. "token.ent_id". Used for pipeline analyis.
|
||
requires (Iterable[str]): Doc/Token attributes required by this component,
|
||
e.g. "token.ent_id". Used for pipeline analyis.
|
||
retokenizes (bool): Whether the component changes the tokenization.
|
||
Used for pipeline analysis.
|
||
func (Optional[Callable]): Factory function if not used as a decorator.
|
||
"""
|
||
if name is not None and not isinstance(name, str):
|
||
raise ValueError(Errors.E963.format(decorator="component"))
|
||
component_name = name if name is not None else util.get_object_name(func)
|
||
|
||
def add_component(component_func: Callable[[Doc], Doc]) -> Callable:
|
||
if isinstance(func, type): # function is a class
|
||
raise ValueError(Errors.E965.format(name=component_name))
|
||
|
||
def factory_func(nlp: cls, name: str) -> Callable[[Doc], Doc]:
|
||
return component_func
|
||
|
||
cls.factory(
|
||
component_name,
|
||
assigns=assigns,
|
||
requires=requires,
|
||
retokenizes=retokenizes,
|
||
func=factory_func,
|
||
)
|
||
return component_func
|
||
|
||
if func is not None: # Support non-decorator use cases
|
||
return add_component(func)
|
||
return add_component
|
||
|
||
def get_pipe(self, name: str) -> Callable[[Doc], Doc]:
|
||
"""Get a pipeline component for a given component name.
|
||
|
||
name (str): Name of pipeline component to get.
|
||
RETURNS (callable): The pipeline component.
|
||
|
||
DOCS: https://spacy.io/api/language#get_pipe
|
||
"""
|
||
for pipe_name, component in self.pipeline:
|
||
if pipe_name == name:
|
||
return component
|
||
raise KeyError(Errors.E001.format(name=name, opts=self.pipe_names))
|
||
|
||
def create_pipe(
|
||
self,
|
||
factory_name: str,
|
||
name: Optional[str] = None,
|
||
config: Optional[Dict[str, Any]] = SimpleFrozenDict(),
|
||
overrides: Optional[Dict[str, Any]] = SimpleFrozenDict(),
|
||
validate: bool = True,
|
||
) -> Callable[[Doc], Doc]:
|
||
"""Create a pipeline component. Mostly used internally. To create and
|
||
add a component to the pipeline, you can use nlp.add_pipe.
|
||
|
||
factory_name (str): Name of component factory.
|
||
name (Optional[str]): Optional name to assign to component instance.
|
||
Defaults to factory name if not set.
|
||
config (Optional[Dict[str, Any]]): Config parameters to use for this
|
||
component. Will be merged with default config, if available.
|
||
overrides (Optional[Dict[str, Any]]): Config overrides, typically
|
||
passed in via the CLI.
|
||
validate (bool): Whether to validate the component config against the
|
||
arguments and types expected by the factory.
|
||
RETURNS (Callable[[Doc], Doc]): The pipeline component.
|
||
"""
|
||
name = name if name is not None else factory_name
|
||
if not isinstance(config, dict):
|
||
err = Errors.E962.format(style="config", name=name, cfg_type=type(config))
|
||
raise ValueError(err)
|
||
if not srsly.is_json_serializable(config):
|
||
raise ValueError(Errors.E961.format(config=config))
|
||
if not self.has_factory(factory_name):
|
||
err = Errors.E002.format(
|
||
name=factory_name,
|
||
opts=", ".join(self.factory_names),
|
||
method="create_pipe",
|
||
lang=util.get_object_name(self),
|
||
lang_code=self.lang,
|
||
)
|
||
raise ValueError(err)
|
||
pipe_meta = self.get_factory_meta(factory_name)
|
||
config = config or {}
|
||
# This is unideal, but the alternative would mean you always need to
|
||
# specify the full config settings, which is not really viable.
|
||
if pipe_meta.default_config:
|
||
config = util.deep_merge_configs(config, pipe_meta.default_config)
|
||
# We need to create a top-level key because Thinc doesn't allow resolving
|
||
# top-level references to registered functions. Also gives nicer errors.
|
||
# The name allows components to know their pipe name and use it in the
|
||
# losses etc. (even if multiple instances of the same factory are used)
|
||
internal_name = self.get_factory_name(factory_name)
|
||
# If the language-specific factory doesn't exist, try again with the
|
||
# not-specific name
|
||
if internal_name not in registry.factories:
|
||
internal_name = factory_name
|
||
config = {"nlp": self, "name": name, **config, "@factories": internal_name}
|
||
cfg = {factory_name: config}
|
||
# We're calling the internal _fill here to avoid constructing the
|
||
# registered functions twice
|
||
# TODO: customize validation to make it more readable / relate it to
|
||
# pipeline component and why it failed, explain default config
|
||
resolved, filled = registry.resolve(cfg, validate=validate, overrides=overrides)
|
||
filled = filled[factory_name]
|
||
filled["@factories"] = factory_name
|
||
self._pipe_configs[name] = filled
|
||
return resolved[factory_name]
|
||
|
||
def add_pipe(
|
||
self,
|
||
factory_name: str,
|
||
name: Optional[str] = None,
|
||
*,
|
||
before: Optional[Union[str, int]] = None,
|
||
after: Optional[Union[str, int]] = None,
|
||
first: Optional[bool] = None,
|
||
last: Optional[bool] = None,
|
||
config: Optional[Dict[str, Any]] = SimpleFrozenDict(),
|
||
overrides: Optional[Dict[str, Any]] = SimpleFrozenDict(),
|
||
validate: bool = True,
|
||
) -> Callable[[Doc], Doc]:
|
||
"""Add a component to the processing pipeline. Valid components are
|
||
callables that take a `Doc` object, modify it and return it. Only one
|
||
of before/after/first/last can be set. Default behaviour is "last".
|
||
|
||
factory_name (str): Name of the component factory.
|
||
name (str): Name of pipeline component. Overwrites existing
|
||
component.name attribute if available. If no name is set and
|
||
the component exposes no name attribute, component.__name__ is
|
||
used. An error is raised if a name already exists in the pipeline.
|
||
before (Union[str, int]): Name or index of the component to insert new
|
||
component directly before.
|
||
after (Union[str, int]): Name or index of the component to insert new
|
||
component directly after.
|
||
first (bool): If True, insert component first in the pipeline.
|
||
last (bool): If True, insert component last in the pipeline.
|
||
config (Optional[Dict[str, Any]]): Config parameters to use for this
|
||
component. Will be merged with default config, if available.
|
||
overrides (Optional[Dict[str, Any]]): Config overrides, typically
|
||
passed in via the CLI.
|
||
validate (bool): Whether to validate the component config against the
|
||
arguments and types expected by the factory.
|
||
RETURNS (Callable[[Doc], Doc]): The pipeline component.
|
||
|
||
DOCS: https://spacy.io/api/language#add_pipe
|
||
"""
|
||
if not isinstance(factory_name, str):
|
||
bad_val = repr(factory_name)
|
||
err = Errors.E966.format(component=bad_val, name=name)
|
||
raise ValueError(err)
|
||
if not self.has_factory(factory_name):
|
||
err = Errors.E002.format(
|
||
name=factory_name,
|
||
opts=", ".join(self.factory_names),
|
||
method="add_pipe",
|
||
lang=util.get_object_name(self),
|
||
lang_code=self.lang,
|
||
)
|
||
name = name if name is not None else factory_name
|
||
if name in self.pipe_names:
|
||
raise ValueError(Errors.E007.format(name=name, opts=self.pipe_names))
|
||
pipe_component = self.create_pipe(
|
||
factory_name,
|
||
name=name,
|
||
config=config,
|
||
overrides=overrides,
|
||
validate=validate,
|
||
)
|
||
pipe_index = self._get_pipe_index(before, after, first, last)
|
||
self._pipe_meta[name] = self.get_factory_meta(factory_name)
|
||
self.pipeline.insert(pipe_index, (name, pipe_component))
|
||
if ENABLE_PIPELINE_ANALYSIS:
|
||
analyze_pipes(self, name, pipe_index)
|
||
return pipe_component
|
||
|
||
def _get_pipe_index(
|
||
self,
|
||
before: Optional[Union[str, int]] = None,
|
||
after: Optional[Union[str, int]] = None,
|
||
first: Optional[bool] = None,
|
||
last: Optional[bool] = None,
|
||
) -> int:
|
||
"""Determine where to insert a pipeline component based on the before/
|
||
after/first/last values.
|
||
|
||
before (str): Name or index of the component to insert directly before.
|
||
after (str): Name or index of component to insert directly after.
|
||
first (bool): If True, insert component first in the pipeline.
|
||
last (bool): If True, insert component last in the pipeline.
|
||
RETURNS (int): The index of the new pipeline component.
|
||
"""
|
||
all_args = {"before": before, "after": after, "first": first, "last": last}
|
||
if sum(arg is not None for arg in [before, after, first, last]) >= 2:
|
||
raise ValueError(Errors.E006.format(args=all_args, opts=self.pipe_names))
|
||
if last or not any(value is not None for value in [first, before, after]):
|
||
return len(self.pipeline)
|
||
elif first:
|
||
return 0
|
||
elif isinstance(before, str):
|
||
if before not in self.pipe_names:
|
||
raise ValueError(Errors.E001.format(name=before, opts=self.pipe_names))
|
||
return self.pipe_names.index(before)
|
||
elif isinstance(after, str):
|
||
if after not in self.pipe_names:
|
||
raise ValueError(Errors.E001.format(name=after, opts=self.pipe_names))
|
||
return self.pipe_names.index(after) + 1
|
||
# We're only accepting indices referring to components that exist
|
||
# (can't just do isinstance here because bools are instance of int, too)
|
||
elif type(before) == int:
|
||
if before >= len(self.pipeline) or before < 0:
|
||
err = Errors.E959.format(dir="before", idx=before, opts=self.pipe_names)
|
||
raise ValueError(err)
|
||
return before
|
||
elif type(after) == int:
|
||
if after >= len(self.pipeline) or after < 0:
|
||
err = Errors.E959.format(dir="after", idx=after, opts=self.pipe_names)
|
||
raise ValueError(err)
|
||
return after + 1
|
||
raise ValueError(Errors.E006.format(args=all_args, opts=self.pipe_names))
|
||
|
||
def has_pipe(self, name: str) -> bool:
|
||
"""Check if a component name is present in the pipeline. Equivalent to
|
||
`name in nlp.pipe_names`.
|
||
|
||
name (str): Name of the component.
|
||
RETURNS (bool): Whether a component of the name exists in the pipeline.
|
||
|
||
DOCS: https://spacy.io/api/language#has_pipe
|
||
"""
|
||
return name in self.pipe_names
|
||
|
||
def replace_pipe(
|
||
self,
|
||
name: str,
|
||
factory_name: str,
|
||
config: Dict[str, Any] = SimpleFrozenDict(),
|
||
validate: bool = True,
|
||
) -> None:
|
||
"""Replace a component in the pipeline.
|
||
|
||
name (str): Name of the component to replace.
|
||
factory_name (str): Factory name of replacement component.
|
||
config (Optional[Dict[str, Any]]): Config parameters to use for this
|
||
component. Will be merged with default config, if available.
|
||
validate (bool): Whether to validate the component config against the
|
||
arguments and types expected by the factory.
|
||
|
||
DOCS: https://spacy.io/api/language#replace_pipe
|
||
"""
|
||
if name not in self.pipe_names:
|
||
raise ValueError(Errors.E001.format(name=name, opts=self.pipe_names))
|
||
if hasattr(factory_name, "__call__"):
|
||
err = Errors.E968.format(component=repr(factory_name), name=name)
|
||
raise ValueError(err)
|
||
# We need to delegate to Language.add_pipe here instead of just writing
|
||
# to Language.pipeline to make sure the configs are handled correctly
|
||
pipe_index = self.pipe_names.index(name)
|
||
self.remove_pipe(name)
|
||
if not len(self.pipeline): # we have no components to insert before/after
|
||
self.add_pipe(factory_name, name=name)
|
||
else:
|
||
self.add_pipe(factory_name, name=name, before=pipe_index)
|
||
if ENABLE_PIPELINE_ANALYSIS:
|
||
analyze_all_pipes(self)
|
||
|
||
def rename_pipe(self, old_name: str, new_name: str) -> None:
|
||
"""Rename a pipeline component.
|
||
|
||
old_name (str): Name of the component to rename.
|
||
new_name (str): New name of the component.
|
||
|
||
DOCS: https://spacy.io/api/language#rename_pipe
|
||
"""
|
||
if old_name not in self.pipe_names:
|
||
raise ValueError(Errors.E001.format(name=old_name, opts=self.pipe_names))
|
||
if new_name in self.pipe_names:
|
||
raise ValueError(Errors.E007.format(name=new_name, opts=self.pipe_names))
|
||
i = self.pipe_names.index(old_name)
|
||
self.pipeline[i] = (new_name, self.pipeline[i][1])
|
||
self._pipe_meta[new_name] = self._pipe_meta.pop(old_name)
|
||
self._pipe_configs[new_name] = self._pipe_configs.pop(old_name)
|
||
|
||
def remove_pipe(self, name: str) -> Tuple[str, Callable[[Doc], Doc]]:
|
||
"""Remove a component from the pipeline.
|
||
|
||
name (str): Name of the component to remove.
|
||
RETURNS (tuple): A `(name, component)` tuple of the removed component.
|
||
|
||
DOCS: https://spacy.io/api/language#remove_pipe
|
||
"""
|
||
if name not in self.pipe_names:
|
||
raise ValueError(Errors.E001.format(name=name, opts=self.pipe_names))
|
||
removed = self.pipeline.pop(self.pipe_names.index(name))
|
||
# We're only removing the component itself from the metas/configs here
|
||
# because factory may be used for something else
|
||
self._pipe_meta.pop(name)
|
||
self._pipe_configs.pop(name)
|
||
if ENABLE_PIPELINE_ANALYSIS:
|
||
analyze_all_pipes(self)
|
||
return removed
|
||
|
||
def __call__(
|
||
self,
|
||
text: str,
|
||
disable: Iterable[str] = tuple(),
|
||
component_cfg: Optional[Dict[str, Dict[str, Any]]] = None,
|
||
) -> Doc:
|
||
"""Apply the pipeline to some text. The text can span multiple sentences,
|
||
and can contain arbitrary whitespace. Alignment into the original string
|
||
is preserved.
|
||
|
||
text (str): The text to be processed.
|
||
disable (list): Names of the pipeline components to disable.
|
||
component_cfg (dict): An optional dictionary with extra keyword arguments
|
||
for specific components.
|
||
RETURNS (Doc): A container for accessing the annotations.
|
||
|
||
DOCS: https://spacy.io/api/language#call
|
||
"""
|
||
if len(text) > self.max_length:
|
||
raise ValueError(
|
||
Errors.E088.format(length=len(text), max_length=self.max_length)
|
||
)
|
||
doc = self.make_doc(text)
|
||
if component_cfg is None:
|
||
component_cfg = {}
|
||
for name, proc in self.pipeline:
|
||
if name in disable:
|
||
continue
|
||
if not hasattr(proc, "__call__"):
|
||
raise ValueError(Errors.E003.format(component=type(proc), name=name))
|
||
try:
|
||
doc = proc(doc, **component_cfg.get(name, {}))
|
||
except KeyError:
|
||
raise ValueError(Errors.E109.format(name=name))
|
||
if doc is None:
|
||
raise ValueError(Errors.E005.format(name=name))
|
||
return doc
|
||
|
||
def disable_pipes(self, *names) -> "DisabledPipes":
|
||
"""Disable one or more pipeline components. If used as a context
|
||
manager, the pipeline will be restored to the initial state at the end
|
||
of the block. Otherwise, a DisabledPipes object is returned, that has
|
||
a `.restore()` method you can use to undo your changes.
|
||
|
||
This method has been deprecated since 3.0
|
||
"""
|
||
warnings.warn(Warnings.W096, DeprecationWarning)
|
||
if len(names) == 1 and isinstance(names[0], (list, tuple)):
|
||
names = names[0] # support list of names instead of spread
|
||
return DisabledPipes(self, names)
|
||
|
||
def select_pipes(
|
||
self,
|
||
disable: Optional[Union[str, Iterable[str]]] = None,
|
||
enable: Optional[Union[str, Iterable[str]]] = None,
|
||
) -> "DisabledPipes":
|
||
"""Disable one or more pipeline components. If used as a context
|
||
manager, the pipeline will be restored to the initial state at the end
|
||
of the block. Otherwise, a DisabledPipes object is returned, that has
|
||
a `.restore()` method you can use to undo your changes.
|
||
|
||
disable (str or iterable): The name(s) of the pipes to disable
|
||
enable (str or iterable): The name(s) of the pipes to enable - all others will be disabled
|
||
|
||
DOCS: https://spacy.io/api/language#select_pipes
|
||
"""
|
||
if enable is None and disable is None:
|
||
raise ValueError(Errors.E991)
|
||
if disable is not None and isinstance(disable, str):
|
||
disable = [disable]
|
||
if enable is not None:
|
||
if isinstance(enable, str):
|
||
enable = [enable]
|
||
to_disable = [pipe for pipe in self.pipe_names if pipe not in enable]
|
||
# raise an error if the enable and disable keywords are not consistent
|
||
if disable is not None and disable != to_disable:
|
||
raise ValueError(
|
||
Errors.E992.format(
|
||
enable=enable, disable=disable, names=self.pipe_names
|
||
)
|
||
)
|
||
disable = to_disable
|
||
return DisabledPipes(self, disable)
|
||
|
||
def make_doc(self, text: str) -> Doc:
|
||
"""Turn a text into a Doc object.
|
||
|
||
text (str): The text to process.
|
||
RETURNS (Doc): The processed doc.
|
||
"""
|
||
return self.tokenizer(text)
|
||
|
||
def update(
|
||
self,
|
||
examples: Iterable[Example],
|
||
dummy: Optional[Any] = None,
|
||
*,
|
||
drop: float = 0.0,
|
||
sgd: Optional[Optimizer] = None,
|
||
losses: Optional[Dict[str, float]] = None,
|
||
component_cfg: Optional[Dict[str, Dict[str, Any]]] = None,
|
||
):
|
||
"""Update the models in the pipeline.
|
||
|
||
examples (Iterable[Example]): A batch of examples
|
||
dummy: Should not be set - serves to catch backwards-incompatible scripts.
|
||
drop (float): The dropout rate.
|
||
sgd (Optimizer): An optimizer.
|
||
losses (Dict[str, float]): Dictionary to update with the loss, keyed by component.
|
||
component_cfg (Dict[str, Dict]): Config parameters for specific pipeline
|
||
components, keyed by component name.
|
||
RETURNS (Dict[str, float]): The updated losses dictionary
|
||
|
||
DOCS: https://spacy.io/api/language#update
|
||
"""
|
||
if dummy is not None:
|
||
raise ValueError(Errors.E989)
|
||
if losses is None:
|
||
losses = {}
|
||
if len(examples) == 0:
|
||
return losses
|
||
if not isinstance(examples, IterableInstance):
|
||
raise TypeError(
|
||
Errors.E978.format(
|
||
name="language", method="update", types=type(examples)
|
||
)
|
||
)
|
||
wrong_types = set([type(eg) for eg in examples if not isinstance(eg, Example)])
|
||
if wrong_types:
|
||
raise TypeError(
|
||
Errors.E978.format(name="language", method="update", types=wrong_types)
|
||
)
|
||
|
||
if sgd is None:
|
||
if self._optimizer is None:
|
||
self._optimizer = create_default_optimizer()
|
||
sgd = self._optimizer
|
||
|
||
if component_cfg is None:
|
||
component_cfg = {}
|
||
for i, (name, proc) in enumerate(self.pipeline):
|
||
component_cfg.setdefault(name, {})
|
||
component_cfg[name].setdefault("drop", drop)
|
||
component_cfg[name].setdefault("set_annotations", False)
|
||
for name, proc in self.pipeline:
|
||
if not hasattr(proc, "update"):
|
||
continue
|
||
proc.update(examples, sgd=None, losses=losses, **component_cfg[name])
|
||
if sgd not in (None, False):
|
||
for name, proc in self.pipeline:
|
||
if hasattr(proc, "model"):
|
||
proc.model.finish_update(sgd)
|
||
return losses
|
||
|
||
def rehearse(
|
||
self,
|
||
examples: Iterable[Example],
|
||
sgd: Optional[Optimizer] = None,
|
||
losses: Optional[Dict[str, float]] = None,
|
||
component_cfg: Optional[Dict[str, Dict[str, Any]]] = None,
|
||
) -> Dict[str, float]:
|
||
"""Make a "rehearsal" update to the models in the pipeline, to prevent
|
||
forgetting. Rehearsal updates run an initial copy of the model over some
|
||
data, and update the model so its current predictions are more like the
|
||
initial ones. This is useful for keeping a pretrained model on-track,
|
||
even if you're updating it with a smaller set of examples.
|
||
|
||
examples (Iterable[Example]): A batch of `Example` objects.
|
||
sgd (Optional[Optimizer]): An optimizer.
|
||
component_cfg (Dict[str, Dict]): Config parameters for specific pipeline
|
||
components, keyed by component name.
|
||
RETURNS (dict): Results from the update.
|
||
|
||
EXAMPLE:
|
||
>>> raw_text_batches = minibatch(raw_texts)
|
||
>>> for labelled_batch in minibatch(examples):
|
||
>>> nlp.update(labelled_batch)
|
||
>>> raw_batch = [Example.from_dict(nlp.make_doc(text), {}) for text in next(raw_text_batches)]
|
||
>>> nlp.rehearse(raw_batch)
|
||
"""
|
||
# TODO: document
|
||
if len(examples) == 0:
|
||
return
|
||
if not isinstance(examples, IterableInstance):
|
||
raise TypeError(
|
||
Errors.E978.format(
|
||
name="language", method="rehearse", types=type(examples)
|
||
)
|
||
)
|
||
wrong_types = set([type(eg) for eg in examples if not isinstance(eg, Example)])
|
||
if wrong_types:
|
||
raise TypeError(
|
||
Errors.E978.format(
|
||
name="language", method="rehearse", types=wrong_types
|
||
)
|
||
)
|
||
if sgd is None:
|
||
if self._optimizer is None:
|
||
self._optimizer = create_default_optimizer()
|
||
sgd = self._optimizer
|
||
pipes = list(self.pipeline)
|
||
random.shuffle(pipes)
|
||
if component_cfg is None:
|
||
component_cfg = {}
|
||
grads = {}
|
||
|
||
def get_grads(W, dW, key=None):
|
||
grads[key] = (W, dW)
|
||
|
||
get_grads.learn_rate = sgd.learn_rate
|
||
get_grads.b1 = sgd.b1
|
||
get_grads.b2 = sgd.b2
|
||
for name, proc in pipes:
|
||
if not hasattr(proc, "rehearse"):
|
||
continue
|
||
grads = {}
|
||
proc.rehearse(
|
||
examples, sgd=get_grads, losses=losses, **component_cfg.get(name, {})
|
||
)
|
||
for key, (W, dW) in grads.items():
|
||
sgd(W, dW, key=key)
|
||
return losses
|
||
|
||
def begin_training(
|
||
self,
|
||
get_examples: Optional[Callable] = None,
|
||
sgd: Optional[Optimizer] = None,
|
||
device: int = -1,
|
||
) -> Optimizer:
|
||
"""Allocate models, pre-process training data and acquire a trainer and
|
||
optimizer. Used as a contextmanager.
|
||
|
||
get_examples (function): Function returning example training data.
|
||
TODO: document format change since 3.0.
|
||
sgd (Optional[Optimizer]): An optimizer.
|
||
RETURNS: An optimizer.
|
||
|
||
DOCS: https://spacy.io/api/language#begin_training
|
||
"""
|
||
# TODO: throw warning when get_gold_tuples is provided instead of get_examples
|
||
if get_examples is None:
|
||
get_examples = lambda: []
|
||
else: # Populate vocab
|
||
for example in get_examples():
|
||
for word in [t.text for t in example.reference]:
|
||
_ = self.vocab[word] # noqa: F841
|
||
if device >= 0: # TODO: do we need this here?
|
||
require_gpu(device)
|
||
if self.vocab.vectors.data.shape[1] >= 1:
|
||
ops = get_current_ops()
|
||
self.vocab.vectors.data = ops.asarray(self.vocab.vectors.data)
|
||
link_vectors_to_models(self.vocab)
|
||
if sgd is None:
|
||
sgd = create_default_optimizer()
|
||
self._optimizer = sgd
|
||
for name, proc in self.pipeline:
|
||
if hasattr(proc, "begin_training"):
|
||
proc.begin_training(
|
||
get_examples, pipeline=self.pipeline, sgd=self._optimizer
|
||
)
|
||
self._link_components()
|
||
return self._optimizer
|
||
|
||
def resume_training(
|
||
self, sgd: Optional[Optimizer] = None, device: int = -1
|
||
) -> Optimizer:
|
||
"""Continue training a pretrained model.
|
||
|
||
Create and return an optimizer, and initialize "rehearsal" for any pipeline
|
||
component that has a .rehearse() method. Rehearsal is used to prevent
|
||
models from "forgetting" their initialised "knowledge". To perform
|
||
rehearsal, collect samples of text you want the models to retain performance
|
||
on, and call nlp.rehearse() with a batch of Example objects.
|
||
|
||
sgd (Optional[Optimizer]): An optimizer.
|
||
RETURNS (Optimizer): The optimizer.
|
||
"""
|
||
if device >= 0: # TODO: do we need this here?
|
||
require_gpu(device)
|
||
ops = get_current_ops()
|
||
if self.vocab.vectors.data.shape[1] >= 1:
|
||
self.vocab.vectors.data = ops.asarray(self.vocab.vectors.data)
|
||
link_vectors_to_models(self.vocab)
|
||
if sgd is None:
|
||
sgd = create_default_optimizer()
|
||
self._optimizer = sgd
|
||
for name, proc in self.pipeline:
|
||
if hasattr(proc, "_rehearsal_model"):
|
||
proc._rehearsal_model = deepcopy(proc.model)
|
||
return self._optimizer
|
||
|
||
def evaluate(
|
||
self,
|
||
examples: Iterable[Example],
|
||
verbose: bool = False,
|
||
batch_size: int = 256,
|
||
scorer: Optional[Scorer] = None,
|
||
component_cfg: Optional[Dict[str, Dict[str, Any]]] = None,
|
||
) -> Scorer:
|
||
"""Evaluate a model's pipeline components.
|
||
|
||
examples (Iterable[Example]): `Example` objects.
|
||
verbose (bool): Print debugging information.
|
||
batch_size (int): Batch size to use.
|
||
scorer (Optional[Scorer]): Scorer to use. If not passed in, a new one
|
||
will be created.
|
||
component_cfg (dict): An optional dictionary with extra keyword
|
||
arguments for specific components.
|
||
RETURNS (Scorer): The scorer containing the evaluation results.
|
||
|
||
DOCS: https://spacy.io/api/language#evaluate
|
||
"""
|
||
if not isinstance(examples, IterableInstance):
|
||
err = Errors.E978.format(
|
||
name="language", method="evaluate", types=type(examples)
|
||
)
|
||
raise TypeError(err)
|
||
wrong_types = set([type(eg) for eg in examples if not isinstance(eg, Example)])
|
||
if wrong_types:
|
||
err = Errors.E978.format(
|
||
name="language", method="evaluate", types=wrong_types
|
||
)
|
||
raise TypeError(err)
|
||
if scorer is None:
|
||
scorer = Scorer(pipeline=self.pipeline)
|
||
if component_cfg is None:
|
||
component_cfg = {}
|
||
docs = list(eg.predicted for eg in examples)
|
||
for name, pipe in self.pipeline:
|
||
kwargs = component_cfg.get(name, {})
|
||
kwargs.setdefault("batch_size", batch_size)
|
||
if not hasattr(pipe, "pipe"):
|
||
docs = _pipe(docs, pipe, kwargs)
|
||
else:
|
||
docs = pipe.pipe(docs, **kwargs)
|
||
for i, (doc, eg) in enumerate(zip(docs, examples)):
|
||
if verbose:
|
||
print(doc)
|
||
eg.predicted = doc
|
||
kwargs = component_cfg.get("scorer", {})
|
||
kwargs.setdefault("verbose", verbose)
|
||
scorer.score(eg, **kwargs)
|
||
return scorer
|
||
|
||
@contextmanager
|
||
def use_params(self, params: dict, **cfg):
|
||
"""Replace weights of models in the pipeline with those provided in the
|
||
params dictionary. Can be used as a contextmanager, in which case,
|
||
models go back to their original weights after the block.
|
||
|
||
params (dict): A dictionary of parameters keyed by model ID.
|
||
**cfg: Config parameters.
|
||
|
||
EXAMPLE:
|
||
>>> with nlp.use_params(optimizer.averages):
|
||
>>> nlp.to_disk('/tmp/checkpoint')
|
||
"""
|
||
contexts = [
|
||
pipe.use_params(params)
|
||
for name, pipe in self.pipeline
|
||
if hasattr(pipe, "use_params") and hasattr(pipe, "model")
|
||
]
|
||
# TODO: Having trouble with contextlib
|
||
# Workaround: these aren't actually context managers atm.
|
||
for context in contexts:
|
||
try:
|
||
next(context)
|
||
except StopIteration:
|
||
pass
|
||
yield
|
||
for context in contexts:
|
||
try:
|
||
next(context)
|
||
except StopIteration:
|
||
pass
|
||
|
||
def pipe(
|
||
self,
|
||
texts: Iterable[str],
|
||
as_tuples: bool = False,
|
||
batch_size: int = 1000,
|
||
disable: Iterable[str] = tuple(),
|
||
cleanup: bool = False,
|
||
component_cfg: Optional[Dict[str, Dict[str, Any]]] = None,
|
||
n_process: int = 1,
|
||
):
|
||
"""Process texts as a stream, and yield `Doc` objects in order.
|
||
|
||
texts (Iterable[str]): A sequence of texts to process.
|
||
as_tuples (bool): If set to True, inputs should be a sequence of
|
||
(text, context) tuples. Output will then be a sequence of
|
||
(doc, context) tuples. Defaults to False.
|
||
batch_size (int): The number of texts to buffer.
|
||
disable (List[str]): Names of the pipeline components to disable.
|
||
cleanup (bool): If True, unneeded strings are freed to control memory
|
||
use. Experimental.
|
||
component_cfg (Dict[str, Dict]): An optional dictionary with extra keyword
|
||
arguments for specific components.
|
||
n_process (int): Number of processors to process texts. If -1, set `multiprocessing.cpu_count()`.
|
||
YIELDS (Doc): Documents in the order of the original text.
|
||
|
||
DOCS: https://spacy.io/api/language#pipe
|
||
"""
|
||
if n_process == -1:
|
||
n_process = mp.cpu_count()
|
||
if as_tuples:
|
||
text_context1, text_context2 = itertools.tee(texts)
|
||
texts = (tc[0] for tc in text_context1)
|
||
contexts = (tc[1] for tc in text_context2)
|
||
docs = self.pipe(
|
||
texts,
|
||
batch_size=batch_size,
|
||
disable=disable,
|
||
n_process=n_process,
|
||
component_cfg=component_cfg,
|
||
)
|
||
for doc, context in zip(docs, contexts):
|
||
yield (doc, context)
|
||
return
|
||
if component_cfg is None:
|
||
component_cfg = {}
|
||
|
||
pipes = (
|
||
[]
|
||
) # contains functools.partial objects to easily create multiprocess worker.
|
||
for name, proc in self.pipeline:
|
||
if name in disable:
|
||
continue
|
||
kwargs = component_cfg.get(name, {})
|
||
# Allow component_cfg to overwrite the top-level kwargs.
|
||
kwargs.setdefault("batch_size", batch_size)
|
||
if hasattr(proc, "pipe"):
|
||
f = functools.partial(proc.pipe, **kwargs)
|
||
else:
|
||
# Apply the function, but yield the doc
|
||
f = functools.partial(_pipe, proc=proc, kwargs=kwargs)
|
||
pipes.append(f)
|
||
|
||
if n_process != 1:
|
||
docs = self._multiprocessing_pipe(texts, pipes, n_process, batch_size)
|
||
else:
|
||
# if n_process == 1, no processes are forked.
|
||
docs = (self.make_doc(text) for text in texts)
|
||
for pipe in pipes:
|
||
docs = pipe(docs)
|
||
|
||
# Track weakrefs of "recent" documents, so that we can see when they
|
||
# expire from memory. When they do, we know we don't need old strings.
|
||
# This way, we avoid maintaining an unbounded growth in string entries
|
||
# in the string store.
|
||
recent_refs = weakref.WeakSet()
|
||
old_refs = weakref.WeakSet()
|
||
# Keep track of the original string data, so that if we flush old strings,
|
||
# we can recover the original ones. However, we only want to do this if we're
|
||
# really adding strings, to save up-front costs.
|
||
original_strings_data = None
|
||
nr_seen = 0
|
||
for doc in docs:
|
||
yield doc
|
||
if cleanup:
|
||
recent_refs.add(doc)
|
||
if nr_seen < 10000:
|
||
old_refs.add(doc)
|
||
nr_seen += 1
|
||
elif len(old_refs) == 0:
|
||
old_refs, recent_refs = recent_refs, old_refs
|
||
if original_strings_data is None:
|
||
original_strings_data = list(self.vocab.strings)
|
||
else:
|
||
keys, strings = self.vocab.strings._cleanup_stale_strings(
|
||
original_strings_data
|
||
)
|
||
self.vocab._reset_cache(keys, strings)
|
||
self.tokenizer._reset_cache(keys)
|
||
nr_seen = 0
|
||
|
||
def _multiprocessing_pipe(
|
||
self,
|
||
texts: Iterable[str],
|
||
pipes: Iterable[Callable[[Doc], Doc]],
|
||
n_process: int,
|
||
batch_size: int,
|
||
) -> None:
|
||
# raw_texts is used later to stop iteration.
|
||
texts, raw_texts = itertools.tee(texts)
|
||
# for sending texts to worker
|
||
texts_q = [mp.Queue() for _ in range(n_process)]
|
||
# for receiving byte-encoded docs from worker
|
||
bytedocs_recv_ch, bytedocs_send_ch = zip(
|
||
*[mp.Pipe(False) for _ in range(n_process)]
|
||
)
|
||
|
||
batch_texts = util.minibatch(texts, batch_size)
|
||
# Sender sends texts to the workers.
|
||
# This is necessary to properly handle infinite length of texts.
|
||
# (In this case, all data cannot be sent to the workers at once)
|
||
sender = _Sender(batch_texts, texts_q, chunk_size=n_process)
|
||
# send twice to make process busy
|
||
sender.send()
|
||
sender.send()
|
||
|
||
procs = [
|
||
mp.Process(
|
||
target=_apply_pipes,
|
||
args=(self.make_doc, pipes, rch, sch, Underscore.get_state()),
|
||
)
|
||
for rch, sch in zip(texts_q, bytedocs_send_ch)
|
||
]
|
||
for proc in procs:
|
||
proc.start()
|
||
|
||
# Cycle channels not to break the order of docs.
|
||
# The received object is a batch of byte-encoded docs, so flatten them with chain.from_iterable.
|
||
byte_docs = chain.from_iterable(recv.recv() for recv in cycle(bytedocs_recv_ch))
|
||
docs = (Doc(self.vocab).from_bytes(byte_doc) for byte_doc in byte_docs)
|
||
try:
|
||
for i, (_, doc) in enumerate(zip(raw_texts, docs), 1):
|
||
yield doc
|
||
if i % batch_size == 0:
|
||
# tell `sender` that one batch was consumed.
|
||
sender.step()
|
||
finally:
|
||
for proc in procs:
|
||
proc.terminate()
|
||
|
||
def _link_components(self) -> None:
|
||
"""Register 'listeners' within pipeline components, to allow them to
|
||
effectively share weights.
|
||
"""
|
||
for i, (name1, proc1) in enumerate(self.pipeline):
|
||
if hasattr(proc1, "find_listeners"):
|
||
for name2, proc2 in self.pipeline[i:]:
|
||
if hasattr(proc2, "model"):
|
||
proc1.find_listeners(proc2.model)
|
||
|
||
@classmethod
|
||
def from_config(
|
||
cls,
|
||
config: Union[Dict[str, Any], Config] = {},
|
||
disable: Iterable[str] = tuple(),
|
||
overrides: Dict[str, Any] = {},
|
||
auto_fill: bool = True,
|
||
validate: bool = True,
|
||
) -> "Language":
|
||
"""Create the nlp object from a loaded config. Will set up the tokenizer
|
||
and language data, add pipeline components etc. If no config is provided,
|
||
the default config of the given language is used.
|
||
"""
|
||
if auto_fill:
|
||
config = util.deep_merge_configs(config, cls.default_config)
|
||
if "nlp" not in config:
|
||
raise ValueError(Errors.E985.format(config=config))
|
||
nlp_config = config["nlp"]
|
||
config_lang = nlp_config["lang"]
|
||
if cls.lang is not None and config_lang is not None and config_lang != cls.lang:
|
||
raise ValueError(
|
||
Errors.E958.format(
|
||
bad_lang_code=nlp_config["lang"],
|
||
lang_code=cls.lang,
|
||
lang=util.get_object_name(cls),
|
||
)
|
||
)
|
||
nlp_config["lang"] = cls.lang
|
||
# This isn't very elegant, but we remove the [components] block here to prevent
|
||
# it from getting resolved (causes problems because we expect to pass in
|
||
# the nlp and name args for each component). If we're auto-filling, we're
|
||
# using the nlp.config with all defaults.
|
||
config = util.copy_config(config)
|
||
orig_pipeline = config.pop("components", {})
|
||
config["components"] = {}
|
||
non_pipe_overrides, pipe_overrides = _get_config_overrides(overrides)
|
||
resolved, filled = registry.resolve(
|
||
config, validate=validate, schema=ConfigSchema, overrides=non_pipe_overrides
|
||
)
|
||
filled["components"] = orig_pipeline
|
||
config["components"] = orig_pipeline
|
||
create_tokenizer = resolved["nlp"]["tokenizer"]
|
||
lemmatizer = resolved["nlp"]["lemmatizer"]
|
||
lex_attr_getters = resolved["nlp"]["lex_attr_getters"]
|
||
stop_words = resolved["nlp"]["stop_words"]
|
||
vocab = Vocab.from_config(
|
||
filled,
|
||
lemmatizer=lemmatizer,
|
||
lex_attr_getters=lex_attr_getters,
|
||
stop_words=stop_words,
|
||
# TODO: what should we do with these?
|
||
tag_map=cls.Defaults.tag_map,
|
||
morph_rules=cls.Defaults.morph_rules,
|
||
)
|
||
nlp = cls(vocab, create_tokenizer=create_tokenizer)
|
||
pipeline = config.get("components", {})
|
||
for pipe_name in nlp_config["pipeline"]:
|
||
if pipe_name not in pipeline:
|
||
opts = ", ".join(pipeline.keys())
|
||
raise ValueError(Errors.E956.format(name=pipe_name, opts=opts))
|
||
pipe_cfg = pipeline[pipe_name]
|
||
if pipe_name not in disable:
|
||
if "@factories" not in pipe_cfg:
|
||
err = Errors.E984.format(name=pipe_name, config=pipe_cfg)
|
||
raise ValueError(err)
|
||
factory = pipe_cfg["@factories"]
|
||
# The pipe name (key in the config) here is the unique name of the
|
||
# component, not necessarily the factory
|
||
nlp.add_pipe(
|
||
factory,
|
||
name=pipe_name,
|
||
config=pipe_cfg,
|
||
overrides=pipe_overrides,
|
||
validate=validate,
|
||
)
|
||
nlp.config = filled if auto_fill else config
|
||
nlp.resolved = resolved
|
||
return nlp
|
||
|
||
def to_disk(self, path: Union[str, Path], exclude: Iterable[str] = tuple()) -> None:
|
||
"""Save the current state to a directory. If a model is loaded, this
|
||
will include the model.
|
||
|
||
path (str / Path): Path to a directory, which will be created if
|
||
it doesn't exist.
|
||
exclude (list): Names of components or serialization fields to exclude.
|
||
|
||
DOCS: https://spacy.io/api/language#to_disk
|
||
"""
|
||
path = util.ensure_path(path)
|
||
serializers = {}
|
||
serializers["tokenizer"] = lambda p: self.tokenizer.to_disk(
|
||
p, exclude=["vocab"]
|
||
)
|
||
serializers["meta.json"] = lambda p: srsly.write_json(p, self.meta)
|
||
serializers["config.cfg"] = lambda p: self.config.to_disk(p)
|
||
for name, proc in self.pipeline:
|
||
if not hasattr(proc, "name"):
|
||
continue
|
||
if name in exclude:
|
||
continue
|
||
if not hasattr(proc, "to_disk"):
|
||
continue
|
||
serializers[name] = lambda p, proc=proc: proc.to_disk(p, exclude=["vocab"])
|
||
serializers["vocab"] = lambda p: self.vocab.to_disk(p)
|
||
util.to_disk(path, serializers, exclude)
|
||
|
||
def from_disk(
|
||
self, path: Union[str, Path], exclude: Iterable[str] = tuple()
|
||
) -> "Language":
|
||
"""Loads state from a directory. Modifies the object in place and
|
||
returns it. If the saved `Language` object contains a model, the
|
||
model will be loaded.
|
||
|
||
path (str / Path): A path to a directory.
|
||
exclude (list): Names of components or serialization fields to exclude.
|
||
RETURNS (Language): The modified `Language` object.
|
||
|
||
DOCS: https://spacy.io/api/language#from_disk
|
||
"""
|
||
|
||
def deserialize_meta(path: Path) -> None:
|
||
if path.exists():
|
||
data = srsly.read_json(path)
|
||
self.meta.update(data)
|
||
# self.meta always overrides meta["vectors"] with the metadata
|
||
# from self.vocab.vectors, so set the name directly
|
||
self.vocab.vectors.name = data.get("vectors", {}).get("name")
|
||
|
||
def deserialize_vocab(path: Path) -> None:
|
||
if path.exists():
|
||
self.vocab.from_disk(path)
|
||
_fix_pretrained_vectors_name(self)
|
||
|
||
path = util.ensure_path(path)
|
||
|
||
deserializers = {}
|
||
if Path(path / "config.cfg").exists():
|
||
deserializers["config.cfg"] = lambda p: self.config.from_disk(p)
|
||
deserializers["meta.json"] = deserialize_meta
|
||
deserializers["vocab"] = deserialize_vocab
|
||
deserializers["tokenizer"] = lambda p: self.tokenizer.from_disk(
|
||
p, exclude=["vocab"]
|
||
)
|
||
for name, proc in self.pipeline:
|
||
if name in exclude:
|
||
continue
|
||
if not hasattr(proc, "from_disk"):
|
||
continue
|
||
deserializers[name] = lambda p, proc=proc: proc.from_disk(
|
||
p, exclude=["vocab"]
|
||
)
|
||
if not (path / "vocab").exists() and "vocab" not in exclude:
|
||
# Convert to list here in case exclude is (default) tuple
|
||
exclude = list(exclude) + ["vocab"]
|
||
util.from_disk(path, deserializers, exclude)
|
||
self._path = path
|
||
self._link_components()
|
||
return self
|
||
|
||
def to_bytes(self, exclude: Iterable[str] = tuple()) -> bytes:
|
||
"""Serialize the current state to a binary string.
|
||
|
||
exclude (list): Names of components or serialization fields to exclude.
|
||
RETURNS (bytes): The serialized form of the `Language` object.
|
||
|
||
DOCS: https://spacy.io/api/language#to_bytes
|
||
"""
|
||
serializers = {}
|
||
serializers["vocab"] = lambda: self.vocab.to_bytes()
|
||
serializers["tokenizer"] = lambda: self.tokenizer.to_bytes(exclude=["vocab"])
|
||
serializers["meta.json"] = lambda: srsly.json_dumps(self.meta)
|
||
serializers["config.cfg"] = lambda: self.config.to_bytes()
|
||
for name, proc in self.pipeline:
|
||
if name in exclude:
|
||
continue
|
||
if not hasattr(proc, "to_bytes"):
|
||
continue
|
||
serializers[name] = lambda proc=proc: proc.to_bytes(exclude=["vocab"])
|
||
return util.to_bytes(serializers, exclude)
|
||
|
||
def from_bytes(
|
||
self, bytes_data: bytes, exclude: Iterable[str] = tuple()
|
||
) -> "Language":
|
||
"""Load state from a binary string.
|
||
|
||
bytes_data (bytes): The data to load from.
|
||
exclude (list): Names of components or serialization fields to exclude.
|
||
RETURNS (Language): The `Language` object.
|
||
|
||
DOCS: https://spacy.io/api/language#from_bytes
|
||
"""
|
||
|
||
def deserialize_meta(b):
|
||
data = srsly.json_loads(b)
|
||
self.meta.update(data)
|
||
# self.meta always overrides meta["vectors"] with the metadata
|
||
# from self.vocab.vectors, so set the name directly
|
||
self.vocab.vectors.name = data.get("vectors", {}).get("name")
|
||
|
||
def deserialize_vocab(b):
|
||
self.vocab.from_bytes(b)
|
||
_fix_pretrained_vectors_name(self)
|
||
|
||
deserializers = {}
|
||
deserializers["config.cfg"] = lambda b: self.config.from_bytes(b)
|
||
deserializers["meta.json"] = deserialize_meta
|
||
deserializers["vocab"] = deserialize_vocab
|
||
deserializers["tokenizer"] = lambda b: self.tokenizer.from_bytes(
|
||
b, exclude=["vocab"]
|
||
)
|
||
for name, proc in self.pipeline:
|
||
if name in exclude:
|
||
continue
|
||
if not hasattr(proc, "from_bytes"):
|
||
continue
|
||
deserializers[name] = lambda b, proc=proc: proc.from_bytes(
|
||
b, exclude=["vocab"]
|
||
)
|
||
util.from_bytes(bytes_data, deserializers, exclude)
|
||
self._link_components()
|
||
return self
|
||
|
||
|
||
@dataclass
|
||
class FactoryMeta:
|
||
factory: str
|
||
default_config: Optional[Dict[str, Any]] = None # noqa: E704
|
||
assigns: Iterable[str] = tuple()
|
||
requires: Iterable[str] = tuple()
|
||
retokenizes: bool = False
|
||
|
||
|
||
def _get_config_overrides(
|
||
items: Dict[str, Any], prefix: str = "components"
|
||
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
|
||
prefix = f"{prefix}."
|
||
non_pipe = {k: v for k, v in items.items() if not k.startswith(prefix)}
|
||
pipe = {k.replace(prefix, ""): v for k, v in items.items() if k.startswith(prefix)}
|
||
return non_pipe, pipe
|
||
|
||
|
||
def _fix_pretrained_vectors_name(nlp: Language) -> None:
|
||
# TODO: Replace this once we handle vectors consistently as static
|
||
# data
|
||
if "vectors" in nlp.meta and "name" in nlp.meta["vectors"]:
|
||
nlp.vocab.vectors.name = nlp.meta["vectors"]["name"]
|
||
elif not nlp.vocab.vectors.size:
|
||
nlp.vocab.vectors.name = None
|
||
elif "name" in nlp.meta and "lang" in nlp.meta:
|
||
vectors_name = f"{nlp.meta['lang']}_{nlp.meta['name']}.vectors"
|
||
nlp.vocab.vectors.name = vectors_name
|
||
else:
|
||
raise ValueError(Errors.E092)
|
||
if nlp.vocab.vectors.size != 0:
|
||
link_vectors_to_models(nlp.vocab)
|
||
for name, proc in nlp.pipeline:
|
||
if not hasattr(proc, "cfg"):
|
||
continue
|
||
proc.cfg.setdefault("deprecation_fixes", {})
|
||
proc.cfg["deprecation_fixes"]["vectors_name"] = nlp.vocab.vectors.name
|
||
|
||
|
||
class DisabledPipes(list):
|
||
"""Manager for temporary pipeline disabling."""
|
||
|
||
def __init__(self, nlp: Language, names: List[str]):
|
||
self.nlp = nlp
|
||
self.names = names
|
||
# Important! Not deep copy -- we just want the container (but we also
|
||
# want to support people providing arbitrarily typed nlp.pipeline
|
||
# objects.)
|
||
self.original_pipeline = copy(nlp.pipeline)
|
||
self.metas = {name: nlp.get_pipe_meta(name) for name in names}
|
||
self.configs = {name: nlp.get_pipe_config(name) for name in names}
|
||
list.__init__(self)
|
||
self.extend(nlp.remove_pipe(name) for name in names)
|
||
|
||
def __enter__(self):
|
||
return self
|
||
|
||
def __exit__(self, *args):
|
||
self.restore()
|
||
|
||
def restore(self) -> None:
|
||
"""Restore the pipeline to its state when DisabledPipes was created."""
|
||
current, self.nlp.pipeline = self.nlp.pipeline, self.original_pipeline
|
||
unexpected = [name for name, pipe in current if not self.nlp.has_pipe(name)]
|
||
if unexpected:
|
||
# Don't change the pipeline if we're raising an error.
|
||
self.nlp.pipeline = current
|
||
raise ValueError(Errors.E008.format(names=unexpected))
|
||
self.nlp._pipe_meta.update(self.metas)
|
||
self.nlp._pipe_configs.update(self.configs)
|
||
self[:] = []
|
||
|
||
|
||
def _pipe(
|
||
examples: Iterable[Example], proc: Callable[[Doc], Doc], kwargs: Dict[str, Any]
|
||
) -> Iterator[Example]:
|
||
# We added some args for pipe that __call__ doesn't expect.
|
||
kwargs = dict(kwargs)
|
||
for arg in ["batch_size"]:
|
||
if arg in kwargs:
|
||
kwargs.pop(arg)
|
||
for eg in examples:
|
||
eg = proc(eg, **kwargs)
|
||
yield eg
|
||
|
||
|
||
def _apply_pipes(
|
||
make_doc: Callable[[str], Doc],
|
||
pipes: Iterable[Callable[[Doc], Doc]],
|
||
receiver,
|
||
sender,
|
||
underscore_state: Tuple[dict, dict, dict],
|
||
) -> None:
|
||
"""Worker for Language.pipe
|
||
|
||
make_doc (Callable[[str,] Doc]): Function to create Doc from text.
|
||
pipes (Iterable[Callable[[Doc], Doc]]): The components to apply.
|
||
receiver (multiprocessing.Connection): Pipe to receive text. Usually
|
||
created by `multiprocessing.Pipe()`
|
||
sender (multiprocessing.Connection): Pipe to send doc. Usually created by
|
||
`multiprocessing.Pipe()`
|
||
underscore_state (Tuple[dict, dict, dict]): The data in the Underscore class
|
||
of the parent.
|
||
"""
|
||
Underscore.load_state(underscore_state)
|
||
while True:
|
||
texts = receiver.get()
|
||
docs = (make_doc(text) for text in texts)
|
||
for pipe in pipes:
|
||
docs = pipe(docs)
|
||
# Connection does not accept unpickable objects, so send list.
|
||
sender.send([doc.to_bytes() for doc in docs])
|
||
|
||
|
||
class _Sender:
|
||
"""Util for sending data to multiprocessing workers in Language.pipe"""
|
||
|
||
def __init__(
|
||
self, data: Iterable[Any], queues: List[mp.Queue], chunk_size: int
|
||
) -> None:
|
||
self.data = iter(data)
|
||
self.queues = iter(cycle(queues))
|
||
self.chunk_size = chunk_size
|
||
self.count = 0
|
||
|
||
def send(self) -> None:
|
||
"""Send chunk_size items from self.data to channels."""
|
||
for item, q in itertools.islice(
|
||
zip(self.data, cycle(self.queues)), self.chunk_size
|
||
):
|
||
# cycle channels so that distribute the texts evenly
|
||
q.put(item)
|
||
|
||
def step(self) -> None:
|
||
"""Tell sender that comsumed one item. Data is sent to the workers after
|
||
every chunk_size calls.
|
||
"""
|
||
self.count += 1
|
||
if self.count >= self.chunk_size:
|
||
self.count = 0
|
||
self.send()
|