mirror of
https://github.com/explosion/spaCy.git
synced 2024-11-14 05:37:03 +03:00
657af5f91f
* 🚨 Ignore all existing Mypy errors * 🏗 Add Mypy check to CI * Add types-mock and types-requests as dev requirements * Add additional type ignore directives * Add types packages to dev-only list in reqs test * Add types-dataclasses for python 3.6 * Add ignore to pretrain * 🏷 Improve type annotation on `run_command` helper The `run_command` helper previously declared that it returned an `Optional[subprocess.CompletedProcess]`, but it isn't actually possible for the function to return `None`. These changes modify the type annotation of the `run_command` helper and remove all now-unnecessary `# type: ignore` directives. * 🔧 Allow variable type redefinition in limited contexts These changes modify how Mypy is configured to allow variables to have their type automatically redefined under certain conditions. The Mypy documentation contains the following example: ```python def process(items: List[str]) -> None: # 'items' has type List[str] items = [item.split() for item in items] # 'items' now has type List[List[str]] ... ``` This configuration change is especially helpful in reducing the number of `# type: ignore` directives needed to handle the common pattern of: * Accepting a filepath as a string * Overwriting the variable using `filepath = ensure_path(filepath)` These changes enable redefinition and remove all `# type: ignore` directives rendered redundant by this change. * 🏷 Add type annotation to converters mapping * 🚨 Fix Mypy error in convert CLI argument verification * 🏷 Improve type annotation on `resolve_dot_names` helper * 🏷 Add type annotations for `Vocab` attributes `strings` and `vectors` * 🏷 Add type annotations for more `Vocab` attributes * 🏷 Add loose type annotation for gold data compilation * 🏷 Improve `_format_labels` type annotation * 🏷 Fix `get_lang_class` type annotation * 🏷 Loosen return type of `Language.evaluate` * 🏷 Don't accept `Scorer` in `handle_scores_per_type` * 🏷 Add `string_to_list` overloads * 🏷 Fix non-Optional command-line options * 🙈 Ignore redefinition of `wandb_logger` in `loggers.py` * ➕ Install `typing_extensions` in Python 3.8+ The `typing_extensions` package states that it should be used when "writing code that must be compatible with multiple Python versions". Since SpaCy needs to support multiple Python versions, it should be used when newer `typing` module members are required. One example of this is `Literal`, which is available starting with Python 3.8. Previously SpaCy tried to import `Literal` from `typing`, falling back to `typing_extensions` if the import failed. However, Mypy doesn't seem to be able to understand what `Literal` means when the initial import means. Therefore, these changes modify how `compat` imports `Literal` by always importing it from `typing_extensions`. These changes also modify how `typing_extensions` is installed, so that it is a requirement for all Python versions, including those greater than or equal to 3.8. * 🏷 Improve type annotation for `Language.pipe` These changes add a missing overload variant to the type signature of `Language.pipe`. Additionally, the type signature is enhanced to allow type checkers to differentiate between the two overload variants based on the `as_tuple` parameter. Fixes #8772 * ➖ Don't install `typing-extensions` in Python 3.8+ After more detailed analysis of how to implement Python version-specific type annotations using SpaCy, it has been determined that by branching on a comparison against `sys.version_info` can be statically analyzed by Mypy well enough to enable us to conditionally use `typing_extensions.Literal`. This means that we no longer need to install `typing_extensions` for Python versions greater than or equal to 3.8! 🎉 These changes revert previous changes installing `typing-extensions` regardless of Python version and modify how we import the `Literal` type to ensure that Mypy treats it properly. * resolve mypy errors for Strict pydantic types * refactor code to avoid missing return statement * fix types of convert CLI command * avoid list-set confustion in debug_data * fix typo and formatting * small fixes to avoid type ignores * fix types in profile CLI command and make it more efficient * type fixes in projects CLI * put one ignore back * type fixes for render * fix render types - the sequel * fix BaseDefault in language definitions * fix type of noun_chunks iterator - yields tuple instead of span * fix types in language-specific modules * 🏷 Expand accepted inputs of `get_string_id` `get_string_id` accepts either a string (in which case it returns its ID) or an ID (in which case it immediately returns the ID). These changes extend the type annotation of `get_string_id` to indicate that it can accept either strings or IDs. * 🏷 Handle override types in `combine_score_weights` The `combine_score_weights` function allows users to pass an `overrides` mapping to override data extracted from the `weights` argument. Since it allows `Optional` dictionary values, the return value may also include `Optional` dictionary values. These changes update the type annotations for `combine_score_weights` to reflect this fact. * 🏷 Fix tokenizer serialization method signatures in `DummyTokenizer` * 🏷 Fix redefinition of `wandb_logger` These changes fix the redefinition of `wandb_logger` by giving a separate name to each `WandbLogger` version. For backwards-compatibility, `spacy.train` still exports `wandb_logger_v3` as `wandb_logger` for now. * more fixes for typing in language * type fixes in model definitions * 🏷 Annotate `_RandomWords.probs` as `NDArray` * 🏷 Annotate `tok2vec` layers to help Mypy * 🐛 Fix `_RandomWords.probs` type annotations for Python 3.6 Also remove an import that I forgot to move to the top of the module 😅 * more fixes for matchers and other pipeline components * quick fix for entity linker * fixing types for spancat, textcat, etc * bugfix for tok2vec * type annotations for scorer * add runtime_checkable for Protocol * type and import fixes in tests * mypy fixes for training utilities * few fixes in util * fix import * 🐵 Remove unused `# type: ignore` directives * 🏷 Annotate `Language._components` * 🏷 Annotate `spacy.pipeline.Pipe` * add doc as property to span.pyi * small fixes and cleanup * explicit type annotations instead of via comment Co-authored-by: Adriane Boyd <adrianeboyd@gmail.com> Co-authored-by: svlandeg <sofie.vanlandeghem@gmail.com> Co-authored-by: svlandeg <svlandeg@github.com>
370 lines
14 KiB
Python
370 lines
14 KiB
Python
from typing import List, Callable, Tuple, Dict, Iterable, Union, Any, IO
|
|
from typing import Optional, TYPE_CHECKING
|
|
from pathlib import Path
|
|
from timeit import default_timer as timer
|
|
from thinc.api import Optimizer, Config, constant, fix_random_seed, set_gpu_allocator
|
|
from wasabi import Printer
|
|
import random
|
|
import sys
|
|
import shutil
|
|
|
|
from .example import Example
|
|
from ..schemas import ConfigSchemaTraining
|
|
from ..errors import Errors
|
|
from ..util import resolve_dot_names, registry, logger
|
|
|
|
if TYPE_CHECKING:
|
|
from ..language import Language # noqa: F401
|
|
|
|
|
|
DIR_MODEL_BEST = "model-best"
|
|
DIR_MODEL_LAST = "model-last"
|
|
|
|
|
|
def train(
|
|
nlp: "Language",
|
|
output_path: Optional[Path] = None,
|
|
*,
|
|
use_gpu: int = -1,
|
|
stdout: IO = sys.stdout,
|
|
stderr: IO = sys.stderr,
|
|
) -> Tuple["Language", Optional[Path]]:
|
|
"""Train a pipeline.
|
|
|
|
nlp (Language): The initialized nlp object with the full config.
|
|
output_path (Optional[Path]): Optional output path to save trained model to.
|
|
use_gpu (int): Whether to train on GPU. Make sure to call require_gpu
|
|
before calling this function.
|
|
stdout (file): A file-like object to write output messages. To disable
|
|
printing, set to io.StringIO.
|
|
stderr (file): A second file-like object to write output messages. To disable
|
|
printing, set to io.StringIO.
|
|
|
|
RETURNS (tuple): The final nlp object and the path to the exported model.
|
|
"""
|
|
# We use no_print here so we can respect the stdout/stderr options.
|
|
msg = Printer(no_print=True)
|
|
# Create iterator, which yields out info after each optimization step.
|
|
config = nlp.config.interpolate()
|
|
if config["training"]["seed"] is not None:
|
|
fix_random_seed(config["training"]["seed"])
|
|
allocator = config["training"]["gpu_allocator"]
|
|
if use_gpu >= 0 and allocator:
|
|
set_gpu_allocator(allocator)
|
|
T = registry.resolve(config["training"], schema=ConfigSchemaTraining)
|
|
dot_names = [T["train_corpus"], T["dev_corpus"]]
|
|
train_corpus, dev_corpus = resolve_dot_names(config, dot_names)
|
|
optimizer = T["optimizer"]
|
|
score_weights = T["score_weights"]
|
|
batcher = T["batcher"]
|
|
train_logger = T["logger"]
|
|
before_to_disk = create_before_to_disk_callback(T["before_to_disk"])
|
|
|
|
# Helper function to save checkpoints. This is a closure for convenience,
|
|
# to avoid passing in all the args all the time.
|
|
def save_checkpoint(is_best):
|
|
with nlp.use_params(optimizer.averages):
|
|
before_to_disk(nlp).to_disk(output_path / DIR_MODEL_LAST)
|
|
if is_best:
|
|
# Avoid saving twice (saving will be more expensive than
|
|
# the dir copy)
|
|
if (output_path / DIR_MODEL_BEST).exists():
|
|
shutil.rmtree(output_path / DIR_MODEL_BEST)
|
|
shutil.copytree(output_path / DIR_MODEL_LAST, output_path / DIR_MODEL_BEST)
|
|
|
|
# Components that shouldn't be updated during training
|
|
frozen_components = T["frozen_components"]
|
|
# Components that should set annotations on update
|
|
annotating_components = T["annotating_components"]
|
|
# Create iterator, which yields out info after each optimization step.
|
|
training_step_iterator = train_while_improving(
|
|
nlp,
|
|
optimizer,
|
|
create_train_batches(nlp, train_corpus, batcher, T["max_epochs"]),
|
|
create_evaluation_callback(nlp, dev_corpus, score_weights),
|
|
dropout=T["dropout"],
|
|
accumulate_gradient=T["accumulate_gradient"],
|
|
patience=T["patience"],
|
|
max_steps=T["max_steps"],
|
|
eval_frequency=T["eval_frequency"],
|
|
exclude=frozen_components,
|
|
annotating_components=annotating_components,
|
|
)
|
|
clean_output_dir(output_path)
|
|
stdout.write(msg.info(f"Pipeline: {nlp.pipe_names}") + "\n")
|
|
if frozen_components:
|
|
stdout.write(msg.info(f"Frozen components: {frozen_components}") + "\n")
|
|
if annotating_components:
|
|
stdout.write(
|
|
msg.info(f"Set annotations on update for: {annotating_components}") + "\n"
|
|
)
|
|
stdout.write(msg.info(f"Initial learn rate: {optimizer.learn_rate}") + "\n")
|
|
with nlp.select_pipes(disable=frozen_components):
|
|
log_step, finalize_logger = train_logger(nlp, stdout, stderr)
|
|
try:
|
|
for batch, info, is_best_checkpoint in training_step_iterator:
|
|
if is_best_checkpoint is not None:
|
|
with nlp.select_pipes(disable=frozen_components):
|
|
update_meta(T, nlp, info)
|
|
if output_path is not None:
|
|
save_checkpoint(is_best_checkpoint)
|
|
info["output_path"] = str(output_path / DIR_MODEL_LAST)
|
|
log_step(info if is_best_checkpoint is not None else None)
|
|
except Exception as e:
|
|
if output_path is not None:
|
|
stdout.write(
|
|
msg.warn(
|
|
f"Aborting and saving the final best model. "
|
|
f"Encountered exception: {repr(e)}"
|
|
)
|
|
+ "\n"
|
|
)
|
|
raise e
|
|
finally:
|
|
finalize_logger()
|
|
if output_path is not None:
|
|
save_checkpoint(False)
|
|
# This will only run if we did't hit an error
|
|
if optimizer.averages:
|
|
nlp.use_params(optimizer.averages)
|
|
if output_path is not None:
|
|
stdout.write(
|
|
msg.good("Saved pipeline to output directory", output_path / DIR_MODEL_LAST)
|
|
+ "\n"
|
|
)
|
|
return (nlp, output_path / DIR_MODEL_LAST)
|
|
else:
|
|
return (nlp, None)
|
|
|
|
|
|
def train_while_improving(
|
|
nlp: "Language",
|
|
optimizer: Optimizer,
|
|
train_data,
|
|
evaluate,
|
|
*,
|
|
dropout: float,
|
|
eval_frequency: int,
|
|
accumulate_gradient: int,
|
|
patience: int,
|
|
max_steps: int,
|
|
exclude: List[str],
|
|
annotating_components: List[str],
|
|
):
|
|
"""Train until an evaluation stops improving. Works as a generator,
|
|
with each iteration yielding a tuple `(batch, info, is_best_checkpoint)`,
|
|
where info is a dict, and is_best_checkpoint is in [True, False, None] --
|
|
None indicating that the iteration was not evaluated as a checkpoint.
|
|
The evaluation is conducted by calling the evaluate callback.
|
|
|
|
Positional arguments:
|
|
nlp: The spaCy pipeline to evaluate.
|
|
optimizer: The optimizer callable.
|
|
train_data (Iterable[Batch]): A generator of batches, with the training
|
|
data. Each batch should be a Sized[Tuple[Input, Annot]]. The training
|
|
data iterable needs to take care of iterating over the epochs and
|
|
shuffling.
|
|
evaluate (Callable[[], Tuple[float, Any]]): A callback to perform evaluation.
|
|
The callback should take no arguments and return a tuple
|
|
`(main_score, other_scores)`. The main_score should be a float where
|
|
higher is better. other_scores can be any object.
|
|
|
|
Every iteration, the function yields out a tuple with:
|
|
|
|
* batch: A list of Example objects.
|
|
* info: A dict with various information about the last update (see below).
|
|
* is_best_checkpoint: A value in None, False, True, indicating whether this
|
|
was the best evaluation so far. You should use this to save the model
|
|
checkpoints during training. If None, evaluation was not conducted on
|
|
that iteration. False means evaluation was conducted, but a previous
|
|
evaluation was better.
|
|
|
|
The info dict provides the following information:
|
|
|
|
epoch (int): How many passes over the data have been completed.
|
|
step (int): How many steps have been completed.
|
|
score (float): The main score from the last evaluation.
|
|
other_scores: : The other scores from the last evaluation.
|
|
losses: The accumulated losses throughout training.
|
|
checkpoints: A list of previous results, where each result is a
|
|
(score, step, epoch) tuple.
|
|
"""
|
|
if isinstance(dropout, float):
|
|
dropouts = constant(dropout)
|
|
else:
|
|
dropouts = dropout
|
|
results = []
|
|
losses: Dict[str, float] = {}
|
|
words_seen = 0
|
|
start_time = timer()
|
|
for step, (epoch, batch) in enumerate(train_data):
|
|
dropout = next(dropouts) # type: ignore
|
|
for subbatch in subdivide_batch(batch, accumulate_gradient):
|
|
nlp.update(
|
|
subbatch,
|
|
drop=dropout,
|
|
losses=losses,
|
|
sgd=False, # type: ignore[arg-type]
|
|
exclude=exclude,
|
|
annotates=annotating_components,
|
|
)
|
|
# TODO: refactor this so we don't have to run it separately in here
|
|
for name, proc in nlp.pipeline:
|
|
if (
|
|
name not in exclude
|
|
and hasattr(proc, "is_trainable")
|
|
and proc.is_trainable
|
|
and proc.model not in (True, False, None) # type: ignore[attr-defined]
|
|
):
|
|
proc.finish_update(optimizer) # type: ignore[attr-defined]
|
|
optimizer.step_schedules()
|
|
if not (step % eval_frequency):
|
|
if optimizer.averages:
|
|
with nlp.use_params(optimizer.averages):
|
|
score, other_scores = evaluate()
|
|
else:
|
|
score, other_scores = evaluate()
|
|
results.append((score, step))
|
|
is_best_checkpoint = score == max(results)[0]
|
|
else:
|
|
score, other_scores = (None, None)
|
|
is_best_checkpoint = None
|
|
words_seen += sum(len(eg) for eg in batch)
|
|
info = {
|
|
"epoch": epoch,
|
|
"step": step,
|
|
"score": score,
|
|
"other_scores": other_scores,
|
|
"losses": losses,
|
|
"checkpoints": results,
|
|
"seconds": int(timer() - start_time),
|
|
"words": words_seen,
|
|
}
|
|
yield batch, info, is_best_checkpoint
|
|
if is_best_checkpoint is not None:
|
|
losses = {}
|
|
# Stop if no improvement in `patience` updates (if specified)
|
|
# Negate step value so that the earliest best step is chosen for the
|
|
# same score, i.e. (1.0, 100) is chosen over (1.0, 200)
|
|
best_result = max((r_score, -r_step) for r_score, r_step in results)
|
|
best_step = -best_result[1]
|
|
if patience and (step - best_step) >= patience:
|
|
break
|
|
# Stop if we've exhausted our max steps (if specified)
|
|
if max_steps and step >= max_steps:
|
|
break
|
|
|
|
|
|
def subdivide_batch(batch, accumulate_gradient):
|
|
batch = list(batch)
|
|
batch.sort(key=lambda eg: len(eg.predicted))
|
|
sub_len = len(batch) // accumulate_gradient
|
|
start = 0
|
|
for i in range(accumulate_gradient):
|
|
subbatch = batch[start : start + sub_len]
|
|
if subbatch:
|
|
yield subbatch
|
|
start += len(subbatch)
|
|
subbatch = batch[start:]
|
|
if subbatch:
|
|
yield subbatch
|
|
|
|
|
|
def create_evaluation_callback(
|
|
nlp: "Language", dev_corpus: Callable, weights: Dict[str, float]
|
|
) -> Callable[[], Tuple[float, Dict[str, float]]]:
|
|
weights = {key: value for key, value in weights.items() if value is not None}
|
|
|
|
def evaluate() -> Tuple[float, Dict[str, float]]:
|
|
nonlocal weights
|
|
try:
|
|
scores = nlp.evaluate(dev_corpus(nlp))
|
|
except KeyError as e:
|
|
raise KeyError(Errors.E900.format(pipeline=nlp.pipe_names)) from e
|
|
# Calculate a weighted sum based on score_weights for the main score.
|
|
# We can only consider scores that are ints/floats, not dicts like
|
|
# entity scores per type etc.
|
|
scores = {key: value for key, value in scores.items() if value is not None}
|
|
weights = {key: value for key, value in weights.items() if key in scores}
|
|
for key, value in scores.items():
|
|
if key in weights and not isinstance(value, (int, float)):
|
|
raise ValueError(Errors.E915.format(name=key, score_type=type(value)))
|
|
try:
|
|
weighted_score = sum(
|
|
scores.get(s, 0.0) * weights.get(s, 0.0) for s in weights
|
|
)
|
|
except KeyError as e:
|
|
keys = list(scores.keys())
|
|
err = Errors.E983.format(dict="score_weights", key=str(e), keys=keys)
|
|
raise KeyError(err) from None
|
|
return weighted_score, scores
|
|
|
|
return evaluate
|
|
|
|
|
|
def create_train_batches(
|
|
nlp: "Language",
|
|
corpus: Callable[["Language"], Iterable[Example]],
|
|
batcher: Callable[[Iterable[Example]], Iterable[Example]],
|
|
max_epochs: int,
|
|
):
|
|
epoch = 0
|
|
if max_epochs >= 0:
|
|
examples = list(corpus(nlp)) # type: Iterable[Example]
|
|
if not examples:
|
|
# Raise error if no data
|
|
raise ValueError(Errors.E986)
|
|
while max_epochs < 1 or epoch != max_epochs:
|
|
if max_epochs >= 0:
|
|
random.shuffle(examples) # type: ignore
|
|
else:
|
|
examples = corpus(nlp)
|
|
for batch in batcher(examples):
|
|
yield epoch, batch
|
|
epoch += 1
|
|
|
|
|
|
def update_meta(
|
|
training: Union[Dict[str, Any], Config], nlp: "Language", info: Dict[str, Any]
|
|
) -> None:
|
|
nlp.meta["performance"] = {}
|
|
for metric in training["score_weights"]:
|
|
if metric is not None:
|
|
nlp.meta["performance"][metric] = info["other_scores"].get(metric, 0.0)
|
|
for pipe_name in nlp.pipe_names:
|
|
if pipe_name in info["losses"]:
|
|
nlp.meta["performance"][f"{pipe_name}_loss"] = info["losses"][pipe_name]
|
|
|
|
|
|
def create_before_to_disk_callback(
|
|
callback: Optional[Callable[["Language"], "Language"]]
|
|
) -> Callable[["Language"], "Language"]:
|
|
from ..language import Language # noqa: F811
|
|
|
|
def before_to_disk(nlp: Language) -> Language:
|
|
if not callback:
|
|
return nlp
|
|
modified_nlp = callback(nlp)
|
|
if not isinstance(modified_nlp, Language):
|
|
err = Errors.E914.format(name="before_to_disk", value=type(modified_nlp))
|
|
raise ValueError(err)
|
|
return modified_nlp
|
|
|
|
return before_to_disk
|
|
|
|
|
|
def clean_output_dir(path: Optional[Path]) -> None:
|
|
"""Remove an existing output directory. Typically used to ensure that that
|
|
a directory like model-best and its contents aren't just being overwritten
|
|
by nlp.to_disk, which could preserve existing subdirectories (e.g.
|
|
components that don't exist anymore).
|
|
"""
|
|
if path is not None and path.exists():
|
|
for subdir in [path / DIR_MODEL_BEST, path / DIR_MODEL_LAST]:
|
|
if subdir.exists():
|
|
try:
|
|
shutil.rmtree(str(subdir))
|
|
logger.debug(f"Removed existing output directory: {subdir}")
|
|
except Exception as e:
|
|
raise IOError(Errors.E901.format(path=path)) from e
|