mirror of
https://github.com/explosion/spaCy.git
synced 2024-11-16 06:37:04 +03:00
c053f158c5
* Add support for fasttext-bloom hash-only vectors Overview: * Extend `Vectors` to have two modes: `default` and `ngram` * `default` is the default mode and equivalent to the current `Vectors` * `ngram` supports the hash-only ngram tables from `fasttext-bloom` * Extend `spacy.StaticVectors.v2` to handle both modes with no changes for `default` vectors * Extend `spacy init vectors` to support ngram tables The `ngram` mode **only** supports vector tables produced by this fork of fastText, which adds an option to represent all vectors using only the ngram buckets table and which uses the exact same ngram generation algorithm and hash function (`MurmurHash3_x64_128`). `fasttext-bloom` produces an additional `.hashvec` table, which can be loaded by `spacy init vectors --fasttext-bloom-vectors`. https://github.com/adrianeboyd/fastText/tree/feature/bloom Implementation details: * `Vectors` now includes the `StringStore` as `Vectors.strings` so that the API can stay consistent for both `default` (which can look up from `str` or `int`) and `ngram` (which requires `str` to calculate the ngrams). * In ngram mode `Vectors` uses a default `Vectors` object as a cache since the ngram vectors lookups are relatively expensive. * The default cache size is the same size as the provided ngram vector table. * Once the cache is full, no more entries are added. The user is responsible for managing the cache in cases where the initial documents are not representative of the texts. * The cache can be resized by setting `Vectors.ngram_cache_size` or cleared with `vectors._ngram_cache.clear()`. * The API ends up a bit split between methods for `default` and for `ngram`, so functions that only make sense for `default` or `ngram` include warnings with custom messages suggesting alternatives where possible. * `Vocab.vectors` becomes a property so that the string stores can be synced when assigning vectors to a vocab. * `Vectors` serializes its own config settings as `vectors.cfg`. * The `Vectors` serialization methods have added support for `exclude` so that the `Vocab` can exclude the `Vectors` strings while serializing. Removed: * The `minn` and `maxn` options and related code from `Vocab.get_vector`, which does not work in a meaningful way for default vector tables. * The unused `GlobalRegistry` in `Vectors`. * Refactor to use reduce_mean Refactor to use reduce_mean and remove the ngram vectors cache. * Rename to floret * Rename to floret in error messages * Use --vectors-mode in CLI, vector init * Fix vectors mode in init * Remove unused var * Minor API and docstrings adjustments * Rename `--vectors-mode` to `--mode` in `init vectors` CLI * Rename `Vectors.get_floret_vectors` to `Vectors.get_batch` and support both modes. * Minor updates to Vectors docstrings. * Update API docs for Vectors and init vectors CLI * Update types for StaticVectors
2215 lines
95 KiB
Python
2215 lines
95 KiB
Python
from typing import Iterator, Optional, Any, Dict, Callable, Iterable
|
||
from typing import Union, Tuple, List, Set, Pattern, Sequence
|
||
from typing import NoReturn, TYPE_CHECKING, TypeVar, cast, overload
|
||
|
||
from dataclasses import dataclass
|
||
import random
|
||
import itertools
|
||
import functools
|
||
from contextlib import contextmanager
|
||
from copy import deepcopy
|
||
from pathlib import Path
|
||
import warnings
|
||
from thinc.api import get_current_ops, Config, CupyOps, Optimizer
|
||
import srsly
|
||
import multiprocessing as mp
|
||
from itertools import chain, cycle
|
||
from timeit import default_timer as timer
|
||
import traceback
|
||
|
||
from . import ty
|
||
from .tokens.underscore import Underscore
|
||
from .vocab import Vocab, create_vocab
|
||
from .pipe_analysis import validate_attrs, analyze_pipes, print_pipe_analysis
|
||
from .training import Example, validate_examples
|
||
from .training.initialize import init_vocab, init_tok2vec
|
||
from .scorer import Scorer
|
||
from .util import registry, SimpleFrozenList, _pipe, raise_error
|
||
from .util import SimpleFrozenDict, combine_score_weights, CONFIG_SECTION_ORDER
|
||
from .util import warn_if_jupyter_cupy
|
||
from .lang.tokenizer_exceptions import URL_MATCH, BASE_EXCEPTIONS
|
||
from .lang.punctuation import TOKENIZER_PREFIXES, TOKENIZER_SUFFIXES
|
||
from .lang.punctuation import TOKENIZER_INFIXES
|
||
from .tokens import Doc
|
||
from .tokenizer import Tokenizer
|
||
from .errors import Errors, Warnings
|
||
from .schemas import ConfigSchema, ConfigSchemaNlp, ConfigSchemaInit
|
||
from .schemas import ConfigSchemaPretrain, validate_init_settings
|
||
from .git_info import GIT_VERSION
|
||
from . import util
|
||
from . import about
|
||
from .lookups import load_lookups
|
||
from .compat import Literal
|
||
|
||
|
||
if TYPE_CHECKING:
|
||
from .pipeline import Pipe # noqa: F401
|
||
|
||
|
||
# This is the base config will all settings (training etc.)
|
||
DEFAULT_CONFIG_PATH = Path(__file__).parent / "default_config.cfg"
|
||
DEFAULT_CONFIG = util.load_config(DEFAULT_CONFIG_PATH)
|
||
# This is the base config for the [pretraining] block and currently not included
|
||
# in the main config and only added via the 'init fill-config' command
|
||
DEFAULT_CONFIG_PRETRAIN_PATH = Path(__file__).parent / "default_config_pretraining.cfg"
|
||
|
||
# Type variable for contexts piped with documents
|
||
_AnyContext = TypeVar("_AnyContext")
|
||
|
||
|
||
class BaseDefaults:
|
||
"""Language data defaults, available via Language.Defaults. Can be
|
||
overwritten by language subclasses by defining their own subclasses of
|
||
Language.Defaults.
|
||
"""
|
||
|
||
config: Config = Config(section_order=CONFIG_SECTION_ORDER)
|
||
tokenizer_exceptions: Dict[str, List[dict]] = BASE_EXCEPTIONS
|
||
prefixes: Optional[Sequence[Union[str, Pattern]]] = TOKENIZER_PREFIXES
|
||
suffixes: Optional[Sequence[Union[str, Pattern]]] = TOKENIZER_SUFFIXES
|
||
infixes: Optional[Sequence[Union[str, Pattern]]] = TOKENIZER_INFIXES
|
||
token_match: Optional[Callable] = None
|
||
url_match: Optional[Callable] = URL_MATCH
|
||
syntax_iterators: Dict[str, Callable] = {}
|
||
lex_attr_getters: Dict[int, Callable[[str], Any]] = {}
|
||
stop_words: Set[str] = set()
|
||
writing_system = {"direction": "ltr", "has_case": True, "has_letters": True}
|
||
|
||
|
||
@registry.tokenizers("spacy.Tokenizer.v1")
|
||
def create_tokenizer() -> Callable[["Language"], Tokenizer]:
|
||
"""Registered function to create a tokenizer. Returns a factory that takes
|
||
the nlp object and returns a Tokenizer instance using the language detaults.
|
||
"""
|
||
|
||
def tokenizer_factory(nlp: "Language") -> Tokenizer:
|
||
prefixes = nlp.Defaults.prefixes
|
||
suffixes = nlp.Defaults.suffixes
|
||
infixes = nlp.Defaults.infixes
|
||
prefix_search = util.compile_prefix_regex(prefixes).search if prefixes else None
|
||
suffix_search = util.compile_suffix_regex(suffixes).search if suffixes else None
|
||
infix_finditer = util.compile_infix_regex(infixes).finditer if infixes else None
|
||
return Tokenizer(
|
||
nlp.vocab,
|
||
rules=nlp.Defaults.tokenizer_exceptions,
|
||
prefix_search=prefix_search,
|
||
suffix_search=suffix_search,
|
||
infix_finditer=infix_finditer,
|
||
token_match=nlp.Defaults.token_match,
|
||
url_match=nlp.Defaults.url_match,
|
||
)
|
||
|
||
return tokenizer_factory
|
||
|
||
|
||
@registry.misc("spacy.LookupsDataLoader.v1")
|
||
def load_lookups_data(lang, tables):
|
||
util.logger.debug(f"Loading lookups from spacy-lookups-data: {tables}")
|
||
lookups = load_lookups(lang=lang, tables=tables)
|
||
return lookups
|
||
|
||
|
||
class Language:
|
||
"""A text-processing pipeline. Usually you'll load this once per process,
|
||
and pass the instance around your application.
|
||
|
||
Defaults (class): Settings, data and factory methods for creating the `nlp`
|
||
object and processing pipeline.
|
||
lang (str): IETF language code, such as 'en'.
|
||
|
||
DOCS: https://spacy.io/api/language
|
||
"""
|
||
|
||
Defaults = BaseDefaults
|
||
lang: Optional[str] = None
|
||
default_config = DEFAULT_CONFIG
|
||
|
||
factories = SimpleFrozenDict(error=Errors.E957)
|
||
_factory_meta: Dict[str, "FactoryMeta"] = {} # meta by factory
|
||
|
||
def __init__(
|
||
self,
|
||
vocab: Union[Vocab, bool] = True,
|
||
*,
|
||
max_length: int = 10 ** 6,
|
||
meta: Dict[str, Any] = {},
|
||
create_tokenizer: Optional[Callable[["Language"], Callable[[str], Doc]]] = None,
|
||
batch_size: int = 1000,
|
||
**kwargs,
|
||
) -> None:
|
||
"""Initialise a Language object.
|
||
|
||
vocab (Vocab): A `Vocab` object. If `True`, a vocab is created.
|
||
meta (dict): Custom meta data for the Language class. Is written to by
|
||
models to add model meta data.
|
||
max_length (int): Maximum number of characters in a single text. The
|
||
current models may run out memory on extremely long texts, due to
|
||
large internal allocations. You should segment these texts into
|
||
meaningful units, e.g. paragraphs, subsections etc, before passing
|
||
them to spaCy. Default maximum length is 1,000,000 charas (1mb). As
|
||
a rule of thumb, if all pipeline components are enabled, spaCy's
|
||
default models currently requires roughly 1GB of temporary memory per
|
||
100,000 characters in one text.
|
||
create_tokenizer (Callable): Function that takes the nlp object and
|
||
returns a tokenizer.
|
||
batch_size (int): Default batch size for pipe and evaluate.
|
||
|
||
DOCS: https://spacy.io/api/language#init
|
||
"""
|
||
# We're only calling this to import all factories provided via entry
|
||
# points. The factory decorator applied to these functions takes care
|
||
# of the rest.
|
||
util.registry._entry_point_factories.get_all()
|
||
|
||
self._config = DEFAULT_CONFIG.merge(self.default_config)
|
||
self._meta = dict(meta)
|
||
self._path = None
|
||
self._optimizer: Optional[Optimizer] = None
|
||
# Component meta and configs are only needed on the instance
|
||
self._pipe_meta: Dict[str, "FactoryMeta"] = {} # meta by component
|
||
self._pipe_configs: Dict[str, Config] = {} # config by component
|
||
|
||
if not isinstance(vocab, Vocab) and vocab is not True:
|
||
raise ValueError(Errors.E918.format(vocab=vocab, vocab_type=type(Vocab)))
|
||
if vocab is True:
|
||
vectors_name = meta.get("vectors", {}).get("name")
|
||
vocab = create_vocab(self.lang, self.Defaults, vectors_name=vectors_name)
|
||
else:
|
||
if (self.lang and vocab.lang) and (self.lang != vocab.lang):
|
||
raise ValueError(Errors.E150.format(nlp=self.lang, vocab=vocab.lang))
|
||
self.vocab: Vocab = vocab
|
||
if self.lang is None:
|
||
self.lang = self.vocab.lang
|
||
self._components: List[Tuple[str, "Pipe"]] = []
|
||
self._disabled: Set[str] = set()
|
||
self.max_length = max_length
|
||
# Create the default tokenizer from the default config
|
||
if not create_tokenizer:
|
||
tokenizer_cfg = {"tokenizer": self._config["nlp"]["tokenizer"]}
|
||
create_tokenizer = registry.resolve(tokenizer_cfg)["tokenizer"]
|
||
self.tokenizer = create_tokenizer(self)
|
||
self.batch_size = batch_size
|
||
self.default_error_handler = raise_error
|
||
|
||
def __init_subclass__(cls, **kwargs):
|
||
super().__init_subclass__(**kwargs)
|
||
cls.default_config = DEFAULT_CONFIG.merge(cls.Defaults.config)
|
||
cls.default_config["nlp"]["lang"] = cls.lang
|
||
|
||
@property
|
||
def path(self):
|
||
return self._path
|
||
|
||
@property
|
||
def meta(self) -> Dict[str, Any]:
|
||
"""Custom meta data of the language class. If a model is loaded, this
|
||
includes details from the model's meta.json.
|
||
|
||
RETURNS (Dict[str, Any]): The meta.
|
||
|
||
DOCS: https://spacy.io/api/language#meta
|
||
"""
|
||
spacy_version = util.get_minor_version_range(about.__version__)
|
||
if self.vocab.lang:
|
||
self._meta.setdefault("lang", self.vocab.lang)
|
||
else:
|
||
self._meta.setdefault("lang", self.lang)
|
||
self._meta.setdefault("name", "pipeline")
|
||
self._meta.setdefault("version", "0.0.0")
|
||
self._meta.setdefault("spacy_version", spacy_version)
|
||
self._meta.setdefault("description", "")
|
||
self._meta.setdefault("author", "")
|
||
self._meta.setdefault("email", "")
|
||
self._meta.setdefault("url", "")
|
||
self._meta.setdefault("license", "")
|
||
self._meta.setdefault("spacy_git_version", GIT_VERSION)
|
||
self._meta["vectors"] = {
|
||
"width": self.vocab.vectors_length,
|
||
"vectors": len(self.vocab.vectors),
|
||
"keys": self.vocab.vectors.n_keys,
|
||
"name": self.vocab.vectors.name,
|
||
"mode": self.vocab.vectors.mode,
|
||
}
|
||
self._meta["labels"] = dict(self.pipe_labels)
|
||
# TODO: Adding this back to prevent breaking people's code etc., but
|
||
# we should consider removing it
|
||
self._meta["pipeline"] = list(self.pipe_names)
|
||
self._meta["components"] = list(self.component_names)
|
||
self._meta["disabled"] = list(self.disabled)
|
||
return self._meta
|
||
|
||
@meta.setter
|
||
def meta(self, value: Dict[str, Any]) -> None:
|
||
self._meta = value
|
||
|
||
@property
|
||
def config(self) -> Config:
|
||
"""Trainable config for the current language instance. Includes the
|
||
current pipeline components, as well as default training config.
|
||
|
||
RETURNS (thinc.api.Config): The config.
|
||
|
||
DOCS: https://spacy.io/api/language#config
|
||
"""
|
||
self._config.setdefault("nlp", {})
|
||
self._config.setdefault("training", {})
|
||
self._config["nlp"]["lang"] = self.lang
|
||
# We're storing the filled config for each pipeline component and so
|
||
# we can populate the config again later
|
||
pipeline = {}
|
||
score_weights = []
|
||
for pipe_name in self.component_names:
|
||
pipe_meta = self.get_pipe_meta(pipe_name)
|
||
pipe_config = self.get_pipe_config(pipe_name)
|
||
pipeline[pipe_name] = {"factory": pipe_meta.factory, **pipe_config}
|
||
if pipe_meta.default_score_weights:
|
||
score_weights.append(pipe_meta.default_score_weights)
|
||
self._config["nlp"]["pipeline"] = list(self.component_names)
|
||
self._config["nlp"]["disabled"] = list(self.disabled)
|
||
self._config["components"] = pipeline
|
||
# We're merging the existing score weights back into the combined
|
||
# weights to make sure we're preserving custom settings in the config
|
||
# but also reflect updates (e.g. new components added)
|
||
prev_weights = self._config["training"].get("score_weights", {})
|
||
combined_score_weights = combine_score_weights(score_weights, prev_weights)
|
||
self._config["training"]["score_weights"] = combined_score_weights
|
||
if not srsly.is_json_serializable(self._config):
|
||
raise ValueError(Errors.E961.format(config=self._config))
|
||
return self._config
|
||
|
||
@config.setter
|
||
def config(self, value: Config) -> None:
|
||
self._config = value
|
||
|
||
@property
|
||
def disabled(self) -> List[str]:
|
||
"""Get the names of all disabled components.
|
||
|
||
RETURNS (List[str]): The disabled components.
|
||
"""
|
||
# Make sure the disabled components are returned in the order they
|
||
# appear in the pipeline (which isn't guaranteed by the set)
|
||
names = [name for name, _ in self._components if name in self._disabled]
|
||
return SimpleFrozenList(names, error=Errors.E926.format(attr="disabled"))
|
||
|
||
@property
|
||
def factory_names(self) -> List[str]:
|
||
"""Get names of all available factories.
|
||
|
||
RETURNS (List[str]): The factory names.
|
||
"""
|
||
names = list(self.factories.keys())
|
||
return SimpleFrozenList(names)
|
||
|
||
@property
|
||
def components(self) -> List[Tuple[str, "Pipe"]]:
|
||
"""Get all (name, component) tuples in the pipeline, including the
|
||
currently disabled components.
|
||
"""
|
||
return SimpleFrozenList(
|
||
self._components, error=Errors.E926.format(attr="components")
|
||
)
|
||
|
||
@property
|
||
def component_names(self) -> List[str]:
|
||
"""Get the names of the available pipeline components. Includes all
|
||
active and inactive pipeline components.
|
||
|
||
RETURNS (List[str]): List of component name strings, in order.
|
||
"""
|
||
names = [pipe_name for pipe_name, _ in self._components]
|
||
return SimpleFrozenList(names, error=Errors.E926.format(attr="component_names"))
|
||
|
||
@property
|
||
def pipeline(self) -> List[Tuple[str, "Pipe"]]:
|
||
"""The processing pipeline consisting of (name, component) tuples. The
|
||
components are called on the Doc in order as it passes through the
|
||
pipeline.
|
||
|
||
RETURNS (List[Tuple[str, Pipe]]): The pipeline.
|
||
"""
|
||
pipes = [(n, p) for n, p in self._components if n not in self._disabled]
|
||
return SimpleFrozenList(pipes, error=Errors.E926.format(attr="pipeline"))
|
||
|
||
@property
|
||
def pipe_names(self) -> List[str]:
|
||
"""Get names of available active pipeline components.
|
||
|
||
RETURNS (List[str]): List of component name strings, in order.
|
||
"""
|
||
names = [pipe_name for pipe_name, _ in self.pipeline]
|
||
return SimpleFrozenList(names, error=Errors.E926.format(attr="pipe_names"))
|
||
|
||
@property
|
||
def pipe_factories(self) -> Dict[str, str]:
|
||
"""Get the component factories for the available pipeline components.
|
||
|
||
RETURNS (Dict[str, str]): Factory names, keyed by component names.
|
||
"""
|
||
factories = {}
|
||
for pipe_name, pipe in self._components:
|
||
factories[pipe_name] = self.get_pipe_meta(pipe_name).factory
|
||
return SimpleFrozenDict(factories)
|
||
|
||
@property
|
||
def pipe_labels(self) -> Dict[str, List[str]]:
|
||
"""Get the labels set by the pipeline components, if available (if
|
||
the component exposes a labels property).
|
||
|
||
RETURNS (Dict[str, List[str]]): Labels keyed by component name.
|
||
"""
|
||
labels = {}
|
||
for name, pipe in self._components:
|
||
if hasattr(pipe, "labels"):
|
||
labels[name] = list(pipe.labels)
|
||
return SimpleFrozenDict(labels)
|
||
|
||
@classmethod
|
||
def has_factory(cls, name: str) -> bool:
|
||
"""RETURNS (bool): Whether a factory of that name is registered."""
|
||
internal_name = cls.get_factory_name(name)
|
||
return name in registry.factories or internal_name in registry.factories
|
||
|
||
@classmethod
|
||
def get_factory_name(cls, name: str) -> str:
|
||
"""Get the internal factory name based on the language subclass.
|
||
|
||
name (str): The factory name.
|
||
RETURNS (str): The internal factory name.
|
||
"""
|
||
if cls.lang is None:
|
||
return name
|
||
return f"{cls.lang}.{name}"
|
||
|
||
@classmethod
|
||
def get_factory_meta(cls, name: str) -> "FactoryMeta":
|
||
"""Get the meta information for a given factory name.
|
||
|
||
name (str): The component factory name.
|
||
RETURNS (FactoryMeta): The meta for the given factory name.
|
||
"""
|
||
internal_name = cls.get_factory_name(name)
|
||
if internal_name in cls._factory_meta:
|
||
return cls._factory_meta[internal_name]
|
||
if name in cls._factory_meta:
|
||
return cls._factory_meta[name]
|
||
raise ValueError(Errors.E967.format(meta="factory", name=name))
|
||
|
||
@classmethod
|
||
def set_factory_meta(cls, name: str, value: "FactoryMeta") -> None:
|
||
"""Set the meta information for a given factory name.
|
||
|
||
name (str): The component factory name.
|
||
value (FactoryMeta): The meta to set.
|
||
"""
|
||
cls._factory_meta[cls.get_factory_name(name)] = value
|
||
|
||
def get_pipe_meta(self, name: str) -> "FactoryMeta":
|
||
"""Get the meta information for a given component name.
|
||
|
||
name (str): The component name.
|
||
RETURNS (FactoryMeta): The meta for the given component name.
|
||
"""
|
||
if name not in self._pipe_meta:
|
||
raise ValueError(Errors.E967.format(meta="component", name=name))
|
||
return self._pipe_meta[name]
|
||
|
||
def get_pipe_config(self, name: str) -> Config:
|
||
"""Get the config used to create a pipeline component.
|
||
|
||
name (str): The component name.
|
||
RETURNS (Config): The config used to create the pipeline component.
|
||
"""
|
||
if name not in self._pipe_configs:
|
||
raise ValueError(Errors.E960.format(name=name))
|
||
pipe_config = self._pipe_configs[name]
|
||
return pipe_config
|
||
|
||
@classmethod
|
||
def factory(
|
||
cls,
|
||
name: str,
|
||
*,
|
||
default_config: Dict[str, Any] = SimpleFrozenDict(),
|
||
assigns: Iterable[str] = SimpleFrozenList(),
|
||
requires: Iterable[str] = SimpleFrozenList(),
|
||
retokenizes: bool = False,
|
||
default_score_weights: Dict[str, Optional[float]] = SimpleFrozenDict(),
|
||
func: Optional[Callable] = None,
|
||
) -> Callable:
|
||
"""Register a new pipeline component factory. Can be used as a decorator
|
||
on a function or classmethod, or called as a function with the factory
|
||
provided as the func keyword argument. To create a component and add
|
||
it to the pipeline, you can use nlp.add_pipe(name).
|
||
|
||
name (str): The name of the component factory.
|
||
default_config (Dict[str, Any]): Default configuration, describing the
|
||
default values of the factory arguments.
|
||
assigns (Iterable[str]): Doc/Token attributes assigned by this component,
|
||
e.g. "token.ent_id". Used for pipeline analysis.
|
||
requires (Iterable[str]): Doc/Token attributes required by this component,
|
||
e.g. "token.ent_id". Used for pipeline analysis.
|
||
retokenizes (bool): Whether the component changes the tokenization.
|
||
Used for pipeline analysis.
|
||
default_score_weights (Dict[str, Optional[float]]): The scores to report during
|
||
training, and their default weight towards the final score used to
|
||
select the best model. Weights should sum to 1.0 per component and
|
||
will be combined and normalized for the whole pipeline. If None,
|
||
the score won't be shown in the logs or be weighted.
|
||
func (Optional[Callable]): Factory function if not used as a decorator.
|
||
|
||
DOCS: https://spacy.io/api/language#factory
|
||
"""
|
||
if not isinstance(name, str):
|
||
raise ValueError(Errors.E963.format(decorator="factory"))
|
||
if not isinstance(default_config, dict):
|
||
err = Errors.E962.format(
|
||
style="default config", name=name, cfg_type=type(default_config)
|
||
)
|
||
raise ValueError(err)
|
||
|
||
def add_factory(factory_func: Callable) -> Callable:
|
||
internal_name = cls.get_factory_name(name)
|
||
if internal_name in registry.factories:
|
||
# We only check for the internal name here – it's okay if it's a
|
||
# subclass and the base class has a factory of the same name. We
|
||
# also only raise if the function is different to prevent raising
|
||
# if module is reloaded.
|
||
existing_func = registry.factories.get(internal_name)
|
||
if not util.is_same_func(factory_func, existing_func):
|
||
err = Errors.E004.format(
|
||
name=name, func=existing_func, new_func=factory_func
|
||
)
|
||
raise ValueError(err)
|
||
|
||
arg_names = util.get_arg_names(factory_func)
|
||
if "nlp" not in arg_names or "name" not in arg_names:
|
||
raise ValueError(Errors.E964.format(name=name))
|
||
# Officially register the factory so we can later call
|
||
# registry.resolve and refer to it in the config as
|
||
# @factories = "spacy.Language.xyz". We use the class name here so
|
||
# different classes can have different factories.
|
||
registry.factories.register(internal_name, func=factory_func)
|
||
factory_meta = FactoryMeta(
|
||
factory=name,
|
||
default_config=default_config,
|
||
assigns=validate_attrs(assigns),
|
||
requires=validate_attrs(requires),
|
||
scores=list(default_score_weights.keys()),
|
||
default_score_weights=default_score_weights,
|
||
retokenizes=retokenizes,
|
||
)
|
||
cls.set_factory_meta(name, factory_meta)
|
||
# We're overwriting the class attr with a frozen dict to handle
|
||
# backwards-compat (writing to Language.factories directly). This
|
||
# wouldn't work with an instance property and just produce a
|
||
# confusing error – here we can show a custom error
|
||
cls.factories = SimpleFrozenDict(
|
||
registry.factories.get_all(), error=Errors.E957
|
||
)
|
||
return factory_func
|
||
|
||
if func is not None: # Support non-decorator use cases
|
||
return add_factory(func)
|
||
return add_factory
|
||
|
||
@classmethod
|
||
def component(
|
||
cls,
|
||
name: str,
|
||
*,
|
||
assigns: Iterable[str] = SimpleFrozenList(),
|
||
requires: Iterable[str] = SimpleFrozenList(),
|
||
retokenizes: bool = False,
|
||
func: Optional["Pipe"] = None,
|
||
) -> Callable:
|
||
"""Register a new pipeline component. Can be used for stateless function
|
||
components that don't require a separate factory. Can be used as a
|
||
decorator on a function or classmethod, or called as a function with the
|
||
factory provided as the func keyword argument. To create a component and
|
||
add it to the pipeline, you can use nlp.add_pipe(name).
|
||
|
||
name (str): The name of the component factory.
|
||
assigns (Iterable[str]): Doc/Token attributes assigned by this component,
|
||
e.g. "token.ent_id". Used for pipeline analysis.
|
||
requires (Iterable[str]): Doc/Token attributes required by this component,
|
||
e.g. "token.ent_id". Used for pipeline analysis.
|
||
retokenizes (bool): Whether the component changes the tokenization.
|
||
Used for pipeline analysis.
|
||
func (Optional[Callable]): Factory function if not used as a decorator.
|
||
|
||
DOCS: https://spacy.io/api/language#component
|
||
"""
|
||
if name is not None and not isinstance(name, str):
|
||
raise ValueError(Errors.E963.format(decorator="component"))
|
||
component_name = name if name is not None else util.get_object_name(func)
|
||
|
||
def add_component(component_func: "Pipe") -> Callable:
|
||
if isinstance(func, type): # function is a class
|
||
raise ValueError(Errors.E965.format(name=component_name))
|
||
|
||
def factory_func(nlp, name: str) -> "Pipe":
|
||
return component_func
|
||
|
||
internal_name = cls.get_factory_name(name)
|
||
if internal_name in registry.factories:
|
||
# We only check for the internal name here – it's okay if it's a
|
||
# subclass and the base class has a factory of the same name. We
|
||
# also only raise if the function is different to prevent raising
|
||
# if module is reloaded. It's hacky, but we need to check the
|
||
# existing functure for a closure and whether that's identical
|
||
# to the component function (because factory_func created above
|
||
# will always be different, even for the same function)
|
||
existing_func = registry.factories.get(internal_name)
|
||
closure = existing_func.__closure__
|
||
wrapped = [c.cell_contents for c in closure][0] if closure else None
|
||
if util.is_same_func(wrapped, component_func):
|
||
factory_func = existing_func # noqa: F811
|
||
|
||
cls.factory(
|
||
component_name,
|
||
assigns=assigns,
|
||
requires=requires,
|
||
retokenizes=retokenizes,
|
||
func=factory_func,
|
||
)
|
||
return component_func
|
||
|
||
if func is not None: # Support non-decorator use cases
|
||
return add_component(func)
|
||
return add_component
|
||
|
||
def analyze_pipes(
|
||
self,
|
||
*,
|
||
keys: List[str] = ["assigns", "requires", "scores", "retokenizes"],
|
||
pretty: bool = False,
|
||
) -> Optional[Dict[str, Any]]:
|
||
"""Analyze the current pipeline components, print a summary of what
|
||
they assign or require and check that all requirements are met.
|
||
|
||
keys (List[str]): The meta values to display in the table. Corresponds
|
||
to values in FactoryMeta, defined by @Language.factory decorator.
|
||
pretty (bool): Pretty-print the results.
|
||
RETURNS (dict): The data.
|
||
"""
|
||
analysis = analyze_pipes(self, keys=keys)
|
||
if pretty:
|
||
print_pipe_analysis(analysis, keys=keys)
|
||
return analysis
|
||
|
||
def get_pipe(self, name: str) -> "Pipe":
|
||
"""Get a pipeline component for a given component name.
|
||
|
||
name (str): Name of pipeline component to get.
|
||
RETURNS (callable): The pipeline component.
|
||
|
||
DOCS: https://spacy.io/api/language#get_pipe
|
||
"""
|
||
for pipe_name, component in self._components:
|
||
if pipe_name == name:
|
||
return component
|
||
raise KeyError(Errors.E001.format(name=name, opts=self.component_names))
|
||
|
||
def create_pipe(
|
||
self,
|
||
factory_name: str,
|
||
name: Optional[str] = None,
|
||
*,
|
||
config: Dict[str, Any] = SimpleFrozenDict(),
|
||
raw_config: Optional[Config] = None,
|
||
validate: bool = True,
|
||
) -> "Pipe":
|
||
"""Create a pipeline component. Mostly used internally. To create and
|
||
add a component to the pipeline, you can use nlp.add_pipe.
|
||
|
||
factory_name (str): Name of component factory.
|
||
name (Optional[str]): Optional name to assign to component instance.
|
||
Defaults to factory name if not set.
|
||
config (Dict[str, Any]): Config parameters to use for this component.
|
||
Will be merged with default config, if available.
|
||
raw_config (Optional[Config]): Internals: the non-interpolated config.
|
||
validate (bool): Whether to validate the component config against the
|
||
arguments and types expected by the factory.
|
||
RETURNS (Pipe): The pipeline component.
|
||
|
||
DOCS: https://spacy.io/api/language#create_pipe
|
||
"""
|
||
name = name if name is not None else factory_name
|
||
if not isinstance(config, dict):
|
||
err = Errors.E962.format(style="config", name=name, cfg_type=type(config))
|
||
raise ValueError(err)
|
||
if not srsly.is_json_serializable(config):
|
||
raise ValueError(Errors.E961.format(config=config))
|
||
if not self.has_factory(factory_name):
|
||
err = Errors.E002.format(
|
||
name=factory_name,
|
||
opts=", ".join(self.factory_names),
|
||
method="create_pipe",
|
||
lang=util.get_object_name(self),
|
||
lang_code=self.lang,
|
||
)
|
||
raise ValueError(err)
|
||
pipe_meta = self.get_factory_meta(factory_name)
|
||
# This is unideal, but the alternative would mean you always need to
|
||
# specify the full config settings, which is not really viable.
|
||
if pipe_meta.default_config:
|
||
config = Config(pipe_meta.default_config).merge(config)
|
||
internal_name = self.get_factory_name(factory_name)
|
||
# If the language-specific factory doesn't exist, try again with the
|
||
# not-specific name
|
||
if internal_name not in registry.factories:
|
||
internal_name = factory_name
|
||
# The name allows components to know their pipe name and use it in the
|
||
# losses etc. (even if multiple instances of the same factory are used)
|
||
config = {"nlp": self, "name": name, **config, "@factories": internal_name}
|
||
# We need to create a top-level key because Thinc doesn't allow resolving
|
||
# top-level references to registered functions. Also gives nicer errors.
|
||
cfg = {factory_name: config}
|
||
# We're calling the internal _fill here to avoid constructing the
|
||
# registered functions twice
|
||
resolved = registry.resolve(cfg, validate=validate)
|
||
filled = registry.fill({"cfg": cfg[factory_name]}, validate=validate)["cfg"]
|
||
filled = Config(filled)
|
||
filled["factory"] = factory_name
|
||
filled.pop("@factories", None)
|
||
# Remove the extra values we added because we don't want to keep passing
|
||
# them around, copying them etc.
|
||
filled.pop("nlp", None)
|
||
filled.pop("name", None)
|
||
# Merge the final filled config with the raw config (including non-
|
||
# interpolated variables)
|
||
if raw_config:
|
||
filled = filled.merge(raw_config)
|
||
self._pipe_configs[name] = filled
|
||
return resolved[factory_name]
|
||
|
||
def create_pipe_from_source(
|
||
self, source_name: str, source: "Language", *, name: str
|
||
) -> Tuple["Pipe", str]:
|
||
"""Create a pipeline component by copying it from an existing model.
|
||
|
||
source_name (str): Name of the component in the source pipeline.
|
||
source (Language): The source nlp object to copy from.
|
||
name (str): Optional alternative name to use in current pipeline.
|
||
RETURNS (Tuple[Callable, str]): The component and its factory name.
|
||
"""
|
||
# Check source type
|
||
if not isinstance(source, Language):
|
||
raise ValueError(Errors.E945.format(name=source_name, source=type(source)))
|
||
# Check vectors, with faster checks first
|
||
if (
|
||
self.vocab.vectors.shape != source.vocab.vectors.shape
|
||
or self.vocab.vectors.key2row != source.vocab.vectors.key2row
|
||
or self.vocab.vectors.to_bytes() != source.vocab.vectors.to_bytes()
|
||
):
|
||
warnings.warn(Warnings.W113.format(name=source_name))
|
||
if source_name not in source.component_names:
|
||
raise KeyError(
|
||
Errors.E944.format(
|
||
name=source_name,
|
||
model=f"{source.meta['lang']}_{source.meta['name']}",
|
||
opts=", ".join(source.component_names),
|
||
)
|
||
)
|
||
pipe = source.get_pipe(source_name)
|
||
# Make sure the source config is interpolated so we don't end up with
|
||
# orphaned variables in our final config
|
||
source_config = source.config.interpolate()
|
||
pipe_config = util.copy_config(source_config["components"][source_name])
|
||
self._pipe_configs[name] = pipe_config
|
||
if self.vocab.strings != source.vocab.strings:
|
||
for s in source.vocab.strings:
|
||
self.vocab.strings.add(s)
|
||
return pipe, pipe_config["factory"]
|
||
|
||
def add_pipe(
|
||
self,
|
||
factory_name: str,
|
||
name: Optional[str] = None,
|
||
*,
|
||
before: Optional[Union[str, int]] = None,
|
||
after: Optional[Union[str, int]] = None,
|
||
first: Optional[bool] = None,
|
||
last: Optional[bool] = None,
|
||
source: Optional["Language"] = None,
|
||
config: Dict[str, Any] = SimpleFrozenDict(),
|
||
raw_config: Optional[Config] = None,
|
||
validate: bool = True,
|
||
) -> "Pipe":
|
||
"""Add a component to the processing pipeline. Valid components are
|
||
callables that take a `Doc` object, modify it and return it. Only one
|
||
of before/after/first/last can be set. Default behaviour is "last".
|
||
|
||
factory_name (str): Name of the component factory.
|
||
name (str): Name of pipeline component. Overwrites existing
|
||
component.name attribute if available. If no name is set and
|
||
the component exposes no name attribute, component.__name__ is
|
||
used. An error is raised if a name already exists in the pipeline.
|
||
before (Union[str, int]): Name or index of the component to insert new
|
||
component directly before.
|
||
after (Union[str, int]): Name or index of the component to insert new
|
||
component directly after.
|
||
first (bool): If True, insert component first in the pipeline.
|
||
last (bool): If True, insert component last in the pipeline.
|
||
source (Language): Optional loaded nlp object to copy the pipeline
|
||
component from.
|
||
config (Dict[str, Any]): Config parameters to use for this component.
|
||
Will be merged with default config, if available.
|
||
raw_config (Optional[Config]): Internals: the non-interpolated config.
|
||
validate (bool): Whether to validate the component config against the
|
||
arguments and types expected by the factory.
|
||
RETURNS (Pipe): The pipeline component.
|
||
|
||
DOCS: https://spacy.io/api/language#add_pipe
|
||
"""
|
||
if not isinstance(factory_name, str):
|
||
bad_val = repr(factory_name)
|
||
err = Errors.E966.format(component=bad_val, name=name)
|
||
raise ValueError(err)
|
||
name = name if name is not None else factory_name
|
||
if name in self.component_names:
|
||
raise ValueError(Errors.E007.format(name=name, opts=self.component_names))
|
||
if source is not None:
|
||
# We're loading the component from a model. After loading the
|
||
# component, we know its real factory name
|
||
pipe_component, factory_name = self.create_pipe_from_source(
|
||
factory_name, source, name=name
|
||
)
|
||
else:
|
||
if not self.has_factory(factory_name):
|
||
err = Errors.E002.format(
|
||
name=factory_name,
|
||
opts=", ".join(self.factory_names),
|
||
method="add_pipe",
|
||
lang=util.get_object_name(self),
|
||
lang_code=self.lang,
|
||
)
|
||
pipe_component = self.create_pipe(
|
||
factory_name,
|
||
name=name,
|
||
config=config,
|
||
raw_config=raw_config,
|
||
validate=validate,
|
||
)
|
||
pipe_index = self._get_pipe_index(before, after, first, last)
|
||
self._pipe_meta[name] = self.get_factory_meta(factory_name)
|
||
self._components.insert(pipe_index, (name, pipe_component))
|
||
return pipe_component
|
||
|
||
def _get_pipe_index(
|
||
self,
|
||
before: Optional[Union[str, int]] = None,
|
||
after: Optional[Union[str, int]] = None,
|
||
first: Optional[bool] = None,
|
||
last: Optional[bool] = None,
|
||
) -> int:
|
||
"""Determine where to insert a pipeline component based on the before/
|
||
after/first/last values.
|
||
|
||
before (str): Name or index of the component to insert directly before.
|
||
after (str): Name or index of component to insert directly after.
|
||
first (bool): If True, insert component first in the pipeline.
|
||
last (bool): If True, insert component last in the pipeline.
|
||
RETURNS (int): The index of the new pipeline component.
|
||
"""
|
||
all_args = {"before": before, "after": after, "first": first, "last": last}
|
||
if sum(arg is not None for arg in [before, after, first, last]) >= 2:
|
||
raise ValueError(
|
||
Errors.E006.format(args=all_args, opts=self.component_names)
|
||
)
|
||
if last or not any(value is not None for value in [first, before, after]):
|
||
return len(self._components)
|
||
elif first:
|
||
return 0
|
||
elif isinstance(before, str):
|
||
if before not in self.component_names:
|
||
raise ValueError(
|
||
Errors.E001.format(name=before, opts=self.component_names)
|
||
)
|
||
return self.component_names.index(before)
|
||
elif isinstance(after, str):
|
||
if after not in self.component_names:
|
||
raise ValueError(
|
||
Errors.E001.format(name=after, opts=self.component_names)
|
||
)
|
||
return self.component_names.index(after) + 1
|
||
# We're only accepting indices referring to components that exist
|
||
# (can't just do isinstance here because bools are instance of int, too)
|
||
elif type(before) == int:
|
||
if before >= len(self._components) or before < 0:
|
||
err = Errors.E959.format(
|
||
dir="before", idx=before, opts=self.component_names
|
||
)
|
||
raise ValueError(err)
|
||
return before
|
||
elif type(after) == int:
|
||
if after >= len(self._components) or after < 0:
|
||
err = Errors.E959.format(
|
||
dir="after", idx=after, opts=self.component_names
|
||
)
|
||
raise ValueError(err)
|
||
return after + 1
|
||
raise ValueError(Errors.E006.format(args=all_args, opts=self.component_names))
|
||
|
||
def has_pipe(self, name: str) -> bool:
|
||
"""Check if a component name is present in the pipeline. Equivalent to
|
||
`name in nlp.pipe_names`.
|
||
|
||
name (str): Name of the component.
|
||
RETURNS (bool): Whether a component of the name exists in the pipeline.
|
||
|
||
DOCS: https://spacy.io/api/language#has_pipe
|
||
"""
|
||
return name in self.pipe_names
|
||
|
||
def replace_pipe(
|
||
self,
|
||
name: str,
|
||
factory_name: str,
|
||
*,
|
||
config: Dict[str, Any] = SimpleFrozenDict(),
|
||
validate: bool = True,
|
||
) -> "Pipe":
|
||
"""Replace a component in the pipeline.
|
||
|
||
name (str): Name of the component to replace.
|
||
factory_name (str): Factory name of replacement component.
|
||
config (Optional[Dict[str, Any]]): Config parameters to use for this
|
||
component. Will be merged with default config, if available.
|
||
validate (bool): Whether to validate the component config against the
|
||
arguments and types expected by the factory.
|
||
RETURNS (Pipe): The new pipeline component.
|
||
|
||
DOCS: https://spacy.io/api/language#replace_pipe
|
||
"""
|
||
if name not in self.component_names:
|
||
raise ValueError(Errors.E001.format(name=name, opts=self.pipe_names))
|
||
if hasattr(factory_name, "__call__"):
|
||
err = Errors.E968.format(component=repr(factory_name), name=name)
|
||
raise ValueError(err)
|
||
# We need to delegate to Language.add_pipe here instead of just writing
|
||
# to Language.pipeline to make sure the configs are handled correctly
|
||
pipe_index = self.component_names.index(name)
|
||
self.remove_pipe(name)
|
||
if not len(self._components) or pipe_index == len(self._components):
|
||
# we have no components to insert before/after, or we're replacing the last component
|
||
return self.add_pipe(
|
||
factory_name, name=name, config=config, validate=validate
|
||
)
|
||
else:
|
||
return self.add_pipe(
|
||
factory_name,
|
||
name=name,
|
||
before=pipe_index,
|
||
config=config,
|
||
validate=validate,
|
||
)
|
||
|
||
def rename_pipe(self, old_name: str, new_name: str) -> None:
|
||
"""Rename a pipeline component.
|
||
|
||
old_name (str): Name of the component to rename.
|
||
new_name (str): New name of the component.
|
||
|
||
DOCS: https://spacy.io/api/language#rename_pipe
|
||
"""
|
||
if old_name not in self.component_names:
|
||
raise ValueError(
|
||
Errors.E001.format(name=old_name, opts=self.component_names)
|
||
)
|
||
if new_name in self.component_names:
|
||
raise ValueError(
|
||
Errors.E007.format(name=new_name, opts=self.component_names)
|
||
)
|
||
i = self.component_names.index(old_name)
|
||
self._components[i] = (new_name, self._components[i][1])
|
||
self._pipe_meta[new_name] = self._pipe_meta.pop(old_name)
|
||
self._pipe_configs[new_name] = self._pipe_configs.pop(old_name)
|
||
# Make sure [initialize] config is adjusted
|
||
if old_name in self._config["initialize"]["components"]:
|
||
init_cfg = self._config["initialize"]["components"].pop(old_name)
|
||
self._config["initialize"]["components"][new_name] = init_cfg
|
||
|
||
def remove_pipe(self, name: str) -> Tuple[str, "Pipe"]:
|
||
"""Remove a component from the pipeline.
|
||
|
||
name (str): Name of the component to remove.
|
||
RETURNS (tuple): A `(name, component)` tuple of the removed component.
|
||
|
||
DOCS: https://spacy.io/api/language#remove_pipe
|
||
"""
|
||
if name not in self.component_names:
|
||
raise ValueError(Errors.E001.format(name=name, opts=self.component_names))
|
||
removed = self._components.pop(self.component_names.index(name))
|
||
# We're only removing the component itself from the metas/configs here
|
||
# because factory may be used for something else
|
||
self._pipe_meta.pop(name)
|
||
self._pipe_configs.pop(name)
|
||
self.meta.get("_sourced_vectors_hashes", {}).pop(name, None)
|
||
# Make sure name is removed from the [initialize] config
|
||
if name in self._config["initialize"]["components"]:
|
||
self._config["initialize"]["components"].pop(name)
|
||
# Make sure the name is also removed from the set of disabled components
|
||
if name in self.disabled:
|
||
self._disabled.remove(name)
|
||
return removed
|
||
|
||
def disable_pipe(self, name: str) -> None:
|
||
"""Disable a pipeline component. The component will still exist on
|
||
the nlp object, but it won't be run as part of the pipeline. Does
|
||
nothing if the component is already disabled.
|
||
|
||
name (str): The name of the component to disable.
|
||
"""
|
||
if name not in self.component_names:
|
||
raise ValueError(Errors.E001.format(name=name, opts=self.component_names))
|
||
self._disabled.add(name)
|
||
|
||
def enable_pipe(self, name: str) -> None:
|
||
"""Enable a previously disabled pipeline component so it's run as part
|
||
of the pipeline. Does nothing if the component is already enabled.
|
||
|
||
name (str): The name of the component to enable.
|
||
"""
|
||
if name not in self.component_names:
|
||
raise ValueError(Errors.E001.format(name=name, opts=self.component_names))
|
||
if name in self.disabled:
|
||
self._disabled.remove(name)
|
||
|
||
def __call__(
|
||
self,
|
||
text: Union[str, Doc],
|
||
*,
|
||
disable: Iterable[str] = SimpleFrozenList(),
|
||
component_cfg: Optional[Dict[str, Dict[str, Any]]] = None,
|
||
) -> Doc:
|
||
"""Apply the pipeline to some text. The text can span multiple sentences,
|
||
and can contain arbitrary whitespace. Alignment into the original string
|
||
is preserved.
|
||
|
||
text (Union[str, Doc]): If `str`, the text to be processed. If `Doc`,
|
||
the doc will be passed directly to the pipeline, skipping
|
||
`Language.make_doc`.
|
||
disable (List[str]): Names of the pipeline components to disable.
|
||
component_cfg (Dict[str, dict]): An optional dictionary with extra
|
||
keyword arguments for specific components.
|
||
RETURNS (Doc): A container for accessing the annotations.
|
||
|
||
DOCS: https://spacy.io/api/language#call
|
||
"""
|
||
doc = self._ensure_doc(text)
|
||
if component_cfg is None:
|
||
component_cfg = {}
|
||
for name, proc in self.pipeline:
|
||
if name in disable:
|
||
continue
|
||
if not hasattr(proc, "__call__"):
|
||
raise ValueError(Errors.E003.format(component=type(proc), name=name))
|
||
error_handler = self.default_error_handler
|
||
if hasattr(proc, "get_error_handler"):
|
||
error_handler = proc.get_error_handler()
|
||
try:
|
||
doc = proc(doc, **component_cfg.get(name, {})) # type: ignore[call-arg]
|
||
except KeyError as e:
|
||
# This typically happens if a component is not initialized
|
||
raise ValueError(Errors.E109.format(name=name)) from e
|
||
except Exception as e:
|
||
error_handler(name, proc, [doc], e)
|
||
if doc is None:
|
||
raise ValueError(Errors.E005.format(name=name))
|
||
return doc
|
||
|
||
def disable_pipes(self, *names) -> "DisabledPipes":
|
||
"""Disable one or more pipeline components. If used as a context
|
||
manager, the pipeline will be restored to the initial state at the end
|
||
of the block. Otherwise, a DisabledPipes object is returned, that has
|
||
a `.restore()` method you can use to undo your changes.
|
||
|
||
This method has been deprecated since 3.0
|
||
"""
|
||
warnings.warn(Warnings.W096, DeprecationWarning)
|
||
if len(names) == 1 and isinstance(names[0], (list, tuple)):
|
||
names = names[0] # type: ignore[assignment] # support list of names instead of spread
|
||
return self.select_pipes(disable=names)
|
||
|
||
def select_pipes(
|
||
self,
|
||
*,
|
||
disable: Optional[Union[str, Iterable[str]]] = None,
|
||
enable: Optional[Union[str, Iterable[str]]] = None,
|
||
) -> "DisabledPipes":
|
||
"""Disable one or more pipeline components. If used as a context
|
||
manager, the pipeline will be restored to the initial state at the end
|
||
of the block. Otherwise, a DisabledPipes object is returned, that has
|
||
a `.restore()` method you can use to undo your changes.
|
||
|
||
disable (str or iterable): The name(s) of the pipes to disable
|
||
enable (str or iterable): The name(s) of the pipes to enable - all others will be disabled
|
||
|
||
DOCS: https://spacy.io/api/language#select_pipes
|
||
"""
|
||
if enable is None and disable is None:
|
||
raise ValueError(Errors.E991)
|
||
if disable is not None and isinstance(disable, str):
|
||
disable = [disable]
|
||
if enable is not None:
|
||
if isinstance(enable, str):
|
||
enable = [enable]
|
||
to_disable = [pipe for pipe in self.pipe_names if pipe not in enable]
|
||
# raise an error if the enable and disable keywords are not consistent
|
||
if disable is not None and disable != to_disable:
|
||
raise ValueError(
|
||
Errors.E992.format(
|
||
enable=enable, disable=disable, names=self.pipe_names
|
||
)
|
||
)
|
||
disable = to_disable
|
||
assert disable is not None
|
||
# DisabledPipes will restore the pipes in 'disable' when it's done, so we need to exclude
|
||
# those pipes that were already disabled.
|
||
disable = [d for d in disable if d not in self._disabled]
|
||
return DisabledPipes(self, disable)
|
||
|
||
def make_doc(self, text: str) -> Doc:
|
||
"""Turn a text into a Doc object.
|
||
|
||
text (str): The text to process.
|
||
RETURNS (Doc): The processed doc.
|
||
"""
|
||
if len(text) > self.max_length:
|
||
raise ValueError(
|
||
Errors.E088.format(length=len(text), max_length=self.max_length)
|
||
)
|
||
return self.tokenizer(text)
|
||
|
||
def _ensure_doc(self, doc_like: Union[str, Doc]) -> Doc:
|
||
"""Create a Doc if need be, or raise an error if the input is not a Doc or a string."""
|
||
if isinstance(doc_like, Doc):
|
||
return doc_like
|
||
if isinstance(doc_like, str):
|
||
return self.make_doc(doc_like)
|
||
raise ValueError(Errors.E866.format(type=type(doc_like)))
|
||
|
||
def update(
|
||
self,
|
||
examples: Iterable[Example],
|
||
_: Optional[Any] = None,
|
||
*,
|
||
drop: float = 0.0,
|
||
sgd: Optional[Optimizer] = None,
|
||
losses: Optional[Dict[str, float]] = None,
|
||
component_cfg: Optional[Dict[str, Dict[str, Any]]] = None,
|
||
exclude: Iterable[str] = SimpleFrozenList(),
|
||
annotates: Iterable[str] = SimpleFrozenList(),
|
||
):
|
||
"""Update the models in the pipeline.
|
||
|
||
examples (Iterable[Example]): A batch of examples
|
||
_: Should not be set - serves to catch backwards-incompatible scripts.
|
||
drop (float): The dropout rate.
|
||
sgd (Optimizer): An optimizer.
|
||
losses (Dict[str, float]): Dictionary to update with the loss, keyed by
|
||
component.
|
||
component_cfg (Dict[str, Dict]): Config parameters for specific pipeline
|
||
components, keyed by component name.
|
||
exclude (Iterable[str]): Names of components that shouldn't be updated.
|
||
annotates (Iterable[str]): Names of components that should set
|
||
annotations on the predicted examples after updating.
|
||
RETURNS (Dict[str, float]): The updated losses dictionary
|
||
|
||
DOCS: https://spacy.io/api/language#update
|
||
"""
|
||
if _ is not None:
|
||
raise ValueError(Errors.E989)
|
||
if losses is None:
|
||
losses = {}
|
||
if isinstance(examples, list) and len(examples) == 0:
|
||
return losses
|
||
validate_examples(examples, "Language.update")
|
||
examples = _copy_examples(examples)
|
||
if sgd is None:
|
||
if self._optimizer is None:
|
||
self._optimizer = self.create_optimizer()
|
||
sgd = self._optimizer
|
||
if component_cfg is None:
|
||
component_cfg = {}
|
||
pipe_kwargs = {}
|
||
for i, (name, proc) in enumerate(self.pipeline):
|
||
component_cfg.setdefault(name, {})
|
||
pipe_kwargs[name] = deepcopy(component_cfg[name])
|
||
component_cfg[name].setdefault("drop", drop)
|
||
pipe_kwargs[name].setdefault("batch_size", self.batch_size)
|
||
for name, proc in self.pipeline:
|
||
# ignore statements are used here because mypy ignores hasattr
|
||
if name not in exclude and hasattr(proc, "update"):
|
||
proc.update(examples, sgd=None, losses=losses, **component_cfg[name]) # type: ignore
|
||
if sgd not in (None, False):
|
||
if (
|
||
name not in exclude
|
||
and isinstance(proc, ty.TrainableComponent)
|
||
and proc.is_trainable
|
||
and proc.model not in (True, False, None)
|
||
):
|
||
proc.finish_update(sgd)
|
||
if name in annotates:
|
||
for doc, eg in zip(
|
||
_pipe(
|
||
(eg.predicted for eg in examples),
|
||
proc=proc,
|
||
name=name,
|
||
default_error_handler=self.default_error_handler,
|
||
kwargs=pipe_kwargs[name],
|
||
),
|
||
examples,
|
||
):
|
||
eg.predicted = doc
|
||
return losses
|
||
|
||
def rehearse(
|
||
self,
|
||
examples: Iterable[Example],
|
||
*,
|
||
sgd: Optional[Optimizer] = None,
|
||
losses: Optional[Dict[str, float]] = None,
|
||
component_cfg: Optional[Dict[str, Dict[str, Any]]] = None,
|
||
exclude: Iterable[str] = SimpleFrozenList(),
|
||
) -> Dict[str, float]:
|
||
"""Make a "rehearsal" update to the models in the pipeline, to prevent
|
||
forgetting. Rehearsal updates run an initial copy of the model over some
|
||
data, and update the model so its current predictions are more like the
|
||
initial ones. This is useful for keeping a pretrained model on-track,
|
||
even if you're updating it with a smaller set of examples.
|
||
|
||
examples (Iterable[Example]): A batch of `Example` objects.
|
||
sgd (Optional[Optimizer]): An optimizer.
|
||
component_cfg (Dict[str, Dict]): Config parameters for specific pipeline
|
||
components, keyed by component name.
|
||
exclude (Iterable[str]): Names of components that shouldn't be updated.
|
||
RETURNS (dict): Results from the update.
|
||
|
||
EXAMPLE:
|
||
>>> raw_text_batches = minibatch(raw_texts)
|
||
>>> for labelled_batch in minibatch(examples):
|
||
>>> nlp.update(labelled_batch)
|
||
>>> raw_batch = [Example.from_dict(nlp.make_doc(text), {}) for text in next(raw_text_batches)]
|
||
>>> nlp.rehearse(raw_batch)
|
||
|
||
DOCS: https://spacy.io/api/language#rehearse
|
||
"""
|
||
if losses is None:
|
||
losses = {}
|
||
if isinstance(examples, list) and len(examples) == 0:
|
||
return losses
|
||
validate_examples(examples, "Language.rehearse")
|
||
if sgd is None:
|
||
if self._optimizer is None:
|
||
self._optimizer = self.create_optimizer()
|
||
sgd = self._optimizer
|
||
pipes = list(self.pipeline)
|
||
random.shuffle(pipes)
|
||
if component_cfg is None:
|
||
component_cfg = {}
|
||
grads = {}
|
||
|
||
def get_grads(W, dW, key=None):
|
||
grads[key] = (W, dW)
|
||
|
||
get_grads.learn_rate = sgd.learn_rate # type: ignore[attr-defined, union-attr]
|
||
get_grads.b1 = sgd.b1 # type: ignore[attr-defined, union-attr]
|
||
get_grads.b2 = sgd.b2 # type: ignore[attr-defined, union-attr]
|
||
for name, proc in pipes:
|
||
if name in exclude or not hasattr(proc, "rehearse"):
|
||
continue
|
||
grads = {}
|
||
proc.rehearse( # type: ignore[attr-defined]
|
||
examples, sgd=get_grads, losses=losses, **component_cfg.get(name, {})
|
||
)
|
||
for key, (W, dW) in grads.items():
|
||
sgd(W, dW, key=key) # type: ignore[call-arg, misc]
|
||
return losses
|
||
|
||
def begin_training(
|
||
self,
|
||
get_examples: Optional[Callable[[], Iterable[Example]]] = None,
|
||
*,
|
||
sgd: Optional[Optimizer] = None,
|
||
) -> Optimizer:
|
||
warnings.warn(Warnings.W089, DeprecationWarning)
|
||
return self.initialize(get_examples, sgd=sgd)
|
||
|
||
def initialize(
|
||
self,
|
||
get_examples: Optional[Callable[[], Iterable[Example]]] = None,
|
||
*,
|
||
sgd: Optional[Optimizer] = None,
|
||
) -> Optimizer:
|
||
"""Initialize the pipe for training, using data examples if available.
|
||
|
||
get_examples (Callable[[], Iterable[Example]]): Optional function that
|
||
returns gold-standard Example objects.
|
||
sgd (Optional[Optimizer]): An optimizer to use for updates. If not
|
||
provided, will be created using the .create_optimizer() method.
|
||
RETURNS (thinc.api.Optimizer): The optimizer.
|
||
|
||
DOCS: https://spacy.io/api/language#initialize
|
||
"""
|
||
if get_examples is None:
|
||
util.logger.debug(
|
||
"No 'get_examples' callback provided to 'Language.initialize', creating dummy examples"
|
||
)
|
||
doc = Doc(self.vocab, words=["x", "y", "z"])
|
||
get_examples = lambda: [Example.from_dict(doc, {})]
|
||
if not hasattr(get_examples, "__call__"):
|
||
err = Errors.E930.format(
|
||
method="Language.initialize", obj=type(get_examples)
|
||
)
|
||
raise TypeError(err)
|
||
# Make sure the config is interpolated so we can resolve subsections
|
||
config = self.config.interpolate()
|
||
# These are the settings provided in the [initialize] block in the config
|
||
I = registry.resolve(config["initialize"], schema=ConfigSchemaInit)
|
||
before_init = I["before_init"]
|
||
if before_init is not None:
|
||
before_init(self)
|
||
try:
|
||
init_vocab(
|
||
self, data=I["vocab_data"], lookups=I["lookups"], vectors=I["vectors"]
|
||
)
|
||
except IOError:
|
||
raise IOError(Errors.E884.format(vectors=I["vectors"]))
|
||
if self.vocab.vectors.data.shape[1] >= 1:
|
||
ops = get_current_ops()
|
||
self.vocab.vectors.data = ops.asarray(self.vocab.vectors.data)
|
||
if hasattr(self.tokenizer, "initialize"):
|
||
tok_settings = validate_init_settings(
|
||
self.tokenizer.initialize, # type: ignore[union-attr]
|
||
I["tokenizer"],
|
||
section="tokenizer",
|
||
name="tokenizer",
|
||
)
|
||
self.tokenizer.initialize(get_examples, nlp=self, **tok_settings) # type: ignore[union-attr]
|
||
for name, proc in self.pipeline:
|
||
if isinstance(proc, ty.InitializableComponent):
|
||
p_settings = I["components"].get(name, {})
|
||
p_settings = validate_init_settings(
|
||
proc.initialize, p_settings, section="components", name=name
|
||
)
|
||
proc.initialize(get_examples, nlp=self, **p_settings)
|
||
pretrain_cfg = config.get("pretraining")
|
||
if pretrain_cfg:
|
||
P = registry.resolve(pretrain_cfg, schema=ConfigSchemaPretrain)
|
||
init_tok2vec(self, P, I)
|
||
self._link_components()
|
||
self._optimizer = sgd
|
||
if sgd is not None:
|
||
self._optimizer = sgd
|
||
elif self._optimizer is None:
|
||
self._optimizer = self.create_optimizer()
|
||
after_init = I["after_init"]
|
||
if after_init is not None:
|
||
after_init(self)
|
||
return self._optimizer
|
||
|
||
def resume_training(self, *, sgd: Optional[Optimizer] = None) -> Optimizer:
|
||
"""Continue training a pretrained model.
|
||
|
||
Create and return an optimizer, and initialize "rehearsal" for any pipeline
|
||
component that has a .rehearse() method. Rehearsal is used to prevent
|
||
models from "forgetting" their initialized "knowledge". To perform
|
||
rehearsal, collect samples of text you want the models to retain performance
|
||
on, and call nlp.rehearse() with a batch of Example objects.
|
||
|
||
RETURNS (Optimizer): The optimizer.
|
||
|
||
DOCS: https://spacy.io/api/language#resume_training
|
||
"""
|
||
ops = get_current_ops()
|
||
if self.vocab.vectors.data.shape[1] >= 1:
|
||
self.vocab.vectors.data = ops.asarray(self.vocab.vectors.data)
|
||
for name, proc in self.pipeline:
|
||
if hasattr(proc, "_rehearsal_model"):
|
||
proc._rehearsal_model = deepcopy(proc.model) # type: ignore[attr-defined]
|
||
if sgd is not None:
|
||
self._optimizer = sgd
|
||
elif self._optimizer is None:
|
||
self._optimizer = self.create_optimizer()
|
||
return self._optimizer
|
||
|
||
def set_error_handler(
|
||
self,
|
||
error_handler: Callable[[str, "Pipe", List[Doc], Exception], NoReturn],
|
||
):
|
||
"""Set an error handler object for all the components in the pipeline that implement
|
||
a set_error_handler function.
|
||
|
||
error_handler (Callable[[str, Pipe, List[Doc], Exception], NoReturn]):
|
||
Function that deals with a failing batch of documents. This callable function should take in
|
||
the component's name, the component itself, the offending batch of documents, and the exception
|
||
that was thrown.
|
||
DOCS: https://spacy.io/api/language#set_error_handler
|
||
"""
|
||
self.default_error_handler = error_handler
|
||
for name, pipe in self.pipeline:
|
||
if hasattr(pipe, "set_error_handler"):
|
||
pipe.set_error_handler(error_handler)
|
||
|
||
def evaluate(
|
||
self,
|
||
examples: Iterable[Example],
|
||
*,
|
||
batch_size: Optional[int] = None,
|
||
scorer: Optional[Scorer] = None,
|
||
component_cfg: Optional[Dict[str, Dict[str, Any]]] = None,
|
||
scorer_cfg: Optional[Dict[str, Any]] = None,
|
||
) -> Dict[str, Any]:
|
||
"""Evaluate a model's pipeline components.
|
||
|
||
examples (Iterable[Example]): `Example` objects.
|
||
batch_size (Optional[int]): Batch size to use.
|
||
scorer (Optional[Scorer]): Scorer to use. If not passed in, a new one
|
||
will be created.
|
||
component_cfg (dict): An optional dictionary with extra keyword
|
||
arguments for specific components.
|
||
scorer_cfg (dict): An optional dictionary with extra keyword arguments
|
||
for the scorer.
|
||
|
||
RETURNS (Scorer): The scorer containing the evaluation results.
|
||
|
||
DOCS: https://spacy.io/api/language#evaluate
|
||
"""
|
||
examples = list(examples)
|
||
validate_examples(examples, "Language.evaluate")
|
||
examples = _copy_examples(examples)
|
||
if batch_size is None:
|
||
batch_size = self.batch_size
|
||
if component_cfg is None:
|
||
component_cfg = {}
|
||
if scorer_cfg is None:
|
||
scorer_cfg = {}
|
||
if scorer is None:
|
||
kwargs = dict(scorer_cfg)
|
||
kwargs.setdefault("nlp", self)
|
||
scorer = Scorer(**kwargs)
|
||
# reset annotation in predicted docs and time tokenization
|
||
start_time = timer()
|
||
# this is purely for timing
|
||
for eg in examples:
|
||
self.make_doc(eg.reference.text)
|
||
# apply all pipeline components
|
||
for name, pipe in self.pipeline:
|
||
kwargs = component_cfg.get(name, {})
|
||
kwargs.setdefault("batch_size", batch_size)
|
||
for doc, eg in zip(
|
||
_pipe(
|
||
(eg.predicted for eg in examples),
|
||
proc=pipe,
|
||
name=name,
|
||
default_error_handler=self.default_error_handler,
|
||
kwargs=kwargs,
|
||
),
|
||
examples,
|
||
):
|
||
eg.predicted = doc
|
||
end_time = timer()
|
||
results = scorer.score(examples)
|
||
n_words = sum(len(eg.predicted) for eg in examples)
|
||
results["speed"] = n_words / (end_time - start_time)
|
||
return results
|
||
|
||
def create_optimizer(self):
|
||
"""Create an optimizer, usually using the [training.optimizer] config."""
|
||
subconfig = {"optimizer": self.config["training"]["optimizer"]}
|
||
return registry.resolve(subconfig)["optimizer"]
|
||
|
||
@contextmanager
|
||
def use_params(self, params: Optional[dict]):
|
||
"""Replace weights of models in the pipeline with those provided in the
|
||
params dictionary. Can be used as a contextmanager, in which case,
|
||
models go back to their original weights after the block.
|
||
|
||
params (dict): A dictionary of parameters keyed by model ID.
|
||
|
||
EXAMPLE:
|
||
>>> with nlp.use_params(optimizer.averages):
|
||
>>> nlp.to_disk("/tmp/checkpoint")
|
||
|
||
DOCS: https://spacy.io/api/language#use_params
|
||
"""
|
||
if not params:
|
||
yield
|
||
else:
|
||
contexts = [
|
||
pipe.use_params(params) # type: ignore[attr-defined]
|
||
for name, pipe in self.pipeline
|
||
if hasattr(pipe, "use_params") and hasattr(pipe, "model")
|
||
]
|
||
# TODO: Having trouble with contextlib
|
||
# Workaround: these aren't actually context managers atm.
|
||
for context in contexts:
|
||
try:
|
||
next(context)
|
||
except StopIteration:
|
||
pass
|
||
yield
|
||
for context in contexts:
|
||
try:
|
||
next(context)
|
||
except StopIteration:
|
||
pass
|
||
|
||
@overload
|
||
def pipe(
|
||
self,
|
||
texts: Iterable[Union[str, Doc]],
|
||
*,
|
||
as_tuples: Literal[False] = ...,
|
||
batch_size: Optional[int] = ...,
|
||
disable: Iterable[str] = ...,
|
||
component_cfg: Optional[Dict[str, Dict[str, Any]]] = ...,
|
||
n_process: int = ...,
|
||
) -> Iterator[Doc]:
|
||
...
|
||
|
||
@overload
|
||
def pipe( # noqa: F811
|
||
self,
|
||
texts: Iterable[Tuple[str, _AnyContext]],
|
||
*,
|
||
as_tuples: Literal[True] = ...,
|
||
batch_size: Optional[int] = ...,
|
||
disable: Iterable[str] = ...,
|
||
component_cfg: Optional[Dict[str, Dict[str, Any]]] = ...,
|
||
n_process: int = ...,
|
||
) -> Iterator[Tuple[Doc, _AnyContext]]:
|
||
...
|
||
|
||
def pipe( # noqa: F811
|
||
self,
|
||
texts: Union[Iterable[Union[str, Doc]], Iterable[Tuple[str, _AnyContext]]],
|
||
*,
|
||
as_tuples: bool = False,
|
||
batch_size: Optional[int] = None,
|
||
disable: Iterable[str] = SimpleFrozenList(),
|
||
component_cfg: Optional[Dict[str, Dict[str, Any]]] = None,
|
||
n_process: int = 1,
|
||
) -> Union[Iterator[Doc], Iterator[Tuple[Doc, _AnyContext]]]:
|
||
"""Process texts as a stream, and yield `Doc` objects in order.
|
||
|
||
texts (Iterable[Union[str, Doc]]): A sequence of texts or docs to
|
||
process.
|
||
as_tuples (bool): If set to True, inputs should be a sequence of
|
||
(text, context) tuples. Output will then be a sequence of
|
||
(doc, context) tuples. Defaults to False.
|
||
batch_size (Optional[int]): The number of texts to buffer.
|
||
disable (List[str]): Names of the pipeline components to disable.
|
||
component_cfg (Dict[str, Dict]): An optional dictionary with extra keyword
|
||
arguments for specific components.
|
||
n_process (int): Number of processors to process texts. If -1, set `multiprocessing.cpu_count()`.
|
||
YIELDS (Doc): Documents in the order of the original text.
|
||
|
||
DOCS: https://spacy.io/api/language#pipe
|
||
"""
|
||
# Handle texts with context as tuples
|
||
if as_tuples:
|
||
texts = cast(Iterable[Tuple[str, _AnyContext]], texts)
|
||
text_context1, text_context2 = itertools.tee(texts)
|
||
texts = (tc[0] for tc in text_context1)
|
||
contexts = (tc[1] for tc in text_context2)
|
||
docs = self.pipe(
|
||
texts,
|
||
batch_size=batch_size,
|
||
disable=disable,
|
||
n_process=n_process,
|
||
component_cfg=component_cfg,
|
||
)
|
||
for doc, context in zip(docs, contexts):
|
||
yield (doc, context)
|
||
return
|
||
|
||
# At this point, we know that we're dealing with an iterable of plain texts
|
||
texts = cast(Iterable[str], texts)
|
||
|
||
# Set argument defaults
|
||
if n_process == -1:
|
||
n_process = mp.cpu_count()
|
||
if component_cfg is None:
|
||
component_cfg = {}
|
||
if batch_size is None:
|
||
batch_size = self.batch_size
|
||
|
||
pipes = (
|
||
[]
|
||
) # contains functools.partial objects to easily create multiprocess worker.
|
||
for name, proc in self.pipeline:
|
||
if name in disable:
|
||
continue
|
||
kwargs = component_cfg.get(name, {})
|
||
# Allow component_cfg to overwrite the top-level kwargs.
|
||
kwargs.setdefault("batch_size", batch_size)
|
||
f = functools.partial(
|
||
_pipe,
|
||
proc=proc,
|
||
name=name,
|
||
kwargs=kwargs,
|
||
default_error_handler=self.default_error_handler,
|
||
)
|
||
pipes.append(f)
|
||
|
||
if n_process != 1:
|
||
if self._has_gpu_model(disable):
|
||
warnings.warn(Warnings.W114)
|
||
|
||
docs = self._multiprocessing_pipe(texts, pipes, n_process, batch_size)
|
||
else:
|
||
# if n_process == 1, no processes are forked.
|
||
docs = (self._ensure_doc(text) for text in texts)
|
||
for pipe in pipes:
|
||
docs = pipe(docs)
|
||
for doc in docs:
|
||
yield doc
|
||
|
||
def _has_gpu_model(self, disable: Iterable[str]):
|
||
for name, proc in self.pipeline:
|
||
is_trainable = hasattr(proc, "is_trainable") and proc.is_trainable # type: ignore
|
||
if name in disable or not is_trainable:
|
||
continue
|
||
|
||
if hasattr(proc, "model") and hasattr(proc.model, "ops") and isinstance(proc.model.ops, CupyOps): # type: ignore
|
||
return True
|
||
|
||
return False
|
||
|
||
def _multiprocessing_pipe(
|
||
self,
|
||
texts: Iterable[str],
|
||
pipes: Iterable[Callable[..., Iterator[Doc]]],
|
||
n_process: int,
|
||
batch_size: int,
|
||
) -> Iterator[Doc]:
|
||
# raw_texts is used later to stop iteration.
|
||
texts, raw_texts = itertools.tee(texts)
|
||
# for sending texts to worker
|
||
texts_q: List[mp.Queue] = [mp.Queue() for _ in range(n_process)]
|
||
# for receiving byte-encoded docs from worker
|
||
bytedocs_recv_ch, bytedocs_send_ch = zip(
|
||
*[mp.Pipe(False) for _ in range(n_process)]
|
||
)
|
||
|
||
batch_texts = util.minibatch(texts, batch_size)
|
||
# Sender sends texts to the workers.
|
||
# This is necessary to properly handle infinite length of texts.
|
||
# (In this case, all data cannot be sent to the workers at once)
|
||
sender = _Sender(batch_texts, texts_q, chunk_size=n_process)
|
||
# send twice to make process busy
|
||
sender.send()
|
||
sender.send()
|
||
|
||
procs = [
|
||
mp.Process(
|
||
target=_apply_pipes,
|
||
args=(self._ensure_doc, pipes, rch, sch, Underscore.get_state()),
|
||
)
|
||
for rch, sch in zip(texts_q, bytedocs_send_ch)
|
||
]
|
||
for proc in procs:
|
||
proc.start()
|
||
|
||
# Cycle channels not to break the order of docs.
|
||
# The received object is a batch of byte-encoded docs, so flatten them with chain.from_iterable.
|
||
byte_tuples = chain.from_iterable(
|
||
recv.recv() for recv in cycle(bytedocs_recv_ch)
|
||
)
|
||
try:
|
||
for i, (_, (byte_doc, byte_error)) in enumerate(
|
||
zip(raw_texts, byte_tuples), 1
|
||
):
|
||
if byte_doc is not None:
|
||
doc = Doc(self.vocab).from_bytes(byte_doc)
|
||
yield doc
|
||
elif byte_error is not None:
|
||
error = srsly.msgpack_loads(byte_error)
|
||
self.default_error_handler(
|
||
None, None, None, ValueError(Errors.E871.format(error=error))
|
||
)
|
||
if i % batch_size == 0:
|
||
# tell `sender` that one batch was consumed.
|
||
sender.step()
|
||
finally:
|
||
for proc in procs:
|
||
proc.terminate()
|
||
|
||
def _link_components(self) -> None:
|
||
"""Register 'listeners' within pipeline components, to allow them to
|
||
effectively share weights.
|
||
"""
|
||
# I had thought, "Why do we do this inside the Language object? Shouldn't
|
||
# it be the tok2vec/transformer/etc's job?
|
||
# The problem is we need to do it during deserialization...And the
|
||
# components don't receive the pipeline then. So this does have to be
|
||
# here :(
|
||
for i, (name1, proc1) in enumerate(self.pipeline):
|
||
if isinstance(proc1, ty.ListenedToComponent):
|
||
for name2, proc2 in self.pipeline[i + 1 :]:
|
||
proc1.find_listeners(proc2)
|
||
|
||
@classmethod
|
||
def from_config(
|
||
cls,
|
||
config: Union[Dict[str, Any], Config] = {},
|
||
*,
|
||
vocab: Union[Vocab, bool] = True,
|
||
disable: Iterable[str] = SimpleFrozenList(),
|
||
exclude: Iterable[str] = SimpleFrozenList(),
|
||
meta: Dict[str, Any] = SimpleFrozenDict(),
|
||
auto_fill: bool = True,
|
||
validate: bool = True,
|
||
) -> "Language":
|
||
"""Create the nlp object from a loaded config. Will set up the tokenizer
|
||
and language data, add pipeline components etc. If no config is provided,
|
||
the default config of the given language is used.
|
||
|
||
config (Dict[str, Any] / Config): The loaded config.
|
||
vocab (Vocab): A Vocab object. If True, a vocab is created.
|
||
disable (Iterable[str]): Names of pipeline components to disable.
|
||
Disabled pipes will be loaded but they won't be run unless you
|
||
explicitly enable them by calling nlp.enable_pipe.
|
||
exclude (Iterable[str]): Names of pipeline components to exclude.
|
||
Excluded components won't be loaded.
|
||
meta (Dict[str, Any]): Meta overrides for nlp.meta.
|
||
auto_fill (bool): Automatically fill in missing values in config based
|
||
on defaults and function argument annotations.
|
||
validate (bool): Validate the component config and arguments against
|
||
the types expected by the factory.
|
||
RETURNS (Language): The initialized Language class.
|
||
|
||
DOCS: https://spacy.io/api/language#from_config
|
||
"""
|
||
if auto_fill:
|
||
config = Config(
|
||
cls.default_config, section_order=CONFIG_SECTION_ORDER
|
||
).merge(config)
|
||
if "nlp" not in config:
|
||
raise ValueError(Errors.E985.format(config=config))
|
||
config_lang = config["nlp"].get("lang")
|
||
if config_lang is not None and config_lang != cls.lang:
|
||
raise ValueError(
|
||
Errors.E958.format(
|
||
bad_lang_code=config["nlp"]["lang"],
|
||
lang_code=cls.lang,
|
||
lang=util.get_object_name(cls),
|
||
)
|
||
)
|
||
config["nlp"]["lang"] = cls.lang
|
||
# This isn't very elegant, but we remove the [components] block here to prevent
|
||
# it from getting resolved (causes problems because we expect to pass in
|
||
# the nlp and name args for each component). If we're auto-filling, we're
|
||
# using the nlp.config with all defaults.
|
||
config = util.copy_config(config)
|
||
orig_pipeline = config.pop("components", {})
|
||
orig_pretraining = config.pop("pretraining", None)
|
||
config["components"] = {}
|
||
if auto_fill:
|
||
filled = registry.fill(config, validate=validate, schema=ConfigSchema)
|
||
else:
|
||
filled = config
|
||
filled["components"] = orig_pipeline
|
||
config["components"] = orig_pipeline
|
||
if orig_pretraining is not None:
|
||
filled["pretraining"] = orig_pretraining
|
||
config["pretraining"] = orig_pretraining
|
||
resolved_nlp = registry.resolve(
|
||
filled["nlp"], validate=validate, schema=ConfigSchemaNlp
|
||
)
|
||
create_tokenizer = resolved_nlp["tokenizer"]
|
||
before_creation = resolved_nlp["before_creation"]
|
||
after_creation = resolved_nlp["after_creation"]
|
||
after_pipeline_creation = resolved_nlp["after_pipeline_creation"]
|
||
lang_cls = cls
|
||
if before_creation is not None:
|
||
lang_cls = before_creation(cls)
|
||
if (
|
||
not isinstance(lang_cls, type)
|
||
or not issubclass(lang_cls, cls)
|
||
or lang_cls is not cls
|
||
):
|
||
raise ValueError(Errors.E943.format(value=type(lang_cls)))
|
||
|
||
# Warn about require_gpu usage in jupyter notebook
|
||
warn_if_jupyter_cupy()
|
||
|
||
# Note that we don't load vectors here, instead they get loaded explicitly
|
||
# inside stuff like the spacy train function. If we loaded them here,
|
||
# then we would load them twice at runtime: once when we make from config,
|
||
# and then again when we load from disk.
|
||
nlp = lang_cls(vocab=vocab, create_tokenizer=create_tokenizer, meta=meta)
|
||
if after_creation is not None:
|
||
nlp = after_creation(nlp)
|
||
if not isinstance(nlp, cls):
|
||
raise ValueError(Errors.E942.format(name="creation", value=type(nlp)))
|
||
# To create the components we need to use the final interpolated config
|
||
# so all values are available (if component configs use variables).
|
||
# Later we replace the component config with the raw config again.
|
||
interpolated = filled.interpolate() if not filled.is_interpolated else filled
|
||
pipeline = interpolated.get("components", {})
|
||
sourced = util.get_sourced_components(interpolated)
|
||
# If components are loaded from a source (existing models), we cache
|
||
# them here so they're only loaded once
|
||
source_nlps = {}
|
||
source_nlp_vectors_hashes = {}
|
||
vocab_b = None
|
||
for pipe_name in config["nlp"]["pipeline"]:
|
||
if pipe_name not in pipeline:
|
||
opts = ", ".join(pipeline.keys())
|
||
raise ValueError(Errors.E956.format(name=pipe_name, opts=opts))
|
||
pipe_cfg = util.copy_config(pipeline[pipe_name])
|
||
raw_config = Config(filled["components"][pipe_name])
|
||
if pipe_name not in exclude:
|
||
if "factory" not in pipe_cfg and "source" not in pipe_cfg:
|
||
err = Errors.E984.format(name=pipe_name, config=pipe_cfg)
|
||
raise ValueError(err)
|
||
if "factory" in pipe_cfg:
|
||
factory = pipe_cfg.pop("factory")
|
||
# The pipe name (key in the config) here is the unique name
|
||
# of the component, not necessarily the factory
|
||
nlp.add_pipe(
|
||
factory,
|
||
name=pipe_name,
|
||
config=pipe_cfg,
|
||
validate=validate,
|
||
raw_config=raw_config,
|
||
)
|
||
else:
|
||
# We need the sourced components to reference the same
|
||
# vocab without modifying the current vocab state **AND**
|
||
# we still want to load the source model vectors to perform
|
||
# the vectors check. Since the source vectors clobber the
|
||
# current ones, we save the original vocab state and
|
||
# restore after this loop. Existing strings are preserved
|
||
# during deserialization, so they do not need any
|
||
# additional handling.
|
||
if vocab_b is None:
|
||
vocab_b = nlp.vocab.to_bytes(exclude=["lookups", "strings"])
|
||
model = pipe_cfg["source"]
|
||
if model not in source_nlps:
|
||
# Load with the same vocab, adding any strings
|
||
source_nlps[model] = util.load_model(
|
||
model, vocab=nlp.vocab, exclude=["lookups"]
|
||
)
|
||
source_name = pipe_cfg.get("component", pipe_name)
|
||
listeners_replaced = False
|
||
if "replace_listeners" in pipe_cfg:
|
||
for name, proc in source_nlps[model].pipeline:
|
||
if source_name in getattr(proc, "listening_components", []):
|
||
source_nlps[model].replace_listeners(
|
||
name, source_name, pipe_cfg["replace_listeners"]
|
||
)
|
||
listeners_replaced = True
|
||
with warnings.catch_warnings():
|
||
warnings.filterwarnings("ignore", message="\\[W113\\]")
|
||
nlp.add_pipe(
|
||
source_name, source=source_nlps[model], name=pipe_name
|
||
)
|
||
if model not in source_nlp_vectors_hashes:
|
||
source_nlp_vectors_hashes[model] = hash(
|
||
source_nlps[model].vocab.vectors.to_bytes()
|
||
)
|
||
if "_sourced_vectors_hashes" not in nlp.meta:
|
||
nlp.meta["_sourced_vectors_hashes"] = {}
|
||
nlp.meta["_sourced_vectors_hashes"][
|
||
pipe_name
|
||
] = source_nlp_vectors_hashes[model]
|
||
# Delete from cache if listeners were replaced
|
||
if listeners_replaced:
|
||
del source_nlps[model]
|
||
# Restore the original vocab after sourcing if necessary
|
||
if vocab_b is not None:
|
||
nlp.vocab.from_bytes(vocab_b)
|
||
disabled_pipes = [*config["nlp"]["disabled"], *disable]
|
||
nlp._disabled = set(p for p in disabled_pipes if p not in exclude)
|
||
nlp.batch_size = config["nlp"]["batch_size"]
|
||
nlp.config = filled if auto_fill else config
|
||
if after_pipeline_creation is not None:
|
||
nlp = after_pipeline_creation(nlp)
|
||
if not isinstance(nlp, cls):
|
||
raise ValueError(
|
||
Errors.E942.format(name="pipeline_creation", value=type(nlp))
|
||
)
|
||
# Detect components with listeners that are not frozen consistently
|
||
for name, proc in nlp.pipeline:
|
||
if isinstance(proc, ty.ListenedToComponent):
|
||
# Remove listeners not in the pipeline
|
||
listener_names = proc.listening_components
|
||
unused_listener_names = [
|
||
ll for ll in listener_names if ll not in nlp.pipe_names
|
||
]
|
||
for listener_name in unused_listener_names:
|
||
for listener in proc.listener_map.get(listener_name, []):
|
||
proc.remove_listener(listener, listener_name)
|
||
|
||
for listener_name in proc.listening_components:
|
||
# e.g. tok2vec/transformer
|
||
# If it's a component sourced from another pipeline, we check if
|
||
# the tok2vec listeners should be replaced with standalone tok2vec
|
||
# models (e.g. so component can be frozen without its performance
|
||
# degrading when other components/tok2vec are updated)
|
||
paths = sourced.get(listener_name, {}).get("replace_listeners", [])
|
||
if paths:
|
||
nlp.replace_listeners(name, listener_name, paths)
|
||
return nlp
|
||
|
||
def replace_listeners(
|
||
self,
|
||
tok2vec_name: str,
|
||
pipe_name: str,
|
||
listeners: Iterable[str],
|
||
) -> None:
|
||
"""Find listener layers (connecting to a token-to-vector embedding
|
||
component) of a given pipeline component model and replace
|
||
them with a standalone copy of the token-to-vector layer. This can be
|
||
useful when training a pipeline with components sourced from an existing
|
||
pipeline: if multiple components (e.g. tagger, parser, NER) listen to
|
||
the same tok2vec component, but some of them are frozen and not updated,
|
||
their performance may degrade significally as the tok2vec component is
|
||
updated with new data. To prevent this, listeners can be replaced with
|
||
a standalone tok2vec layer that is owned by the component and doesn't
|
||
change if the component isn't updated.
|
||
|
||
tok2vec_name (str): Name of the token-to-vector component, typically
|
||
"tok2vec" or "transformer".
|
||
pipe_name (str): Name of pipeline component to replace listeners for.
|
||
listeners (Iterable[str]): The paths to the listeners, relative to the
|
||
component config, e.g. ["model.tok2vec"]. Typically, implementations
|
||
will only connect to one tok2vec component, [model.tok2vec], but in
|
||
theory, custom models can use multiple listeners. The value here can
|
||
either be an empty list to not replace any listeners, or a complete
|
||
(!) list of the paths to all listener layers used by the model.
|
||
|
||
DOCS: https://spacy.io/api/language#replace_listeners
|
||
"""
|
||
if tok2vec_name not in self.pipe_names:
|
||
err = Errors.E889.format(
|
||
tok2vec=tok2vec_name,
|
||
name=pipe_name,
|
||
unknown=tok2vec_name,
|
||
opts=", ".join(self.pipe_names),
|
||
)
|
||
raise ValueError(err)
|
||
if pipe_name not in self.pipe_names:
|
||
err = Errors.E889.format(
|
||
tok2vec=tok2vec_name,
|
||
name=pipe_name,
|
||
unknown=pipe_name,
|
||
opts=", ".join(self.pipe_names),
|
||
)
|
||
raise ValueError(err)
|
||
tok2vec = self.get_pipe(tok2vec_name)
|
||
tok2vec_cfg = self.get_pipe_config(tok2vec_name)
|
||
if not isinstance(tok2vec, ty.ListenedToComponent):
|
||
raise ValueError(Errors.E888.format(name=tok2vec_name, pipe=type(tok2vec)))
|
||
tok2vec_model = tok2vec.model
|
||
pipe_listeners = tok2vec.listener_map.get(pipe_name, [])
|
||
pipe = self.get_pipe(pipe_name)
|
||
pipe_cfg = self._pipe_configs[pipe_name]
|
||
if listeners:
|
||
util.logger.debug(f"Replacing listeners of component '{pipe_name}'")
|
||
if len(list(listeners)) != len(pipe_listeners):
|
||
# The number of listeners defined in the component model doesn't
|
||
# match the listeners to replace, so we won't be able to update
|
||
# the nodes and generate a matching config
|
||
err = Errors.E887.format(
|
||
name=pipe_name,
|
||
tok2vec=tok2vec_name,
|
||
paths=listeners,
|
||
n_listeners=len(pipe_listeners),
|
||
)
|
||
raise ValueError(err)
|
||
# Update the config accordingly by copying the tok2vec model to all
|
||
# sections defined in the listener paths
|
||
for listener_path in listeners:
|
||
# Check if the path actually exists in the config
|
||
try:
|
||
util.dot_to_object(pipe_cfg, listener_path)
|
||
except KeyError:
|
||
err = Errors.E886.format(
|
||
name=pipe_name, tok2vec=tok2vec_name, path=listener_path
|
||
)
|
||
raise ValueError(err)
|
||
new_config = tok2vec_cfg["model"]
|
||
if "replace_listener_cfg" in tok2vec_model.attrs:
|
||
replace_func = tok2vec_model.attrs["replace_listener_cfg"]
|
||
new_config = replace_func(
|
||
tok2vec_cfg["model"], pipe_cfg["model"]["tok2vec"]
|
||
)
|
||
util.set_dot_to_object(pipe_cfg, listener_path, new_config)
|
||
# Go over the listener layers and replace them
|
||
for listener in pipe_listeners:
|
||
new_model = tok2vec_model.copy()
|
||
if "replace_listener" in tok2vec_model.attrs:
|
||
new_model = tok2vec_model.attrs["replace_listener"](new_model)
|
||
util.replace_model_node(pipe.model, listener, new_model) # type: ignore[attr-defined]
|
||
tok2vec.remove_listener(listener, pipe_name)
|
||
|
||
def to_disk(
|
||
self, path: Union[str, Path], *, exclude: Iterable[str] = SimpleFrozenList()
|
||
) -> None:
|
||
"""Save the current state to a directory. If a model is loaded, this
|
||
will include the model.
|
||
|
||
path (str / Path): Path to a directory, which will be created if
|
||
it doesn't exist.
|
||
exclude (Iterable[str]): Names of components or serialization fields to exclude.
|
||
|
||
DOCS: https://spacy.io/api/language#to_disk
|
||
"""
|
||
path = util.ensure_path(path)
|
||
serializers = {}
|
||
serializers["tokenizer"] = lambda p: self.tokenizer.to_disk( # type: ignore[union-attr]
|
||
p, exclude=["vocab"]
|
||
)
|
||
serializers["meta.json"] = lambda p: srsly.write_json(p, self.meta)
|
||
serializers["config.cfg"] = lambda p: self.config.to_disk(p)
|
||
for name, proc in self._components:
|
||
if name in exclude:
|
||
continue
|
||
if not hasattr(proc, "to_disk"):
|
||
continue
|
||
serializers[name] = lambda p, proc=proc: proc.to_disk(p, exclude=["vocab"]) # type: ignore[misc]
|
||
serializers["vocab"] = lambda p: self.vocab.to_disk(p, exclude=exclude)
|
||
util.to_disk(path, serializers, exclude)
|
||
|
||
def from_disk(
|
||
self,
|
||
path: Union[str, Path],
|
||
*,
|
||
exclude: Iterable[str] = SimpleFrozenList(),
|
||
overrides: Dict[str, Any] = SimpleFrozenDict(),
|
||
) -> "Language":
|
||
"""Loads state from a directory. Modifies the object in place and
|
||
returns it. If the saved `Language` object contains a model, the
|
||
model will be loaded.
|
||
|
||
path (str / Path): A path to a directory.
|
||
exclude (Iterable[str]): Names of components or serialization fields to exclude.
|
||
RETURNS (Language): The modified `Language` object.
|
||
|
||
DOCS: https://spacy.io/api/language#from_disk
|
||
"""
|
||
|
||
def deserialize_meta(path: Path) -> None:
|
||
if path.exists():
|
||
data = srsly.read_json(path)
|
||
self.meta.update(data)
|
||
# self.meta always overrides meta["vectors"] with the metadata
|
||
# from self.vocab.vectors, so set the name directly
|
||
self.vocab.vectors.name = data.get("vectors", {}).get("name")
|
||
|
||
def deserialize_vocab(path: Path) -> None:
|
||
if path.exists():
|
||
self.vocab.from_disk(path, exclude=exclude)
|
||
|
||
path = util.ensure_path(path)
|
||
deserializers = {}
|
||
if Path(path / "config.cfg").exists(): # type: ignore[operator]
|
||
deserializers["config.cfg"] = lambda p: self.config.from_disk(
|
||
p, interpolate=False, overrides=overrides
|
||
)
|
||
deserializers["meta.json"] = deserialize_meta # type: ignore[assignment]
|
||
deserializers["vocab"] = deserialize_vocab # type: ignore[assignment]
|
||
deserializers["tokenizer"] = lambda p: self.tokenizer.from_disk( # type: ignore[union-attr]
|
||
p, exclude=["vocab"]
|
||
)
|
||
for name, proc in self._components:
|
||
if name in exclude:
|
||
continue
|
||
if not hasattr(proc, "from_disk"):
|
||
continue
|
||
deserializers[name] = lambda p, proc=proc: proc.from_disk( # type: ignore[misc]
|
||
p, exclude=["vocab"]
|
||
)
|
||
if not (path / "vocab").exists() and "vocab" not in exclude: # type: ignore[operator]
|
||
# Convert to list here in case exclude is (default) tuple
|
||
exclude = list(exclude) + ["vocab"]
|
||
util.from_disk(path, deserializers, exclude) # type: ignore[arg-type]
|
||
self._path = path # type: ignore[assignment]
|
||
self._link_components()
|
||
return self
|
||
|
||
def to_bytes(self, *, exclude: Iterable[str] = SimpleFrozenList()) -> bytes:
|
||
"""Serialize the current state to a binary string.
|
||
|
||
exclude (Iterable[str]): Names of components or serialization fields to exclude.
|
||
RETURNS (bytes): The serialized form of the `Language` object.
|
||
|
||
DOCS: https://spacy.io/api/language#to_bytes
|
||
"""
|
||
serializers: Dict[str, Callable[[], bytes]] = {}
|
||
serializers["vocab"] = lambda: self.vocab.to_bytes(exclude=exclude)
|
||
serializers["tokenizer"] = lambda: self.tokenizer.to_bytes(exclude=["vocab"]) # type: ignore[union-attr]
|
||
serializers["meta.json"] = lambda: srsly.json_dumps(self.meta)
|
||
serializers["config.cfg"] = lambda: self.config.to_bytes()
|
||
for name, proc in self._components:
|
||
if name in exclude:
|
||
continue
|
||
if not hasattr(proc, "to_bytes"):
|
||
continue
|
||
serializers[name] = lambda proc=proc: proc.to_bytes(exclude=["vocab"]) # type: ignore[misc]
|
||
return util.to_bytes(serializers, exclude)
|
||
|
||
def from_bytes(
|
||
self, bytes_data: bytes, *, exclude: Iterable[str] = SimpleFrozenList()
|
||
) -> "Language":
|
||
"""Load state from a binary string.
|
||
|
||
bytes_data (bytes): The data to load from.
|
||
exclude (Iterable[str]): Names of components or serialization fields to exclude.
|
||
RETURNS (Language): The `Language` object.
|
||
|
||
DOCS: https://spacy.io/api/language#from_bytes
|
||
"""
|
||
|
||
def deserialize_meta(b):
|
||
data = srsly.json_loads(b)
|
||
self.meta.update(data)
|
||
# self.meta always overrides meta["vectors"] with the metadata
|
||
# from self.vocab.vectors, so set the name directly
|
||
self.vocab.vectors.name = data.get("vectors", {}).get("name")
|
||
|
||
deserializers: Dict[str, Callable[[bytes], Any]] = {}
|
||
deserializers["config.cfg"] = lambda b: self.config.from_bytes(
|
||
b, interpolate=False
|
||
)
|
||
deserializers["meta.json"] = deserialize_meta
|
||
deserializers["vocab"] = lambda b: self.vocab.from_bytes(b, exclude=exclude)
|
||
deserializers["tokenizer"] = lambda b: self.tokenizer.from_bytes( # type: ignore[union-attr]
|
||
b, exclude=["vocab"]
|
||
)
|
||
for name, proc in self._components:
|
||
if name in exclude:
|
||
continue
|
||
if not hasattr(proc, "from_bytes"):
|
||
continue
|
||
deserializers[name] = lambda b, proc=proc: proc.from_bytes( # type: ignore[misc]
|
||
b, exclude=["vocab"]
|
||
)
|
||
util.from_bytes(bytes_data, deserializers, exclude)
|
||
self._link_components()
|
||
return self
|
||
|
||
|
||
@dataclass
|
||
class FactoryMeta:
|
||
"""Dataclass containing information about a component and its defaults
|
||
provided by the @Language.component or @Language.factory decorator. It's
|
||
created whenever a component is defined and stored on the Language class for
|
||
each component instance and factory instance.
|
||
"""
|
||
|
||
factory: str
|
||
default_config: Optional[Dict[str, Any]] = None # noqa: E704
|
||
assigns: Iterable[str] = tuple()
|
||
requires: Iterable[str] = tuple()
|
||
retokenizes: bool = False
|
||
scores: Iterable[str] = tuple()
|
||
default_score_weights: Optional[Dict[str, Optional[float]]] = None # noqa: E704
|
||
|
||
|
||
class DisabledPipes(list):
|
||
"""Manager for temporary pipeline disabling."""
|
||
|
||
def __init__(self, nlp: Language, names: List[str]) -> None:
|
||
self.nlp = nlp
|
||
self.names = names
|
||
for name in self.names:
|
||
self.nlp.disable_pipe(name)
|
||
list.__init__(self)
|
||
self.extend(self.names)
|
||
|
||
def __enter__(self):
|
||
return self
|
||
|
||
def __exit__(self, *args):
|
||
self.restore()
|
||
|
||
def restore(self) -> None:
|
||
"""Restore the pipeline to its state when DisabledPipes was created."""
|
||
for name in self.names:
|
||
if name not in self.nlp.component_names:
|
||
raise ValueError(Errors.E008.format(name=name))
|
||
self.nlp.enable_pipe(name)
|
||
self[:] = []
|
||
|
||
|
||
def _copy_examples(examples: Iterable[Example]) -> List[Example]:
|
||
"""Make a copy of a batch of examples, copying the predicted Doc as well.
|
||
This is used in contexts where we need to take ownership of the examples
|
||
so that they can be mutated, for instance during Language.evaluate and
|
||
Language.update.
|
||
"""
|
||
return [Example(eg.x.copy(), eg.y) for eg in examples]
|
||
|
||
|
||
def _apply_pipes(
|
||
ensure_doc: Callable[[Union[str, Doc]], Doc],
|
||
pipes: Iterable[Callable[..., Iterator[Doc]]],
|
||
receiver,
|
||
sender,
|
||
underscore_state: Tuple[dict, dict, dict],
|
||
) -> None:
|
||
"""Worker for Language.pipe
|
||
|
||
ensure_doc (Callable[[Union[str, Doc]], Doc]): Function to create Doc from text
|
||
or raise an error if the input is neither a Doc nor a string.
|
||
pipes (Iterable[Pipe]): The components to apply.
|
||
receiver (multiprocessing.Connection): Pipe to receive text. Usually
|
||
created by `multiprocessing.Pipe()`
|
||
sender (multiprocessing.Connection): Pipe to send doc. Usually created by
|
||
`multiprocessing.Pipe()`
|
||
underscore_state (Tuple[dict, dict, dict]): The data in the Underscore class
|
||
of the parent.
|
||
"""
|
||
Underscore.load_state(underscore_state)
|
||
while True:
|
||
try:
|
||
texts = receiver.get()
|
||
docs = (ensure_doc(text) for text in texts)
|
||
for pipe in pipes:
|
||
docs = pipe(docs) # type: ignore[arg-type, assignment]
|
||
# Connection does not accept unpickable objects, so send list.
|
||
byte_docs = [(doc.to_bytes(), None) for doc in docs]
|
||
padding = [(None, None)] * (len(texts) - len(byte_docs))
|
||
sender.send(byte_docs + padding) # type: ignore[operator]
|
||
except Exception:
|
||
error_msg = [(None, srsly.msgpack_dumps(traceback.format_exc()))]
|
||
padding = [(None, None)] * (len(texts) - 1)
|
||
sender.send(error_msg + padding)
|
||
|
||
|
||
class _Sender:
|
||
"""Util for sending data to multiprocessing workers in Language.pipe"""
|
||
|
||
def __init__(
|
||
self, data: Iterable[Any], queues: List[mp.Queue], chunk_size: int
|
||
) -> None:
|
||
self.data = iter(data)
|
||
self.queues = iter(cycle(queues))
|
||
self.chunk_size = chunk_size
|
||
self.count = 0
|
||
|
||
def send(self) -> None:
|
||
"""Send chunk_size items from self.data to channels."""
|
||
for item, q in itertools.islice(
|
||
zip(self.data, cycle(self.queues)), self.chunk_size
|
||
):
|
||
# cycle channels so that distribute the texts evenly
|
||
q.put(item)
|
||
|
||
def step(self) -> None:
|
||
"""Tell sender that comsumed one item. Data is sent to the workers after
|
||
every chunk_size calls.
|
||
"""
|
||
self.count += 1
|
||
if self.count >= self.chunk_size:
|
||
self.count = 0
|
||
self.send()
|