More changes based on review comments

This commit is contained in:
richardpaulhudson 2022-07-27 14:29:48 +02:00
parent 3d16625015
commit 5393df4bf2

View File

@ -32,17 +32,17 @@ from ...util import check_bool_env_var, ENV_VARS
from ...errors import Errors
# Use spawn to create worker processes on all OSs for consistency
mp_context = get_context("spawn")
_mp_context = get_context("spawn")
# How often the worker processes managing the commands in a parallel group
# send keepalive messages to the main process (seconds)
PARALLEL_GROUP_STATUS_INTERVAL = 1
_PARALLEL_GROUP_STATUS_INTERVAL = 1
# The maximum permissible width of divider text describing a parallel command group
MAX_WIDTH_DIVIDER = 60
_MAX_WIDTH_DIVIDER = 60
# Whether or not to display realtime status information
DISPLAY_STATUS_TABLE = sys.stdout.isatty() and supports_ansi()
_DISPLAY_STATUS_TABLE = sys.stdout.isatty() and supports_ansi()
@dataclass
@ -153,12 +153,12 @@ def project_run_parallel_group(
divider_parallel_descriptor = parallel_descriptor = (
"parallel[" + ", ".join(cmd_info.cmd_name for cmd_info in cmd_infos) + "]"
)
if len(divider_parallel_descriptor) > MAX_WIDTH_DIVIDER:
if len(divider_parallel_descriptor) > _MAX_WIDTH_DIVIDER:
divider_parallel_descriptor = (
divider_parallel_descriptor[: (MAX_WIDTH_DIVIDER - 3)] + "..."
divider_parallel_descriptor[: (_MAX_WIDTH_DIVIDER - 3)] + "..."
)
msg.divider(divider_parallel_descriptor)
if not DISPLAY_STATUS_TABLE and len(parallel_descriptor) > MAX_WIDTH_DIVIDER:
if not _DISPLAY_STATUS_TABLE and len(parallel_descriptor) > _MAX_WIDTH_DIVIDER:
# reprint the descriptor if it was too long and had to be cut short
print(parallel_descriptor)
msg.info("Temporary logs are being written to " + temp_log_dir)
@ -176,11 +176,10 @@ def project_run_parallel_group(
cmd_info.change_state("not rerun")
worker_processes: List[SpawnProcess] = []
proc_to_cmd_infos: Dict[SpawnProcess, _ParallelCommandInfo] = {}
num_concurr_worker_processes = 0
for cmd_info in cmd_infos:
if cmd_info.state.name == "not rerun":
continue
worker_process = mp_context.Process(
worker_process = _mp_context.Process(
target=_project_run_parallel_cmd,
args=(cmd_info,),
kwargs={
@ -238,14 +237,14 @@ def _process_worker_status_messages(
current_dir: Path,
dry: bool,
) -> None:
"""Listens on the status queue and processes messages received from the worker processes.
"""Listen on the status queue and process messages received from the worker processes.
cmd_infos: a list of info objects about the commands in the parallel group
proc_to_cmd_infos: a dictionary from Process objects to command info objects
parallel_group_status_queue: the status queue
cmd_infos: a list of info objects about the commands in the parallel group.
proc_to_cmd_infos: a dictionary from Process objects to command info objects.
parallel_group_status_queue: the status queue.
worker_process_iterator: an iterator over the processes, some or all of which
will already have been iterated over and started
current_dir: the current directory
will already have been iterated over and started.
current_dir: the current directory.
dry (bool): Perform a dry run and don't execute commands.
"""
status_table_not_yet_displayed = True
@ -255,9 +254,9 @@ def _process_worker_status_messages(
):
try:
mess: Dict[str, Union[str, int]] = parallel_group_status_queue.get(
timeout=PARALLEL_GROUP_STATUS_INTERVAL * 20
timeout=_PARALLEL_GROUP_STATUS_INTERVAL * 20
)
except Exception:
except queue.Empty:
# No more messages are being received: the whole group has hung
for other_cmd_info in (
c for c in cmd_infos if c.state.name in ("starting", "running")
@ -275,7 +274,7 @@ def _process_worker_status_messages(
if (
other_cmd_info.last_keepalive_time is not None
and time() - other_cmd_info.last_keepalive_time
> PARALLEL_GROUP_STATUS_INTERVAL * 20
> _PARALLEL_GROUP_STATUS_INTERVAL * 20
):
# a specific command has hung
other_cmd_info.change_state("hung")
@ -304,14 +303,14 @@ def _process_worker_status_messages(
for other_cmd_info in (c for c in cmd_infos if c.state.name == "running"):
try:
os.kill(cast(int, other_cmd_info.pid), SIGTERM)
except:
except ProcessLookupError:
# the subprocess the main process is trying to kill could already
# have completed, and the message from the worker process notifying
# the main process about this could still be in the queue
pass
for other_cmd_info in (c for c in cmd_infos if c.state.name == "pending"):
other_cmd_info.change_state("cancelled")
if mess["mess_type"] != "keepalive" and DISPLAY_STATUS_TABLE:
if mess["mess_type"] != "keepalive" and _DISPLAY_STATUS_TABLE:
if status_table_not_yet_displayed:
status_table_not_yet_displayed = False
else:
@ -334,7 +333,14 @@ def _project_run_parallel_cmd(
Communicates with the main process via queue messages whose type is determined
by the entry 'mess_type' and that are structured as dictionaries. Possible
values of 'mess_type' are 'started', 'completed' and 'keepalive'. Each dictionary
type contains different additional fields."""
type contains different additional fields.
cmd_info: the info object for the command being run.
dry (bool): Perform a dry run and don't execute commands.
temp_log_dir: the temporary directory to which to log.
parallel_group_status_queue: the queue via which to send status messages to the
main process.
"""
# we can use the command name as a unique log filename because a parallel
# group is not allowed to contain the same command more than once
log_file_name = os.sep.join((temp_log_dir, cmd_info.cmd_name + ".log"))
@ -389,7 +395,7 @@ def _project_run_parallel_cmd(
)
while True:
try:
sp.wait(timeout=PARALLEL_GROUP_STATUS_INTERVAL)
sp.wait(timeout=_PARALLEL_GROUP_STATUS_INTERVAL)
except TimeoutExpired:
pass
if sp.returncode == None:
@ -448,6 +454,12 @@ def _start_worker_process(
worker_process: SpawnProcess,
proc_to_cmd_infos: Dict[SpawnProcess, _ParallelCommandInfo],
) -> None:
""" Start a worker process, changing the state of its info object.
worker_process: the worker process to start.
proc_to_cmd_infos: a dictionary from worker processes to
the info objects that describe them.
"""
cmd_info = proc_to_cmd_infos[worker_process]
if cmd_info.state.name == "pending":
cmd_info.change_state("starting")