# cython: infer_types=True, profile=True, binding=True from typing import Optional import numpy from thinc.api import CosineDistance, to_categorical, to_categorical, Model, Config from thinc.api import set_dropout_rate from ..tokens.doc cimport Doc from .pipe import Pipe from .tagger import Tagger from ..language import Language from ..syntax import nonproj from ..attrs import POS, ID from ..errors import Errors default_model_config = """ [model] @architectures = "spacy.MultiTask.v1" maxout_pieces = 3 token_vector_width = 96 [model.tok2vec] @architectures = "spacy.HashEmbedCNN.v1" pretrained_vectors = null width = 96 depth = 4 embed_size = 2000 window_size = 1 maxout_pieces = 2 subword_features = true dropout = null """ DEFAULT_MT_MODEL = Config().from_str(default_model_config)["model"] @Language.factory( "nn_labeller", default_config={"labels": None, "target": "dep_tag_offset", "model": DEFAULT_MT_MODEL} ) def make_nn_labeller(nlp: Language, name: str, model: Model, labels: Optional[dict], target: str): return MultitaskObjective(nlp.vocab, model, name) class MultitaskObjective(Tagger): """Experimental: Assist training of a parser or tagger, by training a side-objective. """ def __init__(self, vocab, model, name="nn_labeller", *, labels, target): self.vocab = vocab self.model = model self.name = name if target == "dep": self.make_label = self.make_dep elif target == "tag": self.make_label = self.make_tag elif target == "ent": self.make_label = self.make_ent elif target == "dep_tag_offset": self.make_label = self.make_dep_tag_offset elif target == "ent_tag": self.make_label = self.make_ent_tag elif target == "sent_start": self.make_label = self.make_sent_start elif hasattr(target, "__call__"): self.make_label = target else: raise ValueError(Errors.E016) cfg = {"labels": labels or {}, "target": target} self.cfg = dict(cfg) @property def labels(self): return self.cfg.setdefault("labels", {}) @labels.setter def labels(self, value): self.cfg["labels"] = value def set_annotations(self, docs, dep_ids): pass def begin_training(self, get_examples=lambda: [], pipeline=None, sgd=None): gold_examples = nonproj.preprocess_training_data(get_examples()) # for raw_text, doc_annot in gold_tuples: for example in gold_examples: for token in example.y: label = self.make_label(token) if label is not None and label not in self.labels: self.labels[label] = len(self.labels) self.model.initialize() if sgd is None: sgd = self.create_optimizer() return sgd def predict(self, docs): tokvecs = self.model.get_ref("tok2vec")(docs) scores = self.model.get_ref("softmax")(tokvecs) return tokvecs, scores def get_loss(self, examples, scores): cdef int idx = 0 correct = numpy.zeros((scores.shape[0],), dtype="i") guesses = scores.argmax(axis=1) docs = [eg.predicted for eg in examples] for i, eg in enumerate(examples): # Handles alignment for tokenization differences doc_annots = eg.get_aligned() # TODO for j in range(len(eg.predicted)): tok_annots = {key: values[j] for key, values in tok_annots.items()} label = self.make_label(j, tok_annots) if label is None or label not in self.labels: correct[idx] = guesses[idx] else: correct[idx] = self.labels[label] idx += 1 correct = self.model.ops.xp.array(correct, dtype="i") d_scores = scores - to_categorical(correct, n_classes=scores.shape[1]) loss = (d_scores**2).sum() return float(loss), d_scores @staticmethod def make_dep(token): return token.dep_ @staticmethod def make_tag(token): return token.tag_ @staticmethod def make_ent(token): if token.ent_iob_ == "O": return "O" else: return token.ent_iob_ + "-" + token.ent_type_ @staticmethod def make_dep_tag_offset(token): dep = token.dep_ tag = token.tag_ offset = token.head.i - token.i offset = min(offset, 2) offset = max(offset, -2) return f"{dep}-{tag}:{offset}" @staticmethod def make_ent_tag(token): if token.ent_iob_ == "O": ent = "O" else: ent = token.ent_iob_ + "-" + token.ent_type_ tag = token.tag_ return f"{tag}-{ent}" @staticmethod def make_sent_start(token): """A multi-task objective for representing sentence boundaries, using BILU scheme. (O is impossible) """ if token.is_sent_start and token.is_sent_end: return "U-SENT" elif token.is_sent_start: return "B-SENT" else: return "I-SENT" class ClozeMultitask(Pipe): def __init__(self, vocab, model, **cfg): self.vocab = vocab self.model = model self.cfg = cfg self.distance = CosineDistance(ignore_zeros=True, normalize=False) # TODO: in config def set_annotations(self, docs, dep_ids): pass def begin_training(self, get_examples=lambda: [], pipeline=None, sgd=None): self.model.initialize() X = self.model.ops.alloc((5, self.model.get_ref("tok2vec").get_dim("nO"))) self.model.output_layer.begin_training(X) if sgd is None: sgd = self.create_optimizer() return sgd def predict(self, docs): tokvecs = self.model.get_ref("tok2vec")(docs) vectors = self.model.get_ref("output_layer")(tokvecs) return tokvecs, vectors def get_loss(self, examples, vectors, prediction): # The simplest way to implement this would be to vstack the # token.vector values, but that's a bit inefficient, especially on GPU. # Instead we fetch the index into the vectors table for each of our tokens, # and look them up all at once. This prevents data copying. ids = self.model.ops.flatten([eg.predicted.to_array(ID).ravel() for eg in examples]) target = vectors[ids] gradient = self.distance.get_grad(prediction, target) loss = self.distance.get_loss(prediction, target) return loss, gradient def update(self, examples, *, drop=0., set_annotations=False, sgd=None, losses=None): pass def rehearse(self, examples, drop=0., sgd=None, losses=None): if losses is not None and self.name not in losses: losses[self.name] = 0. set_dropout_rate(self.model, drop) try: predictions, bp_predictions = self.model.begin_update([eg.predicted for eg in examples]) except AttributeError: types = set([type(eg) for eg in examples]) raise TypeError(Errors.E978.format(name="ClozeMultitask", method="rehearse", types=types)) loss, d_predictions = self.get_loss(examples, self.vocab.vectors.data, predictions) bp_predictions(d_predictions) if sgd is not None: self.model.finish_update(sgd) if losses is not None: losses[self.name] += loss