Merge remote-tracking branch 'upstream/master' into bugfix/port-5000-mac-reserved

This commit is contained in:
Daniël de Kok 2024-01-23 19:24:24 +01:00
commit d59134c796

View File

@ -1683,6 +1683,12 @@ class Language:
for proc in procs:
proc.start()
# Close writing-end of channels. This is needed to avoid that reading
# from the channel blocks indefinitely when the worker closes the
# channel.
for tx in bytedocs_send_ch:
tx.close()
# Cycle channels not to break the order of docs.
# The received object is a batch of byte-encoded docs, so flatten them with chain.from_iterable.
byte_tuples = chain.from_iterable(
@ -1705,8 +1711,23 @@ class Language:
# tell `sender` that one batch was consumed.
sender.step()
finally:
# If we are stopping in an orderly fashion, the workers' queues
# are empty. Put the sentinel in their queues to signal that work
# is done, so that they can exit gracefully.
for q in texts_q:
q.put(_WORK_DONE_SENTINEL)
# Otherwise, we are stopping because the error handler raised an
# exception. The sentinel will be last to go out of the queue.
# To avoid doing unnecessary work or hanging on platforms that
# block on sending (Windows), we'll close our end of the channel.
# This signals to the worker that it can exit the next time it
# attempts to send data down the channel.
for r in bytedocs_recv_ch:
r.close()
for proc in procs:
proc.terminate()
proc.join()
def _link_components(self) -> None:
"""Register 'listeners' within pipeline components, to allow them to
@ -2323,6 +2344,11 @@ def _apply_pipes(
while True:
try:
texts_with_ctx = receiver.get()
# Stop working if we encounter the end-of-work sentinel.
if isinstance(texts_with_ctx, _WorkDoneSentinel):
return
docs = (
ensure_doc(doc_like, context) for doc_like, context in texts_with_ctx
)
@ -2331,11 +2357,21 @@ def _apply_pipes(
# Connection does not accept unpickable objects, so send list.
byte_docs = [(doc.to_bytes(), doc._context, None) for doc in docs]
padding = [(None, None, None)] * (len(texts_with_ctx) - len(byte_docs))
sender.send(byte_docs + padding) # type: ignore[operator]
data: Sequence[Tuple[Optional[bytes], Optional[Any], Optional[bytes]]] = (
byte_docs + padding # type: ignore[operator]
)
except Exception:
error_msg = [(None, None, srsly.msgpack_dumps(traceback.format_exc()))]
padding = [(None, None, None)] * (len(texts_with_ctx) - 1)
sender.send(error_msg + padding)
data = error_msg + padding
try:
sender.send(data)
except BrokenPipeError:
# Parent has closed the pipe prematurely. This happens when a
# worker encounters an error and the error handler is set to
# stop processing.
return
class _Sender:
@ -2365,3 +2401,10 @@ class _Sender:
if self.count >= self.chunk_size:
self.count = 0
self.send()
class _WorkDoneSentinel:
pass
_WORK_DONE_SENTINEL = _WorkDoneSentinel()