from itertools import islice
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple

from thinc.api import Config, Model, Optimizer, set_dropout_rate
from thinc.types import Floats2d

from ..errors import Errors
from ..language import Language
from ..tokens import Doc
from ..training import Example, validate_examples, validate_get_examples
from ..vocab import Vocab
from .trainable_pipe import TrainablePipe

# Default architecture for the component's model, in config-file syntax.
# The line breaks are significant: each setting must sit on its own line.
default_model_config = """
[model]
@architectures = "spacy.HashEmbedCNN.v2"
pretrained_vectors = null
width = 96
depth = 4
embed_size = 2000
window_size = 1
maxout_pieces = 3
subword_features = true
"""
DEFAULT_TOK2VEC_MODEL = Config().from_str(default_model_config)["model"]


@Language.factory(
    "tok2vec", assigns=["doc.tensor"], default_config={"model": DEFAULT_TOK2VEC_MODEL}
)
def make_tok2vec(nlp: Language, name: str, model: Model) -> "Tok2Vec":
    """Factory registered as "tok2vec": build a Tok2Vec component.

    nlp (Language): The pipeline the component is created for; only its
        vocab is used here.
    name (str): The component instance name.
    model (Model): The Thinc model powering the component.
    RETURNS (Tok2Vec): The constructed component.
    """
    return Tok2Vec(nlp.vocab, model, name)


class Tok2Vec(TrainablePipe):
    """Apply a "token-to-vector" model and set its outputs in the doc.tensor
    attribute. This is mostly useful to share a single subnetwork between multiple
    components, e.g. to have one embedding and CNN network shared between a
    parser, tagger and NER.

    In order to use the `Tok2Vec` predictions, subsequent components should use
    the `Tok2VecListener` layer as the tok2vec subnetwork of their model. This
    layer will read data from the `doc.tensor` attribute during prediction.
    During training, the `Tok2Vec` component will save its prediction and backprop
    callback for each batch, so that the subsequent components can backpropagate
    to the shared weights. This implementation is used because it allows us to
    avoid relying on object identity within the models to achieve the parameter
    sharing.
    """

    def __init__(self, vocab: Vocab, model: Model, name: str = "tok2vec") -> None:
        """Initialize a tok2vec component.

        vocab (Vocab): The shared vocabulary.
        model (thinc.api.Model[List[Doc], List[Floats2d]]):
            The Thinc Model powering the pipeline component. It should take
            a list of Doc objects as input, and output a list of 2d float arrays.
        name (str): The component instance name.

        DOCS: https://spacy.io/api/tok2vec#init
        """
        self.vocab = vocab
        self.model = model
        self.name = name
        # Maps a downstream component name to the listener layers found in
        # that component's model.
        self.listener_map: Dict[str, List["Tok2VecListener"]] = {}
        self.cfg: Dict[str, Any] = {}

    @property
    def listeners(self) -> List["Tok2VecListener"]:
        """RETURNS (List[Tok2VecListener]): The listener models listening to this
        component. Usually internals.
        """
        return [m for c in self.listening_components for m in self.listener_map[c]]

    @property
    def listening_components(self) -> List[str]:
        """RETURNS (List[str]): The downstream components listening to this
        component. Usually internals.
        """
        return list(self.listener_map.keys())

    def add_listener(self, listener: "Tok2VecListener", component_name: str) -> None:
        """Add a listener for a downstream component. Usually internals.

        A listener that is already registered for the component is not
        added a second time.
        """
        self.listener_map.setdefault(component_name, [])
        if listener not in self.listener_map[component_name]:
            self.listener_map[component_name].append(listener)

    def remove_listener(self, listener: "Tok2VecListener", component_name: str) -> bool:
        """Remove a listener for a downstream component. Usually internals.

        RETURNS (bool): True if the listener was registered for the component
            and has been removed, False otherwise.
        """
        if component_name in self.listener_map:
            if listener in self.listener_map[component_name]:
                self.listener_map[component_name].remove(listener)
                # If no listeners are left, remove entry
                if not self.listener_map[component_name]:
                    del self.listener_map[component_name]
                return True
        return False

    def find_listeners(self, component) -> None:
        """Walk over a model of a processing component, looking for layers that
        are Tok2VecListener subclasses that have an upstream_name that matches
        this component. Listeners can also set their upstream_name attribute to
        the wildcard string '*' to match any `Tok2Vec`.

        You're unlikely to ever need multiple `Tok2Vec` components, so it's
        fine to leave your listeners upstream_name on '*'.
        """
        names = ("*", self.name)
        # Components without a Thinc model attribute cannot hold listeners.
        if isinstance(getattr(component, "model", None), Model):
            for node in component.model.walk():
                if isinstance(node, Tok2VecListener) and node.upstream_name in names:
                    self.add_listener(node, component.name)

    def predict(self, docs: Iterable[Doc]):
        """Apply the pipeline's model to a batch of docs, without modifying them.
        Returns one array of token vectors per document.

        docs (Iterable[Doc]): The documents to predict.
        RETURNS: Vector representations for each token in the documents.

        DOCS: https://spacy.io/api/tok2vec#predict
        """
        # Materialize once: the batch is scanned for tokens and then handed to
        # the model, which would silently misbehave on a one-shot iterable.
        docs = list(docs)
        if not any(len(doc) for doc in docs):
            # Handle cases where there are no tokens in any docs.
            width = self.model.get_dim("nO")
            return [self.model.ops.alloc((0, width)) for doc in docs]
        tokvecs = self.model.predict(docs)
        return tokvecs

    def set_annotations(self, docs: Sequence[Doc], tokvecses) -> None:
        """Modify a batch of documents, using pre-computed scores.

        docs (Sequence[Doc]): The documents to modify.
        tokvecses: The tensors to set, produced by Tok2Vec.predict.

        DOCS: https://spacy.io/api/tok2vec#set_annotations
        """
        for doc, tokvecs in zip(docs, tokvecses):
            # One row per token is required for the tensor to line up.
            assert tokvecs.shape[0] == len(doc)
            doc.tensor = tokvecs

    def update(
        self,
        examples: Iterable[Example],
        *,
        drop: float = 0.0,
        sgd: Optional[Optimizer] = None,
        losses: Optional[Dict[str, float]] = None,
    ):
        """Learn from a batch of documents and gold-standard information,
        updating the pipe's model.

        examples (Iterable[Example]): A batch of Example objects.
        drop (float): The dropout rate.
        sgd (thinc.api.Optimizer): The optimizer.
        losses (Dict[str, float]): Optional record of the loss during training.
            Updated using the component name as the key.
        RETURNS (Dict[str, float]): The updated losses dictionary.

        DOCS: https://spacy.io/api/tok2vec#update
        """
        validate_examples(examples, "Tok2Vec.update")
        docs = [eg.predicted for eg in examples]
        return self._update_with_docs(docs, drop=drop, sgd=sgd, losses=losses)

    def get_loss(self, examples, scores) -> None:
        """No-op: this component does not compute a loss through this method."""
        pass

    def initialize(
        self,
        get_examples: Callable[[], Iterable[Example]],
        *,
        nlp: Optional[Language] = None,
    ):
        """Initialize the pipe for training, using a representative set
        of data examples.

        get_examples (Callable[[], Iterable[Example]]): Function that
            returns a representative sample of gold-standard Example objects.
        nlp (Language): The current nlp object the component is part of.

        DOCS: https://spacy.io/api/tok2vec#initialize
        """
        validate_get_examples(get_examples, "Tok2Vec.initialize")
        # Only the first 10 examples are used as the initialization sample.
        doc_sample = [example.x for example in islice(get_examples(), 10)]
        assert doc_sample, Errors.E923.format(name=self.name)
        self.model.initialize(X=doc_sample)

    def add_label(self, label):
        """Not supported by this component.

        RAISES (NotImplementedError): Always.
        """
        raise NotImplementedError

    def distill(
        self,
        teacher_pipe: Optional["TrainablePipe"],
        examples: Iterable["Example"],
        *,
        drop: float = 0.0,
        sgd: Optional[Optimizer] = None,
        losses: Optional[Dict[str, float]] = None,
    ) -> Dict[str, float]:
        """Performs an update of the student pipe's model using the
        student's distillation examples and sets the annotations
        of the teacher's distillation examples using the teacher pipe.

        teacher_pipe (Optional[TrainablePipe]): The teacher pipe to use
            for prediction.
        examples (Iterable[Example]): Distillation examples. The reference
            (teacher) and predicted (student) docs must have the same number of
            tokens and the same orthography.
        drop (float): dropout rate.
        sgd (Optional[Optimizer]): An optimizer. Will be created via
            create_optimizer if not set.
        losses (Optional[Dict[str, float]]): Optional record of loss during
            distillation.
        RETURNS: The updated losses dictionary.
        RAISES (ValueError): If no teacher pipe is given.

        DOCS: https://spacy.io/api/tok2vec#distill
        """
        # By default we require a teacher pipe, but there are downstream
        # implementations that don't require a pipe.
        if teacher_pipe is None:
            raise ValueError(Errors.E4002.format(name=self.name))
        # Materialize once: the examples are traversed twice below, which
        # would leave the student docs empty for a one-shot iterable.
        examples = list(examples)
        teacher_docs = [eg.reference for eg in examples]
        student_docs = [eg.predicted for eg in examples]
        teacher_preds = teacher_pipe.predict(teacher_docs)
        teacher_pipe.set_annotations(teacher_docs, teacher_preds)
        return self._update_with_docs(student_docs, drop=drop, sgd=sgd, losses=losses)

    def _update_with_docs(
        self,
        docs: Iterable[Doc],
        *,
        drop: float = 0.0,
        sgd: Optional[Optimizer] = None,
        losses: Optional[Dict[str, float]] = None,
    ):
        """Run the forward pass on the docs and hand the outputs and the
        backprop callbacks to the registered listeners.

        docs (Iterable[Doc]): The batch of documents. It is traversed more
            than once, so callers should pass a list.
        drop (float): The dropout rate.
        sgd (Optional[Optimizer]): The optimizer.
        losses (Optional[Dict[str, float]]): Optional record of the loss,
            keyed by component name. Created if not given.
        RETURNS (Dict[str, float]): The losses dictionary.
        """
        if losses is None:
            losses = {}
        losses.setdefault(self.name, 0.0)
        set_dropout_rate(self.model, drop)

        tokvecs, accumulate_gradient, backprop = self._create_backprops(
            docs, losses, sgd=sgd
        )

        batch_id = Tok2VecListener.get_batch_id(docs)
        # The property builds a new list on every access; read it once.
        listeners = self.listeners
        # All but the last listener only accumulate their gradient; the last
        # one triggers the actual backward pass.
        for listener in listeners[:-1]:
            listener.receive(batch_id, tokvecs, accumulate_gradient)
        if listeners:
            listeners[-1].receive(batch_id, tokvecs, backprop)
        return losses

    def _create_backprops(
        self,
        docs: Iterable[Doc],
        losses: Dict[str, float],
        *,
        sgd: Optional[Optimizer] = None,
    ) -> Tuple[List[Floats2d], Callable, Callable]:
        """Run the model's forward pass and build the listener callbacks.

        docs (Iterable[Doc]): The batch of documents.
        losses (Dict[str, float]): Losses dictionary; the entry under the
            component name is incremented by the callbacks.
        sgd (Optional[Optimizer]): The optimizer. If given, the model is
            updated after the backward pass.
        RETURNS: The model outputs, the gradient-accumulating callback and the
            callback that performs the backward pass.
        """
        tokvecs, bp_tokvecs = self.model.begin_update(docs)
        # Gradient buffers, one per output array, shared by both callbacks.
        d_tokvecs = [self.model.ops.alloc2f(*t2v.shape) for t2v in tokvecs]

        def accumulate_gradient(one_d_tokvecs):
            """Accumulate tok2vec loss and gradient. This is passed as a callback
            to all but the last listener. Only the last one does the backprop.
            """
            for i, d_t2v in enumerate(one_d_tokvecs):
                d_tokvecs[i] += d_t2v
                losses[self.name] += float((d_t2v ** 2).sum())
            # Zero arrays of the output shapes are handed back to the caller.
            return [self.model.ops.alloc2f(*t2v.shape) for t2v in tokvecs]

        def backprop(one_d_tokvecs):
            """Callback to actually do the backprop. Passed to last listener."""
            accumulate_gradient(one_d_tokvecs)
            d_docs = bp_tokvecs(d_tokvecs)
            if sgd is not None:
                self.finish_update(sgd)
            return d_docs

        return tokvecs, accumulate_gradient, backprop


class Tok2VecListener(Model):
    """A layer that gets fed its answers from an upstream connection,
    for instance from a component earlier in the pipeline.

    The Tok2VecListener layer is used as a sublayer within a component such
    as a parser, NER or text categorizer. Usually you'll have multiple listeners
    connecting to a single upstream Tok2Vec component, that's earlier in the
    pipeline. The Tok2VecListener layers act as proxies, passing the predictions
    from the Tok2Vec component into downstream components, and communicating
    gradients back upstream.
    """

    name = "tok2vec-listener"

    def __init__(self, upstream_name: str, width: int) -> None:
        """
        upstream_name (str): A string to identify the 'upstream' Tok2Vec component
            to communicate with. The upstream name should either be the wildcard
            string '*', or the name of the `Tok2Vec` component. You'll almost
            never have multiple upstream Tok2Vec components, so the wildcard
            string will almost always be fine.
        width (int):
            The width of the vectors produced by the upstream tok2vec component.
        """
        Model.__init__(self, name=self.name, forward=forward, dims={"nO": width})
        self.upstream_name = upstream_name
        # State for the most recently received batch; see `receive`.
        self._batch_id: Optional[int] = None
        self._outputs = None
        self._backprop = None

    @classmethod
    def get_batch_id(cls, inputs: Iterable[Doc]) -> int:
        """Calculate a content-sensitive hash of the batch of documents, to check
        whether the next batch of documents is unexpected.
        """
        return sum(sum(token.orth for token in doc) for doc in inputs)

    def receive(self, batch_id: int, outputs, backprop) -> None:
        """Store a batch of training predictions and a backprop callback. The
        predictions and callback are produced by the upstream Tok2Vec component,
        and later will be used when the listener's component's model is called.
        """
        self._batch_id = batch_id
        self._outputs = outputs
        self._backprop = backprop

    def verify_inputs(self, inputs) -> bool:
        """Check that the batch of Doc objects matches the ones we have a
        prediction for.

        RETURNS (bool): True if the batch matches.
        RAISES (ValueError): If nothing has been received yet, or if the batch
            id of the inputs differs from the stored one.
        """
        if self._batch_id is None and self._outputs is None:
            raise ValueError(Errors.E954)
        else:
            batch_id = self.get_batch_id(inputs)
            if batch_id != self._batch_id:
                raise ValueError(Errors.E953.format(id1=batch_id, id2=self._batch_id))
            else:
                return True


def forward(model: Tok2VecListener, inputs, is_train: bool):
    """Supply the outputs from the upstream Tok2Vec component."""
    if is_train:
        # This might occur during training when the tok2vec layer is frozen / hasn't been updated.
        # In that case, it should be set to "annotating" so we can retrieve the embeddings from the doc.
        if model._batch_id is None:
            outputs = []
            for doc in inputs:
                if doc.tensor.size == 0:
                    raise ValueError(Errors.E203.format(name="tok2vec"))
                else:
                    outputs.append(doc.tensor)
            return outputs, _empty_backprop
        else:
            model.verify_inputs(inputs)
            return model._outputs, model._backprop
    else:
        # This is pretty grim, but it's hard to do better :(.
        # It's hard to avoid relying on the doc.tensor attribute, because the
        # pipeline components can batch the data differently during prediction.
        # That doesn't happen in update, where the nlp object works on batches
        # of data.
        # When the components batch differently, we don't receive a matching
        # prediction from the upstream, so we can't predict.
        outputs = []
        width = model.get_dim("nO")
        for doc in inputs:
            if doc.tensor.size == 0:
                # But we do need to do *something* if the tensor hasn't been set.
                # The compromise is to at least return data of the right shape,
                # so the output is valid.
                outputs.append(model.ops.alloc2f(len(doc), width))
            else:
                outputs.append(doc.tensor)
        return outputs, _empty_backprop


def _empty_backprop(dX):  # for pickling
    """Backprop callback that propagates nothing; a module-level function
    so that it can be pickled.
    """
    return []