spaCy/spacy/training/batchers.py
Daniël de Kok b052b1b47f
Fix batching regression (#12094)
* Fix batching regression

Some time ago, the spaCy v4 branch switched to the new Thinc v9
schedule. However, this introduced an error in how batching is handed.

In the PR, the batchers were changed to keep track of their step,
so that the step can be passed to the schedule. However, the issue
is that the training loop repeatedly calls the batching functions
(rather than using an infinite generator/iterator). So, the step and
therefore the schedule would be reset each epoch. Before the schedule
switch we didn't have this issue, because the old schedules were
stateful.

This PR fixes this issue by reverting the batching functions to use
a (stateful) generator. Their registry functions do accept a `Schedule`
and we convert `Schedule`s to generators.

* Update batcher docs

* Docstring fixes

* Make minibatch take iterables again as well

* Bump thinc requirement to 9.0.0.dev2

* Use type declaration

* Convert another comment into a proper type declaration
2023-01-18 18:28:30 +01:00

241 lines
9.2 KiB
Python

from typing import Union, Iterable, Sequence, TypeVar, List, Callable, Iterator
from typing import Optional, Any
from functools import partial
import itertools
from thinc.schedules import Schedule
from ..util import registry, minibatch
SizingSchedule = Union[Iterable[int], int, Schedule]
Sizing = Union[Iterable[int], int]
ItemT = TypeVar("ItemT")
BatcherT = Callable[[Iterable[ItemT]], Iterable[List[ItemT]]]
@registry.batchers("spacy.batch_by_padded.v1")
def configure_minibatch_by_padded_size(
*,
size: SizingSchedule,
buffer: int,
discard_oversize: bool,
get_length: Optional[Callable[[ItemT], int]] = None
) -> BatcherT:
"""Create a batcher that uses the `batch_by_padded_size` strategy.
The padded size is defined as the maximum length of sequences within the
batch multiplied by the number of sequences in the batch.
size (int, Iterable[int] or Schedule): The largest padded size to batch sequences
into. Can be a single integer, or a sequence, allowing for variable batch sizes.
buffer (int): The number of sequences to accumulate before sorting by length.
A larger buffer will result in more even sizing, but if the buffer is
very large, the iteration order will be less random, which can result
in suboptimal training.
discard_oversize (bool): Whether to discard sequences that are by themselves
longer than the largest padded batch size.
get_length (Callable or None): Function to get the length of a sequence item.
The `len` function is used by default.
"""
# Avoid displacing optional values from the underlying function.
optionals = {"get_length": get_length} if get_length is not None else {}
return partial(
minibatch_by_padded_size,
size=_schedule_to_sizing(size),
buffer=buffer,
discard_oversize=discard_oversize,
**optionals
)
@registry.batchers("spacy.batch_by_words.v1")
def configure_minibatch_by_words(
*,
size: SizingSchedule,
tolerance: float,
discard_oversize: bool,
get_length: Optional[Callable[[ItemT], int]] = None
) -> BatcherT:
"""Create a batcher that uses the "minibatch by words" strategy.
size (int, Iterable[int] or Schedule): The target number of words per batch.
Can be a single integer, or a sequence, allowing for variable batch sizes.
tolerance (float): What percentage of the size to allow batches to exceed.
discard_oversize (bool): Whether to discard sequences that by themselves
exceed the tolerated size.
get_length (Callable or None): Function to get the length of a sequence
item. The `len` function is used by default.
"""
optionals = {"get_length": get_length} if get_length is not None else {}
return partial(
minibatch_by_words,
size=_schedule_to_sizing(size),
tolerance=tolerance,
discard_oversize=discard_oversize,
**optionals
)
@registry.batchers("spacy.batch_by_sequence.v1")
def configure_minibatch(
size: SizingSchedule, get_length: Optional[Callable[[ItemT], int]] = None
) -> BatcherT:
"""Create a batcher that creates batches of the specified size.
size (int, Iterable[int] or Schedule): The target number of items per batch.
Can be a single integer, or a sequence, allowing for variable batch sizes.
"""
optionals = {"get_length": get_length} if get_length is not None else {}
return partial(minibatch, size=_schedule_to_sizing(size), **optionals)
def minibatch_by_padded_size(
seqs: Iterable[ItemT],
size: Sizing,
buffer: int = 256,
discard_oversize: bool = False,
get_length: Callable = len,
) -> Iterable[List[ItemT]]:
"""Minibatch a sequence by the size of padded batches that would result,
with sequences binned by length within a window.
The padded size is defined as the maximum length of sequences within the
batch multiplied by the number of sequences in the batch.
size (int or Iterable[int]): The largest padded size to batch sequences into.
buffer (int): The number of sequences to accumulate before sorting by length.
A larger buffer will result in more even sizing, but if the buffer is
very large, the iteration order will be less random, which can result
in suboptimal training.
discard_oversize (bool): Whether to discard sequences that are by themselves
longer than the largest padded batch size.
get_length (Callable or None): Function to get the length of a sequence item.
The `len` function is used by default.
"""
if isinstance(size, int):
size_: Iterator[int] = itertools.repeat(size)
else:
size_ = iter(size)
for outer_batch in minibatch(seqs, size=buffer):
outer_batch = list(outer_batch)
target_size = next(size_)
for indices in _batch_by_length(outer_batch, target_size, get_length):
subbatch = [outer_batch[i] for i in indices]
padded_size = max(len(seq) for seq in subbatch) * len(subbatch)
if discard_oversize and padded_size >= target_size:
pass
else:
yield subbatch
def minibatch_by_words(
seqs: Iterable[ItemT],
size: Sizing,
tolerance=0.2,
discard_oversize=False,
get_length=len,
) -> Iterable[List[ItemT]]:
"""Create minibatches of roughly a given number of words. If any examples
are longer than the specified batch length, they will appear in a batch by
themselves, or be discarded if discard_oversize=True.
seqs (Iterable[Sequence]): The sequences to minibatch.
size (int or Iterable[int]): The target number of words per batch.
Can be a single integer, or a sequence, allowing for variable batch sizes.
tolerance (float): What percentage of the size to allow batches to exceed.
discard_oversize (bool): Whether to discard sequences that by themselves
exceed the tolerated size.
get_length (Callable or None): Function to get the length of a sequence
item. The `len` function is used by default.
"""
if isinstance(size, int):
size_: Iterator[int] = itertools.repeat(size)
else:
size_ = iter(size)
target_size = next(size_)
tol_size = target_size * tolerance
batch = []
overflow = []
batch_size = 0
overflow_size = 0
for seq in seqs:
n_words = get_length(seq)
# if the current example exceeds the maximum batch size, it is returned separately
# but only if discard_oversize=False.
if n_words > target_size + tol_size:
if not discard_oversize:
yield [seq]
# add the example to the current batch if there's no overflow yet and it still fits
elif overflow_size == 0 and (batch_size + n_words) <= target_size:
batch.append(seq)
batch_size += n_words
# add the example to the overflow buffer if it fits in the tolerance margin
elif (batch_size + overflow_size + n_words) <= (target_size + tol_size):
overflow.append(seq)
overflow_size += n_words
# yield the previous batch and start a new one. The new one gets the overflow examples.
else:
if batch:
yield batch
target_size = next(size_)
tol_size = target_size * tolerance
batch = overflow
batch_size = overflow_size
overflow = []
overflow_size = 0
# this example still fits
if (batch_size + n_words) <= target_size:
batch.append(seq)
batch_size += n_words
# this example fits in overflow
elif (batch_size + n_words) <= (target_size + tol_size):
overflow.append(seq)
overflow_size += n_words
# this example does not fit with the previous overflow: start another new batch
else:
if batch:
yield batch
target_size = next(size_)
tol_size = target_size * tolerance
batch = [seq]
batch_size = n_words
batch.extend(overflow)
if batch:
yield batch
def _batch_by_length(
seqs: Sequence[Any], max_words: int, get_length=len
) -> List[List[Any]]:
"""Given a list of sequences, return a batched list of indices into the
list, where the batches are grouped by length, in descending order.
Batches may be at most max_words in size, defined as max sequence length * size.
"""
# Use negative index so we can get sort by position ascending.
lengths_indices = [(get_length(seq), i) for i, seq in enumerate(seqs)]
lengths_indices.sort()
batches = []
batch: List[int] = []
for length, i in lengths_indices:
if not batch:
batch.append(i)
elif length * (len(batch) + 1) <= max_words:
batch.append(i)
else:
batches.append(batch)
batch = [i]
if batch:
batches.append(batch)
# Check lengths match
assert sum(len(b) for b in batches) == len(seqs)
batches = [list(sorted(batch)) for batch in batches]
batches.reverse()
return batches
def _schedule_to_sizing(size: SizingSchedule) -> Sizing:
if isinstance(size, Schedule):
return size.to_generator()
return size