Use multiprocessing context

This commit is contained in:
richardpaulhudson 2022-07-21 13:40:38 +02:00
parent 48803e1b57
commit fcf7b6b84d

View File

@ -3,7 +3,8 @@ import os
import sys import sys
from pathlib import Path from pathlib import Path
from time import time from time import time
from multiprocessing import Manager, Queue, Process, set_start_method from multiprocessing import Manager, Queue, Process, get_context
from multiprocessing.context import SpawnProcess
from shutil import rmtree from shutil import rmtree
from signal import SIGTERM from signal import SIGTERM
from subprocess import STDOUT, Popen, TimeoutExpired from subprocess import STDOUT, Popen, TimeoutExpired
@ -30,7 +31,7 @@ as 'failed/terminated' on Windows systems.
""" """
# Use spawn to create worker processes on all OSs for consistency # Use spawn to create worker processes on all OSs for consistency
set_start_method("spawn", force=True) mp_context = get_context("spawn")
# How often the worker processes managing the commands in a parallel group # How often the worker processes managing the commands in a parallel group
# send keepalive messages to the main process # send keepalive messages to the main process
@ -122,9 +123,6 @@ class _ParallelCommandInfo:
f"{state_str} ({self.running_os_cmd_index + 1}/{number_of_os_cmds})" f"{state_str} ({self.running_os_cmd_index + 1}/{number_of_os_cmds})"
) )
elif state_str in ("failed", "terminated") and os.name == "nt": elif state_str in ("failed", "terminated") and os.name == "nt":
# not used at present as the status table isn't displayed on Windows,
# but may be relevant in the future if Windows becomes able to handle
# ANSI commands as standard
state_str = "failed/terminated" state_str = "failed/terminated"
# we know ANSI commands are available because otherwise # we know ANSI commands are available because otherwise
# the status table would not be being displayed in the first place # the status table would not be being displayed in the first place
@ -171,13 +169,13 @@ def project_run_parallel_group(
cmd_info.change_state("not rerun") cmd_info.change_state("not rerun")
rmtree(PARALLEL_LOGGING_DIR_NAME, ignore_errors=True) rmtree(PARALLEL_LOGGING_DIR_NAME, ignore_errors=True)
os.mkdir(PARALLEL_LOGGING_DIR_NAME) os.mkdir(PARALLEL_LOGGING_DIR_NAME)
processes: List[Process] = [] processes: List[SpawnProcess] = []
proc_to_cmd_infos: Dict[Process, _ParallelCommandInfo] = {} proc_to_cmd_infos: Dict[SpawnProcess, _ParallelCommandInfo] = {}
num_processes = 0 num_processes = 0
for cmd_info in cmd_infos: for cmd_info in cmd_infos:
if cmd_info.state.name == "not rerun": if cmd_info.state.name == "not rerun":
continue continue
process = Process( process = mp_context.Process(
target=_project_run_parallel_cmd, target=_project_run_parallel_cmd,
args=(cmd_info,), args=(cmd_info,),
kwargs={ kwargs={
@ -428,7 +426,7 @@ def _project_run_parallel_cmd(
def _start_process( def _start_process(
process: Process, proc_to_cmd_infos: Dict[Process, _ParallelCommandInfo] process: SpawnProcess, proc_to_cmd_infos: Dict[SpawnProcess, _ParallelCommandInfo]
) -> None: ) -> None:
cmd_info = proc_to_cmd_infos[process] cmd_info = proc_to_cmd_infos[process]
if cmd_info.state.name == "pending": if cmd_info.state.name == "pending":