From 6b0ebcdb59c28a7c03299828ea5c397c480fe8be Mon Sep 17 00:00:00 2001
From: richardpaulhudson
Date: Thu, 21 Jul 2022 16:53:54 +0200
Subject: [PATCH] Improve error handling with hung processes

---
 spacy/cli/project/parallel.py | 45 +++++++++++++++++++----------------
 1 file changed, 25 insertions(+), 20 deletions(-)

diff --git a/spacy/cli/project/parallel.py b/spacy/cli/project/parallel.py
index 8366d6d65..986a8574d 100644
--- a/spacy/cli/project/parallel.py
+++ b/spacy/cli/project/parallel.py
@@ -34,7 +34,7 @@ as 'failed/terminated' on Windows systems.
 mp_context = get_context("spawn")
 
 # How often the worker processes managing the commands in a parallel group
-# send keepalive messages to the main process
+# send keepalive messages to the main process (seconds)
 PARALLEL_GROUP_STATUS_INTERVAL = 1
 
 # The dirname where the temporary logfiles for a parallel group are written
@@ -169,13 +169,13 @@ def project_run_parallel_group(
             cmd_info.change_state("not rerun")
     rmtree(PARALLEL_LOGGING_DIR_NAME, ignore_errors=True)
     os.mkdir(PARALLEL_LOGGING_DIR_NAME)
-    processes: List[SpawnProcess] = []
+    worker_processes: List[SpawnProcess] = []
     proc_to_cmd_infos: Dict[SpawnProcess, _ParallelCommandInfo] = {}
-    num_processes = 0
+    num_concurr_worker_processes = 0
     for cmd_info in cmd_infos:
         if cmd_info.state.name == "not rerun":
             continue
-        process = mp_context.Process(
+        worker_process = mp_context.Process(
             target=_project_run_parallel_cmd,
             args=(cmd_info,),
             kwargs={
@@ -184,17 +184,17 @@ def project_run_parallel_group(
                 "dry": dry,
                 "current_dir": str(current_dir),
                 "parallel_group_status_queue": parallel_group_status_queue,
             },
         )
-        processes.append(process)
-        proc_to_cmd_infos[process] = cmd_info
-    num_processes = len(processes)
+        worker_processes.append(worker_process)
+        proc_to_cmd_infos[worker_process] = cmd_info
+    num_concurr_worker_processes = len(worker_processes)
     if (
         max_parallel_processes is not None
-        and max_parallel_processes < num_processes
+        and max_parallel_processes < num_concurr_worker_processes
     ):
-        num_processes = max_parallel_processes
-    process_iterator = iter(processes)
-    for _ in range(num_processes):
-        _start_process(next(process_iterator), proc_to_cmd_infos)
+        num_concurr_worker_processes = max_parallel_processes
+    worker_process_iterator = iter(worker_processes)
+    for _ in range(num_concurr_worker_processes):
+        _start_worker_process(next(worker_process_iterator), proc_to_cmd_infos)
     divider_parallel_descriptor = parallel_descriptor = (
         "parallel[" + ", ".join(cmd_info.cmd_name for cmd_info in cmd_infos) + "]"
     )
@@ -230,6 +230,7 @@ def project_run_parallel_group(
                 c for c in cmd_infos if c.state.name == "pending"
             ):
                 other_cmd_info.change_state("cancelled")
+            break
         cmd_info = cmd_infos[cast(int, mess["cmd_index"])]
         if mess["type"] in ("started", "keepalive"):
             cmd_info.last_keepalive_time = int(time())
@@ -253,9 +254,9 @@ def project_run_parallel_group(
                 cmd_info.change_state("succeeded")
                 if not dry:
                     update_lockfile(current_dir, cmd_info.cmd)
-                working_process = next(process_iterator, None)
-                if working_process is not None:
-                    _start_process(working_process, proc_to_cmd_infos)
+                next_worker_process = next(worker_process_iterator, None)
+                if next_worker_process is not None:
+                    _start_worker_process(next_worker_process, proc_to_cmd_infos)
             elif cmd_info.rc > 0:
                 cmd_info.change_state("failed")
             else:
@@ -292,8 +293,11 @@ def project_run_parallel_group(
         msg.divider(cmd_info.cmd_name)
         if cmd_info.state.name == "not rerun":
             msg.info(f"Skipping '{cmd_info.cmd_name}': nothing changed")
-        else:
+        elif cmd_info.console_output is not None:
             print(cmd_info.console_output)
+    # Occasionally when something has hung there may still be worker processes to tidy up
+    for hung_worker_process in (wp for wp in worker_processes if wp.is_alive()):
+        hung_worker_process.terminate()
     process_rcs = [c.rc for c in cmd_infos if c.rc is not None]
     if len(process_rcs) > 0:
         group_rc = max(process_rcs)
@@ -425,11 +429,12 @@ def _project_run_parallel_cmd(
     )
 
 
-def _start_process(
-    process: SpawnProcess, proc_to_cmd_infos: Dict[SpawnProcess, _ParallelCommandInfo]
+def _start_worker_process(
+    worker_process: SpawnProcess,
+    proc_to_cmd_infos: Dict[SpawnProcess, _ParallelCommandInfo],
 ) -> None:
-    cmd_info = proc_to_cmd_infos[process]
+    cmd_info = proc_to_cmd_infos[worker_process]
     if cmd_info.state.name == "pending":
         cmd_info.change_state("starting")
         cmd_info.last_keepalive_time = int(time())
-    process.start()
+    worker_process.start()
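
Note (not part of the patch): the main behavioural addition above is that any worker process still alive once the parallel group has finished reporting is terminated rather than left to block the main process. The standalone sketch below illustrates that tidy-up pattern using the same "spawn" multiprocessing context the module uses; the worker function, timings and variable names here are illustrative only and do not appear in spacy/cli/project/parallel.py.

from multiprocessing import get_context
import time

mp_context = get_context("spawn")


def _worker(seconds: float) -> None:
    # Stands in for a project command; the second worker "hangs" relative to
    # the time the main process is prepared to wait.
    time.sleep(seconds)


if __name__ == "__main__":
    worker_processes = [
        mp_context.Process(target=_worker, args=(s,)) for s in (0.1, 60.0)
    ]
    for wp in worker_processes:
        wp.start()
    time.sleep(1.0)  # stand-in for the group's normal status-queue processing
    # Same tidy-up pattern as the patch: terminate anything that is still alive
    for hung_worker_process in (wp for wp in worker_processes if wp.is_alive()):
        hung_worker_process.terminate()
    for wp in worker_processes:
        wp.join()
    print([wp.exitcode for wp in worker_processes])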