Improve error handling with hung processes

This commit is contained in:
richardpaulhudson 2022-07-21 16:53:54 +02:00
parent fcf7b6b84d
commit 6b0ebcdb59

View File

@ -34,7 +34,7 @@ as 'failed/terminated' on Windows systems.
mp_context = get_context("spawn") mp_context = get_context("spawn")
# How often the worker processes managing the commands in a parallel group # How often the worker processes managing the commands in a parallel group
# send keepalive messages to the main process # send keepalive messages to the main process (seconds)
PARALLEL_GROUP_STATUS_INTERVAL = 1 PARALLEL_GROUP_STATUS_INTERVAL = 1
# The dirname where the temporary logfiles for a parallel group are written # The dirname where the temporary logfiles for a parallel group are written
@ -169,13 +169,13 @@ def project_run_parallel_group(
cmd_info.change_state("not rerun") cmd_info.change_state("not rerun")
rmtree(PARALLEL_LOGGING_DIR_NAME, ignore_errors=True) rmtree(PARALLEL_LOGGING_DIR_NAME, ignore_errors=True)
os.mkdir(PARALLEL_LOGGING_DIR_NAME) os.mkdir(PARALLEL_LOGGING_DIR_NAME)
processes: List[SpawnProcess] = [] worker_processes: List[SpawnProcess] = []
proc_to_cmd_infos: Dict[SpawnProcess, _ParallelCommandInfo] = {} proc_to_cmd_infos: Dict[SpawnProcess, _ParallelCommandInfo] = {}
num_processes = 0 num_concurr_worker_processes = 0
for cmd_info in cmd_infos: for cmd_info in cmd_infos:
if cmd_info.state.name == "not rerun": if cmd_info.state.name == "not rerun":
continue continue
process = mp_context.Process( worker_process = mp_context.Process(
target=_project_run_parallel_cmd, target=_project_run_parallel_cmd,
args=(cmd_info,), args=(cmd_info,),
kwargs={ kwargs={
@ -184,17 +184,17 @@ def project_run_parallel_group(
"parallel_group_status_queue": parallel_group_status_queue, "parallel_group_status_queue": parallel_group_status_queue,
}, },
) )
processes.append(process) worker_processes.append(worker_process)
proc_to_cmd_infos[process] = cmd_info proc_to_cmd_infos[worker_process] = cmd_info
num_processes = len(processes) num_concurr_worker_processes = len(worker_processes)
if ( if (
max_parallel_processes is not None max_parallel_processes is not None
and max_parallel_processes < num_processes and max_parallel_processes < num_concurr_worker_processes
): ):
num_processes = max_parallel_processes num_concurr_worker_processes = max_parallel_processes
process_iterator = iter(processes) worker_process_iterator = iter(worker_processes)
for _ in range(num_processes): for _ in range(num_concurr_worker_processes):
_start_process(next(process_iterator), proc_to_cmd_infos) _start_worker_process(next(worker_process_iterator), proc_to_cmd_infos)
divider_parallel_descriptor = parallel_descriptor = ( divider_parallel_descriptor = parallel_descriptor = (
"parallel[" + ", ".join(cmd_info.cmd_name for cmd_info in cmd_infos) + "]" "parallel[" + ", ".join(cmd_info.cmd_name for cmd_info in cmd_infos) + "]"
) )
@ -230,6 +230,7 @@ def project_run_parallel_group(
c for c in cmd_infos if c.state.name == "pending" c for c in cmd_infos if c.state.name == "pending"
): ):
other_cmd_info.change_state("cancelled") other_cmd_info.change_state("cancelled")
break
cmd_info = cmd_infos[cast(int, mess["cmd_index"])] cmd_info = cmd_infos[cast(int, mess["cmd_index"])]
if mess["type"] in ("started", "keepalive"): if mess["type"] in ("started", "keepalive"):
cmd_info.last_keepalive_time = int(time()) cmd_info.last_keepalive_time = int(time())
@ -253,9 +254,9 @@ def project_run_parallel_group(
cmd_info.change_state("succeeded") cmd_info.change_state("succeeded")
if not dry: if not dry:
update_lockfile(current_dir, cmd_info.cmd) update_lockfile(current_dir, cmd_info.cmd)
working_process = next(process_iterator, None) next_worker_process = next(worker_process_iterator, None)
if working_process is not None: if next_worker_process is not None:
_start_process(working_process, proc_to_cmd_infos) _start_worker_process(next_worker_process, proc_to_cmd_infos)
elif cmd_info.rc > 0: elif cmd_info.rc > 0:
cmd_info.change_state("failed") cmd_info.change_state("failed")
else: else:
@ -292,8 +293,11 @@ def project_run_parallel_group(
msg.divider(cmd_info.cmd_name) msg.divider(cmd_info.cmd_name)
if cmd_info.state.name == "not rerun": if cmd_info.state.name == "not rerun":
msg.info(f"Skipping '{cmd_info.cmd_name}': nothing changed") msg.info(f"Skipping '{cmd_info.cmd_name}': nothing changed")
else: elif cmd_info.console_output is not None:
print(cmd_info.console_output) print(cmd_info.console_output)
# Occasionally when something has hung there may still be worker processes to tidy up
for hung_worker_process in (wp for wp in worker_processes if wp.is_alive()):
hung_worker_process.terminate()
process_rcs = [c.rc for c in cmd_infos if c.rc is not None] process_rcs = [c.rc for c in cmd_infos if c.rc is not None]
if len(process_rcs) > 0: if len(process_rcs) > 0:
group_rc = max(process_rcs) group_rc = max(process_rcs)
@ -425,11 +429,12 @@ def _project_run_parallel_cmd(
) )
def _start_process( def _start_worker_process(
process: SpawnProcess, proc_to_cmd_infos: Dict[SpawnProcess, _ParallelCommandInfo] worker_process: SpawnProcess,
proc_to_cmd_infos: Dict[SpawnProcess, _ParallelCommandInfo],
) -> None: ) -> None:
cmd_info = proc_to_cmd_infos[process] cmd_info = proc_to_cmd_infos[worker_process]
if cmd_info.state.name == "pending": if cmd_info.state.name == "pending":
cmd_info.change_state("starting") cmd_info.change_state("starting")
cmd_info.last_keepalive_time = int(time()) cmd_info.last_keepalive_time = int(time())
process.start() worker_process.start()