From 2c1f58e74fd7ff965beef120c10805609b3acaa8 Mon Sep 17 00:00:00 2001 From: richardpaulhudson Date: Tue, 19 Jul 2022 15:17:25 +0200 Subject: [PATCH] Formal state machine --- spacy/cli/project/parallel.py | 169 ++++++++++++++++++++-------------- spacy/cli/project/run.py | 4 +- 2 files changed, 101 insertions(+), 72 deletions(-) diff --git a/spacy/cli/project/parallel.py b/spacy/cli/project/parallel.py index 49de62168..ab206d9e5 100644 --- a/spacy/cli/project/parallel.py +++ b/spacy/cli/project/parallel.py @@ -1,9 +1,8 @@ -from typing import Any, List, Literal, Optional, Dict, Union, cast +from typing import Any, List, Literal, Optional, Dict, Union, cast, Tuple import sys from pathlib import Path from time import time -import multiprocessing -from multiprocessing import Manager as MultiprocessingManager, Queue, Process +from multiprocessing import Manager, Queue, Process, set_start_method from os import kill, environ, linesep, mkdir, sep from shutil import rmtree from signal import SIGTERM @@ -16,6 +15,7 @@ from ...util import SimpleFrozenDict, working_dir from ...util import check_bool_env_var, ENV_VARS, split_command, join_command from ...errors import Errors +set_start_method("spawn", force=True) # How often the worker processes managing the commands in a parallel group # send keepalive messages to the main processes @@ -25,59 +25,71 @@ PARALLEL_GROUP_STATUS_INTERVAL = 1 # before being copied to stdout when the group has completed PARALLEL_LOGGING_DIR_NAME = "parrTmp" -DISPLAY_STATUS_COLORS = { - "pending": "yellow", - "not rerun": "blue", - "starting": "green", - "running": "green", - "succeeded": "green", - "failed": "red", - "killed": "red", - "hung": "red", - "cancelled": "red", -} +# The maximum permissible width of divider text describing a parallel command group +MAX_WIDTH_DIVIDER = 60 + +# Whether or not to display realtime status information +DISPLAY_STATUS_TABLE = sys.stdout.isatty() and supports_ansi() + + +class _ParallelCommandState: + def __init__( + self, + name: str, + display_color: str, + transitions: Optional[Tuple[str]] = None, + ): + self.name = name + self.display_color = display_color + if transitions is not None: + self.transitions = transitions + else: + self.transitions = None + + +_PARALLEL_COMMAND_STATES = ( + _ParallelCommandState("pending", "yellow", ("starting", "cancelled", "not rerun")), + _ParallelCommandState("not rerun", "blue"), + _ParallelCommandState("starting", "green", ("running", "hung")), + _ParallelCommandState( + "running", "green", ("running", "succeeded", "failed", "killed", "hung") + ), + _ParallelCommandState("succeeded", "green"), + _ParallelCommandState("failed", "red"), + _ParallelCommandState("killed", "red"), + _ParallelCommandState("hung", "red"), + _ParallelCommandState("cancelled", "red"), +) class _ParallelCommandInfo: + + state_dict = {state.name: state for state in _PARALLEL_COMMAND_STATES} + def __init__(self, cmd_name: str, cmd: Dict, cmd_ind: int): self.cmd_name = cmd_name self.cmd = cmd self.cmd_ind = cmd_ind self.len_os_cmds = len(cmd["script"]) - self.status: Literal[ - "pending", - "not rerun", - "starting", - "running", - "hung", - "succeeded", - "failed", - "killed", - "cancelled", - ] = "pending" + self.state = _ParallelCommandInfo.state_dict["pending"] self.pid: Optional[int] = None self.last_status_time: Optional[int] = None self.os_cmd_ind: Optional[int] = None self.rc: Optional[int] = None self.output: Optional[str] = None + def change_state(self, new_state: str): + assert new_state in self.state.transitions, ( + "Illegal transition from " + self.state.name + " to " + new_state + "." + ) + self.state = _ParallelCommandInfo.state_dict[new_state] + @property - def disp_status(self) -> str: - status_str = str(self.status) - status_color = DISPLAY_STATUS_COLORS[status_str] - if status_str == "running" and self.os_cmd_ind is not None: - status_str = f"{status_str} ({self.os_cmd_ind + 1}/{self.len_os_cmds})" - return color(status_str, status_color) - - -def _start_process( - process: Process, proc_to_cmd_infos: Dict[Process, _ParallelCommandInfo] -) -> None: - cmd_info = proc_to_cmd_infos[process] - if cmd_info.status == "pending": - cmd_info.status = "starting" - cmd_info.last_status_time = int(time()) - process.start() + def disp_state(self) -> str: + state_str = self.state.name + if state_str == "running" and self.os_cmd_ind is not None: + state_str = f"{state_str} ({self.os_cmd_ind + 1}/{self.len_os_cmds})" + return color(state_str, self.state.display_color) def project_run_parallel_group( @@ -100,12 +112,9 @@ def project_run_parallel_group( """ config = load_project_config(project_dir, overrides=overrides) commands = {cmd["name"]: cmd for cmd in config.get("commands", [])} - - parallel_group_status_queue = MultiprocessingManager().Queue() + parallel_group_status_queue = Manager().Queue() max_parallel_processes = config.get("max_parallel_processes") check_spacy_commit = check_bool_env_var(ENV_VARS.PROJECT_USE_GIT_VERSION) - multiprocessing.set_start_method("spawn", force=True) - DISPLAY_STATUS = sys.stdout.isatty() and supports_ansi() cmd_infos = [ _ParallelCommandInfo(cmd_name, commands[cmd_name], cmd_ind) for cmd_ind, cmd_name in enumerate(cmd_names) @@ -120,14 +129,14 @@ def project_run_parallel_group( ) and not force ): - cmd_info.status = "not rerun" + cmd_info.change_state("not rerun") rmtree(PARALLEL_LOGGING_DIR_NAME, ignore_errors=True) mkdir(PARALLEL_LOGGING_DIR_NAME) processes: List[Process] = [] proc_to_cmd_infos: Dict[Process, _ParallelCommandInfo] = {} num_processes = 0 for cmd_info in cmd_infos: - if cmd_info.status == "not rerun": + if cmd_info.state.name == "not rerun": continue process = Process( target=_project_run_parallel_cmd, @@ -152,16 +161,16 @@ def project_run_parallel_group( divider_parallel_descriptor = parallel_descriptor = ( "parallel[" + ", ".join(cmd_info.cmd_name for cmd_info in cmd_infos) + "]" ) - if len(divider_parallel_descriptor) > 60: - divider_parallel_descriptor = divider_parallel_descriptor[:57] + "..." + if len(divider_parallel_descriptor) > MAX_WIDTH_DIVIDER: + divider_parallel_descriptor = divider_parallel_descriptor[:(MAX_WIDTH_DIVIDER-3)] + "..." msg.divider(divider_parallel_descriptor) - if not DISPLAY_STATUS: + if not DISPLAY_STATUS_TABLE: print(parallel_descriptor) else: first = True while any( - cmd_info.status in ("pending", "starting", "running") + cmd_info.state.name in ("pending", "starting", "running") for cmd_info in cmd_infos ): try: @@ -169,60 +178,70 @@ def project_run_parallel_group( timeout=PARALLEL_GROUP_STATUS_INTERVAL * 20 ) except Exception as e: - for other_cmd_info in (c for c in cmd_infos if c.status == "running"): - other_cmd_info.status = "hung" - for other_cmd_info in (c for c in cmd_infos if c.status == "pending"): - other_cmd_info.status = "cancelled" + for other_cmd_info in ( + c for c in cmd_infos if c.state.name == "running" + ): + other_cmd_info.change_state("hung") + for other_cmd_info in ( + c for c in cmd_infos if c.state.name == "pending" + ): + other_cmd_info.change_state("cancelled") cmd_info = cmd_infos[cast(int, mess["cmd_ind"])] if mess["status"] in ("started", "alive"): cmd_info.last_status_time = int(time()) for other_cmd_info in ( - c for c in cmd_infos if c.status in ("starting", "running") + c for c in cmd_infos if c.state.name in ("starting", "running") ): if ( other_cmd_info.last_status_time is not None and time() - other_cmd_info.last_status_time > PARALLEL_GROUP_STATUS_INTERVAL * 20 ): - other_cmd_info.status = "hung" + other_cmd_info.change_state("hung") if mess["status"] == "started": - cmd_info.status = "running" + cmd_info.change_state("running") cmd_info.os_cmd_ind = cast(int, mess["os_cmd_ind"]) cmd_info.pid = cast(int, mess["pid"]) if mess["status"] == "completed": cmd_info.rc = cast(int, mess["rc"]) if cmd_info.rc == 0: - cmd_info.status = "succeeded" + cmd_info.change_state("succeeded") if not dry: update_lockfile(current_dir, cmd_info.cmd) working_process = next(process_iterator, None) if working_process is not None: _start_process(working_process, proc_to_cmd_infos) elif cmd_info.rc > 0: - cmd_info.status = "failed" + cmd_info.change_state("failed") else: - cmd_info.status = "killed" + cmd_info.change_state("killed") cmd_info.output = cast(str, mess["output"]) - if any(c for c in cmd_infos if c.status in ("failed", "killed", "hung")): - for other_cmd_info in (c for c in cmd_infos if c.status == "running"): + if any( + c for c in cmd_infos if c.state.name in ("failed", "killed", "hung") + ): + for other_cmd_info in ( + c for c in cmd_infos if c.state.name == "running" + ): try: kill(cast(int, other_cmd_info.pid), SIGTERM) except: pass - for other_cmd_info in (c for c in cmd_infos if c.status == "pending"): - other_cmd_info.status = "cancelled" - if mess["status"] != "alive" and DISPLAY_STATUS: + for other_cmd_info in ( + c for c in cmd_infos if c.state.name == "pending" + ): + other_cmd_info.change_state("cancelled") + if mess["status"] != "alive" and DISPLAY_STATUS_TABLE: if first: first = False else: print("\033[2K\033[F" * (4 + len(cmd_infos))) - data = [[c.cmd_name, c.disp_status] for c in cmd_infos] + data = [[c.cmd_name, c.disp_state] for c in cmd_infos] header = ["Command", "Status"] msg.table(data, header=header) - for cmd_info in (c for c in cmd_infos if c.status != "cancelled"): + for cmd_info in (c for c in cmd_infos if c.state.name != "cancelled"): msg.divider(cmd_info.cmd_name) - if cmd_info.status == "not rerun": + if cmd_info.state.name == "not rerun": msg.info(f"Skipping '{cmd_info.cmd_name}': nothing changed") else: print(cmd_info.output) @@ -233,7 +252,7 @@ def project_run_parallel_group( group_rc = min(process_rcs) if group_rc != 0: sys.exit(group_rc) - if any(c for c in cmd_infos if c.status == "hung"): + if any(c for c in cmd_infos if c.state.name == "hung"): sys.exit(-1) @@ -337,3 +356,13 @@ def _project_run_parallel_cmd( "output": logfile.read(), } ) + + +def _start_process( + process: Process, proc_to_cmd_infos: Dict[Process, _ParallelCommandInfo] +) -> None: + cmd_info = proc_to_cmd_infos[process] + if cmd_info.state.name == "pending": + cmd_info.change_state("starting") + cmd_info.last_status_time = int(time()) + process.start() diff --git a/spacy/cli/project/run.py b/spacy/cli/project/run.py index 8129a782b..f54d38757 100644 --- a/spacy/cli/project/run.py +++ b/spacy/cli/project/run.py @@ -9,9 +9,9 @@ import typer from .parallel import project_run_parallel_group from ...util import working_dir, run_command, split_command, is_cwd, join_command from ...util import SimpleFrozenList, ENV_VARS -from ...util import check_bool_env_var, SimpleFrozenDict, check_deps +from ...util import check_bool_env_var, SimpleFrozenDict from .._util import PROJECT_FILE, load_project_config, check_rerun, update_lockfile -from .._util import project_cli, Arg, Opt, COMMAND, parse_config_overrides +from .._util import project_cli, Arg, Opt, COMMAND, parse_config_overrides, check_deps @project_cli.command(