Handle errors while multiprocessing (#8004)

* Handle errors while multiprocessing

Handle errors while multiprocessing without hanging.

* Return the traceback for errors raised while processing a batch, which
  can be handled by the top-level error handler
* Allow for shortened batches due to custom error handlers that ignore
  errors and skip documents

* Define custom components at a higher level

* Also move up custom error handler

* Use simpler component for test

* Switch error type

* Adjust test

* Only call top-level error handler for exceptions

* Register custom test components within tests

Use global functions (so they can be pickled) but register the
components only within the individual tests.
This commit is contained in:
Adriane Boyd 2021-05-17 13:28:39 +02:00 committed by GitHub
parent 8a2602051c
commit b120fb3511
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 148 additions and 79 deletions

View File

@ -490,6 +490,7 @@ class Errors:
E202 = ("Unsupported alignment mode '{mode}'. Supported modes: {modes}.")
# New errors added in v3.x
E871 = ("Error encountered in nlp.pipe with multiprocessing:\n\n{error}")
E872 = ("Unable to copy tokenizer from base model due to different "
'tokenizer settings: current tokenizer config "{curr_config}" '
'vs. base model "{base_config}"')

View File

@ -13,6 +13,7 @@ import srsly
import multiprocessing as mp
from itertools import chain, cycle
from timeit import default_timer as timer
import traceback
from .tokens.underscore import Underscore
from .vocab import Vocab, create_vocab
@ -1521,11 +1522,15 @@ class Language:
# Cycle channels not to break the order of docs.
# The received object is a batch of byte-encoded docs, so flatten them with chain.from_iterable.
byte_docs = chain.from_iterable(recv.recv() for recv in cycle(bytedocs_recv_ch))
docs = (Doc(self.vocab).from_bytes(byte_doc) for byte_doc in byte_docs)
byte_tuples = chain.from_iterable(recv.recv() for recv in cycle(bytedocs_recv_ch))
try:
for i, (_, doc) in enumerate(zip(raw_texts, docs), 1):
for i, (_, (byte_doc, byte_error)) in enumerate(zip(raw_texts, byte_tuples), 1):
if byte_doc is not None:
doc = Doc(self.vocab).from_bytes(byte_doc)
yield doc
elif byte_error is not None:
error = srsly.msgpack_loads(byte_error)
self.default_error_handler(None, None, None, ValueError(Errors.E871.format(error=error)))
if i % batch_size == 0:
# tell `sender` that one batch was consumed.
sender.step()
@ -2019,12 +2024,19 @@ def _apply_pipes(
"""
Underscore.load_state(underscore_state)
while True:
try:
texts = receiver.get()
docs = (make_doc(text) for text in texts)
for pipe in pipes:
docs = pipe(docs)
# Connection does not accept unpickable objects, so send list.
sender.send([doc.to_bytes() for doc in docs])
byte_docs = [(doc.to_bytes(), None) for doc in docs]
padding = [(None, None)] * (len(texts) - len(byte_docs))
sender.send(byte_docs + padding)
except Exception:
error_msg = [(None, srsly.msgpack_dumps(traceback.format_exc()))]
padding = [(None, None)] * (len(texts) - 1)
sender.send(error_msg + padding)
class _Sender:

View File

@ -8,13 +8,36 @@ from spacy.vocab import Vocab
from spacy.training import Example
from spacy.lang.en import English
from spacy.lang.de import German
from spacy.util import registry, ignore_error, raise_error
from spacy.util import registry, ignore_error, raise_error, logger
import spacy
from thinc.api import NumpyOps, get_current_ops
from .util import add_vecs_to_vocab, assert_docs_equal
def evil_component(doc):
if "2" in doc.text:
raise ValueError("no dice")
return doc
def perhaps_set_sentences(doc):
if not doc.text.startswith("4"):
doc[-1].is_sent_start = True
return doc
def assert_sents_error(doc):
if not doc.has_annotation("SENT_START"):
raise ValueError("no sents")
return doc
def warn_error(proc_name, proc, docs, e):
logger = logging.getLogger("spacy")
logger.warning(f"Trouble with component {proc_name}.")
@pytest.fixture
def nlp():
nlp = Language(Vocab())
@ -93,19 +116,16 @@ def test_evaluate_no_pipe(nlp):
nlp.evaluate([Example.from_dict(doc, annots)])
@Language.component("test_language_vector_modification_pipe")
def vector_modification_pipe(doc):
doc.vector += 1
return doc
@Language.component("test_language_userdata_pipe")
def userdata_pipe(doc):
doc.user_data["foo"] = "bar"
return doc
@Language.component("test_language_ner_pipe")
def ner_pipe(doc):
span = Span(doc, 0, 1, label="FIRST")
doc.ents += (span,)
@ -123,6 +143,9 @@ def sample_vectors():
@pytest.fixture
def nlp2(nlp, sample_vectors):
Language.component("test_language_vector_modification_pipe", func=vector_modification_pipe)
Language.component("test_language_userdata_pipe", func=userdata_pipe)
Language.component("test_language_ner_pipe", func=ner_pipe)
add_vecs_to_vocab(nlp.vocab, sample_vectors)
nlp.add_pipe("test_language_vector_modification_pipe")
nlp.add_pipe("test_language_ner_pipe")
@ -168,8 +191,11 @@ def test_language_pipe_stream(nlp2, n_process, texts):
assert_docs_equal(doc, expected_doc)
def test_language_pipe_error_handler():
@pytest.mark.parametrize("n_process", [1, 2])
def test_language_pipe_error_handler(n_process):
"""Test that the error handling of nlp.pipe works well"""
ops = get_current_ops()
if isinstance(ops, NumpyOps) or n_process < 2:
nlp = English()
nlp.add_pipe("merge_subtokens")
nlp.initialize()
@ -178,34 +204,25 @@ def test_language_pipe_error_handler():
with pytest.raises(ValueError):
nlp(texts[0])
with pytest.raises(ValueError):
list(nlp.pipe(texts))
list(nlp.pipe(texts, n_process=n_process))
nlp.set_error_handler(raise_error)
with pytest.raises(ValueError):
list(nlp.pipe(texts))
list(nlp.pipe(texts, n_process=n_process))
# set explicitely to ignoring
nlp.set_error_handler(ignore_error)
docs = list(nlp.pipe(texts))
docs = list(nlp.pipe(texts, n_process=n_process))
assert len(docs) == 0
nlp(texts[0])
def test_language_pipe_error_handler_custom(en_vocab):
@pytest.mark.parametrize("n_process", [1, 2])
def test_language_pipe_error_handler_custom(en_vocab, n_process):
"""Test the error handling of a custom component that has no pipe method"""
@Language.component("my_evil_component")
def evil_component(doc):
if "2" in doc.text:
raise ValueError("no dice")
return doc
def warn_error(proc_name, proc, docs, e):
from spacy.util import logger
logger.warning(f"Trouble with component {proc_name}.")
Language.component("my_evil_component", func=evil_component)
ops = get_current_ops()
if isinstance(ops, NumpyOps) or n_process < 2:
nlp = English()
nlp.add_pipe("my_evil_component")
nlp.initialize()
texts = ["TEXT 111", "TEXT 222", "TEXT 333", "TEXT 342", "TEXT 666"]
with pytest.raises(ValueError):
# the evil custom component throws an error
@ -214,36 +231,75 @@ def test_language_pipe_error_handler_custom(en_vocab):
nlp.set_error_handler(warn_error)
logger = logging.getLogger("spacy")
with mock.patch.object(logger, "warning") as mock_warning:
# the errors by the evil custom component raise a warning for each bad batch
docs = list(nlp.pipe(texts))
# the errors by the evil custom component raise a warning for each
# bad doc
docs = list(nlp.pipe(texts, n_process=n_process))
# HACK/TODO? the warnings in child processes don't seem to be
# detected by the mock logger
if n_process == 1:
mock_warning.assert_called()
assert mock_warning.call_count == 2
assert len(docs) + mock_warning.call_count == len(texts)
assert [doc.text for doc in docs] == ["TEXT 111", "TEXT 333", "TEXT 666"]
def test_language_pipe_error_handler_pipe(en_vocab):
@pytest.mark.parametrize("n_process", [1, 2])
def test_language_pipe_error_handler_pipe(en_vocab, n_process):
"""Test the error handling of a component's pipe method"""
@Language.component("my_sentences")
def perhaps_set_sentences(doc):
if not doc.text.startswith("4"):
doc[-1].is_sent_start = True
return doc
Language.component("my_perhaps_sentences", func=perhaps_set_sentences)
Language.component("assert_sents_error", func=assert_sents_error)
ops = get_current_ops()
if isinstance(ops, NumpyOps) or n_process < 2:
texts = [f"{str(i)} is enough. Done" for i in range(100)]
nlp = English()
nlp.add_pipe("my_sentences")
entity_linker = nlp.add_pipe("entity_linker", config={"entity_vector_length": 3})
entity_linker.kb.add_entity(entity="Q1", freq=12, entity_vector=[1, 2, 3])
nlp.add_pipe("my_perhaps_sentences")
nlp.add_pipe("assert_sents_error")
nlp.initialize()
with pytest.raises(ValueError):
# the entity linker requires sentence boundaries, will throw an error otherwise
docs = list(nlp.pipe(texts, batch_size=10))
# assert_sents_error requires sentence boundaries, will throw an error otherwise
docs = list(nlp.pipe(texts, n_process=n_process, batch_size=10))
nlp.set_error_handler(ignore_error)
docs = list(nlp.pipe(texts, batch_size=10))
# we lose/ignore the failing 0-9 and 40-49 batches
assert len(docs) == 80
docs = list(nlp.pipe(texts, n_process=n_process, batch_size=10))
# we lose/ignore the failing 4,40-49 docs
assert len(docs) == 89
@pytest.mark.parametrize("n_process", [1, 2])
def test_language_pipe_error_handler_make_doc_actual(n_process):
"""Test the error handling for make_doc"""
# TODO: fix so that the following test is the actual behavior
ops = get_current_ops()
if isinstance(ops, NumpyOps) or n_process < 2:
nlp = English()
nlp.max_length = 10
texts = ["12345678901234567890", "12345"] * 10
with pytest.raises(ValueError):
list(nlp.pipe(texts, n_process=n_process))
nlp.default_error_handler = ignore_error
if n_process == 1:
with pytest.raises(ValueError):
list(nlp.pipe(texts, n_process=n_process))
else:
docs = list(nlp.pipe(texts, n_process=n_process))
assert len(docs) == 0
@pytest.mark.xfail
@pytest.mark.parametrize("n_process", [1, 2])
def test_language_pipe_error_handler_make_doc_preferred(n_process):
"""Test the error handling for make_doc"""
ops = get_current_ops()
if isinstance(ops, NumpyOps) or n_process < 2:
nlp = English()
nlp.max_length = 10
texts = ["12345678901234567890", "12345"] * 10
with pytest.raises(ValueError):
list(nlp.pipe(texts, n_process=n_process))
nlp.default_error_handler = ignore_error
docs = list(nlp.pipe(texts, n_process=n_process))
assert len(docs) == 0
def test_language_from_config_before_after_init():