Formal state machine

This commit is contained in:
richardpaulhudson 2022-07-19 15:17:25 +02:00
parent 83d0738ff3
commit 2c1f58e74f
2 changed files with 101 additions and 72 deletions

View File

@ -1,9 +1,8 @@
from typing import Any, List, Literal, Optional, Dict, Union, cast from typing import Any, List, Literal, Optional, Dict, Union, cast, Tuple
import sys import sys
from pathlib import Path from pathlib import Path
from time import time from time import time
import multiprocessing from multiprocessing import Manager, Queue, Process, set_start_method
from multiprocessing import Manager as MultiprocessingManager, Queue, Process
from os import kill, environ, linesep, mkdir, sep from os import kill, environ, linesep, mkdir, sep
from shutil import rmtree from shutil import rmtree
from signal import SIGTERM from signal import SIGTERM
@ -16,6 +15,7 @@ from ...util import SimpleFrozenDict, working_dir
from ...util import check_bool_env_var, ENV_VARS, split_command, join_command from ...util import check_bool_env_var, ENV_VARS, split_command, join_command
from ...errors import Errors from ...errors import Errors
set_start_method("spawn", force=True)
# How often the worker processes managing the commands in a parallel group # How often the worker processes managing the commands in a parallel group
# send keepalive messages to the main process # send keepalive messages to the main process
@ -25,59 +25,71 @@ PARALLEL_GROUP_STATUS_INTERVAL = 1
# before being copied to stdout when the group has completed # before being copied to stdout when the group has completed
PARALLEL_LOGGING_DIR_NAME = "parrTmp" PARALLEL_LOGGING_DIR_NAME = "parrTmp"
DISPLAY_STATUS_COLORS = { # The maximum permissible width of divider text describing a parallel command group
"pending": "yellow", MAX_WIDTH_DIVIDER = 60
"not rerun": "blue",
"starting": "green", # Whether or not to display realtime status information
"running": "green", DISPLAY_STATUS_TABLE = sys.stdout.isatty() and supports_ansi()
"succeeded": "green",
"failed": "red",
"killed": "red", class _ParallelCommandState:
"hung": "red", def __init__(
"cancelled": "red", self,
} name: str,
display_color: str,
transitions: Optional[Tuple[str]] = None,
):
self.name = name
self.display_color = display_color
if transitions is not None:
self.transitions = transitions
else:
self.transitions = None
# The complete set of states a command in a parallel group can be in. Each
# entry gives the state name, the color it is displayed in and, as an optional
# third argument, the names of the states it may transition to. Entries
# without a third argument declare no outgoing transitions.
# NOTE(review): "running" lists itself as a permitted successor, presumably
# because a command can report "started" more than once while it is running
# -- confirm against the code that handles status messages.
_PARALLEL_COMMAND_STATES = (
    _ParallelCommandState("pending", "yellow", ("starting", "cancelled", "not rerun")),
    _ParallelCommandState("not rerun", "blue"),
    _ParallelCommandState("starting", "green", ("running", "hung")),
    _ParallelCommandState(
        "running", "green", ("running", "succeeded", "failed", "killed", "hung")
    ),
    _ParallelCommandState("succeeded", "green"),
    _ParallelCommandState("failed", "red"),
    _ParallelCommandState("killed", "red"),
    _ParallelCommandState("hung", "red"),
    _ParallelCommandState("cancelled", "red"),
)
class _ParallelCommandInfo:
    """Bookkeeping record for one command within a parallel group.

    Holds the command's definition, its current state-machine state and the
    runtime details (pid, last status time, return code, output) that are
    filled in while the group is being executed.
    """

    # Lookup table from state name to the corresponding state object.
    state_dict = {state.name: state for state in _PARALLEL_COMMAND_STATES}

    def __init__(self, cmd_name: str, cmd: Dict, cmd_ind: int):
        """Create the record in the "pending" state.

        cmd_name: The name of the command.
        cmd: The command definition; its "script" entry is read to determine
            how many OS-level commands the command consists of.
        cmd_ind: The index of the command within the parallel group.
        """
        self.cmd_name = cmd_name
        self.cmd = cmd
        self.cmd_ind = cmd_ind
        self.len_os_cmds = len(cmd["script"])
        self.state = _ParallelCommandInfo.state_dict["pending"]
        self.pid: Optional[int] = None
        self.last_status_time: Optional[int] = None
        self.os_cmd_ind: Optional[int] = None
        self.rc: Optional[int] = None
        self.output: Optional[str] = None

    def change_state(self, new_state: str) -> None:
        """Move to *new_state* if the current state permits that transition.

        new_state: The name of the state to move to.
        RAISES: AssertionError if the transition is not listed among the
            current state's permitted transitions.
        """
        # A state without outgoing transitions may carry None rather than an
        # empty tuple; treat both as "nothing is permitted" so that an illegal
        # transition is reported clearly instead of failing with a TypeError.
        permitted = self.state.transitions or ()
        if new_state not in permitted:
            # Raised explicitly rather than via an `assert` statement so that
            # the check is not stripped when Python runs with -O; the
            # exception type and message are unchanged.
            raise AssertionError(
                "Illegal transition from " + self.state.name + " to " + new_state + "."
            )
        self.state = _ParallelCommandInfo.state_dict[new_state]

    @property
    def disp_state(self) -> str:
        """The colored text that represents the current state for display.

        While the command is running and the index of the current OS-level
        command is known, a "(current/total)" progress suffix is appended.
        """
        state_str = self.state.name
        if state_str == "running" and self.os_cmd_ind is not None:
            state_str = f"{state_str} ({self.os_cmd_ind + 1}/{self.len_os_cmds})"
        return color(state_str, self.state.display_color)
def _start_process(
    process: Process, proc_to_cmd_infos: Dict[Process, _ParallelCommandInfo]
) -> None:
    """Start *process* if the command it belongs to is still pending.

    process: The process to start.
    proc_to_cmd_infos: Maps each process to the info record of its command.
    """
    info = proc_to_cmd_infos[process]
    # Commands in any other status are left untouched.
    if info.status != "pending":
        return
    info.status = "starting"
    info.last_status_time = int(time())
    process.start()
def project_run_parallel_group( def project_run_parallel_group(
@ -100,12 +112,9 @@ def project_run_parallel_group(
""" """
config = load_project_config(project_dir, overrides=overrides) config = load_project_config(project_dir, overrides=overrides)
commands = {cmd["name"]: cmd for cmd in config.get("commands", [])} commands = {cmd["name"]: cmd for cmd in config.get("commands", [])}
parallel_group_status_queue = Manager().Queue()
parallel_group_status_queue = MultiprocessingManager().Queue()
max_parallel_processes = config.get("max_parallel_processes") max_parallel_processes = config.get("max_parallel_processes")
check_spacy_commit = check_bool_env_var(ENV_VARS.PROJECT_USE_GIT_VERSION) check_spacy_commit = check_bool_env_var(ENV_VARS.PROJECT_USE_GIT_VERSION)
multiprocessing.set_start_method("spawn", force=True)
DISPLAY_STATUS = sys.stdout.isatty() and supports_ansi()
cmd_infos = [ cmd_infos = [
_ParallelCommandInfo(cmd_name, commands[cmd_name], cmd_ind) _ParallelCommandInfo(cmd_name, commands[cmd_name], cmd_ind)
for cmd_ind, cmd_name in enumerate(cmd_names) for cmd_ind, cmd_name in enumerate(cmd_names)
@ -120,14 +129,14 @@ def project_run_parallel_group(
) )
and not force and not force
): ):
cmd_info.status = "not rerun" cmd_info.change_state("not rerun")
rmtree(PARALLEL_LOGGING_DIR_NAME, ignore_errors=True) rmtree(PARALLEL_LOGGING_DIR_NAME, ignore_errors=True)
mkdir(PARALLEL_LOGGING_DIR_NAME) mkdir(PARALLEL_LOGGING_DIR_NAME)
processes: List[Process] = [] processes: List[Process] = []
proc_to_cmd_infos: Dict[Process, _ParallelCommandInfo] = {} proc_to_cmd_infos: Dict[Process, _ParallelCommandInfo] = {}
num_processes = 0 num_processes = 0
for cmd_info in cmd_infos: for cmd_info in cmd_infos:
if cmd_info.status == "not rerun": if cmd_info.state.name == "not rerun":
continue continue
process = Process( process = Process(
target=_project_run_parallel_cmd, target=_project_run_parallel_cmd,
@ -152,16 +161,16 @@ def project_run_parallel_group(
divider_parallel_descriptor = parallel_descriptor = ( divider_parallel_descriptor = parallel_descriptor = (
"parallel[" + ", ".join(cmd_info.cmd_name for cmd_info in cmd_infos) + "]" "parallel[" + ", ".join(cmd_info.cmd_name for cmd_info in cmd_infos) + "]"
) )
if len(divider_parallel_descriptor) > 60: if len(divider_parallel_descriptor) > MAX_WIDTH_DIVIDER:
divider_parallel_descriptor = divider_parallel_descriptor[:57] + "..." divider_parallel_descriptor = divider_parallel_descriptor[:(MAX_WIDTH_DIVIDER-3)] + "..."
msg.divider(divider_parallel_descriptor) msg.divider(divider_parallel_descriptor)
if not DISPLAY_STATUS: if not DISPLAY_STATUS_TABLE:
print(parallel_descriptor) print(parallel_descriptor)
else: else:
first = True first = True
while any( while any(
cmd_info.status in ("pending", "starting", "running") cmd_info.state.name in ("pending", "starting", "running")
for cmd_info in cmd_infos for cmd_info in cmd_infos
): ):
try: try:
@ -169,60 +178,70 @@ def project_run_parallel_group(
timeout=PARALLEL_GROUP_STATUS_INTERVAL * 20 timeout=PARALLEL_GROUP_STATUS_INTERVAL * 20
) )
except Exception as e: except Exception as e:
for other_cmd_info in (c for c in cmd_infos if c.status == "running"): for other_cmd_info in (
other_cmd_info.status = "hung" c for c in cmd_infos if c.state.name == "running"
for other_cmd_info in (c for c in cmd_infos if c.status == "pending"): ):
other_cmd_info.status = "cancelled" other_cmd_info.change_state("hung")
for other_cmd_info in (
c for c in cmd_infos if c.state.name == "pending"
):
other_cmd_info.change_state("cancelled")
cmd_info = cmd_infos[cast(int, mess["cmd_ind"])] cmd_info = cmd_infos[cast(int, mess["cmd_ind"])]
if mess["status"] in ("started", "alive"): if mess["status"] in ("started", "alive"):
cmd_info.last_status_time = int(time()) cmd_info.last_status_time = int(time())
for other_cmd_info in ( for other_cmd_info in (
c for c in cmd_infos if c.status in ("starting", "running") c for c in cmd_infos if c.state.name in ("starting", "running")
): ):
if ( if (
other_cmd_info.last_status_time is not None other_cmd_info.last_status_time is not None
and time() - other_cmd_info.last_status_time and time() - other_cmd_info.last_status_time
> PARALLEL_GROUP_STATUS_INTERVAL * 20 > PARALLEL_GROUP_STATUS_INTERVAL * 20
): ):
other_cmd_info.status = "hung" other_cmd_info.change_state("hung")
if mess["status"] == "started": if mess["status"] == "started":
cmd_info.status = "running" cmd_info.change_state("running")
cmd_info.os_cmd_ind = cast(int, mess["os_cmd_ind"]) cmd_info.os_cmd_ind = cast(int, mess["os_cmd_ind"])
cmd_info.pid = cast(int, mess["pid"]) cmd_info.pid = cast(int, mess["pid"])
if mess["status"] == "completed": if mess["status"] == "completed":
cmd_info.rc = cast(int, mess["rc"]) cmd_info.rc = cast(int, mess["rc"])
if cmd_info.rc == 0: if cmd_info.rc == 0:
cmd_info.status = "succeeded" cmd_info.change_state("succeeded")
if not dry: if not dry:
update_lockfile(current_dir, cmd_info.cmd) update_lockfile(current_dir, cmd_info.cmd)
working_process = next(process_iterator, None) working_process = next(process_iterator, None)
if working_process is not None: if working_process is not None:
_start_process(working_process, proc_to_cmd_infos) _start_process(working_process, proc_to_cmd_infos)
elif cmd_info.rc > 0: elif cmd_info.rc > 0:
cmd_info.status = "failed" cmd_info.change_state("failed")
else: else:
cmd_info.status = "killed" cmd_info.change_state("killed")
cmd_info.output = cast(str, mess["output"]) cmd_info.output = cast(str, mess["output"])
if any(c for c in cmd_infos if c.status in ("failed", "killed", "hung")): if any(
for other_cmd_info in (c for c in cmd_infos if c.status == "running"): c for c in cmd_infos if c.state.name in ("failed", "killed", "hung")
):
for other_cmd_info in (
c for c in cmd_infos if c.state.name == "running"
):
try: try:
kill(cast(int, other_cmd_info.pid), SIGTERM) kill(cast(int, other_cmd_info.pid), SIGTERM)
except: except:
pass pass
for other_cmd_info in (c for c in cmd_infos if c.status == "pending"): for other_cmd_info in (
other_cmd_info.status = "cancelled" c for c in cmd_infos if c.state.name == "pending"
if mess["status"] != "alive" and DISPLAY_STATUS: ):
other_cmd_info.change_state("cancelled")
if mess["status"] != "alive" and DISPLAY_STATUS_TABLE:
if first: if first:
first = False first = False
else: else:
print("\033[2K\033[F" * (4 + len(cmd_infos))) print("\033[2K\033[F" * (4 + len(cmd_infos)))
data = [[c.cmd_name, c.disp_status] for c in cmd_infos] data = [[c.cmd_name, c.disp_state] for c in cmd_infos]
header = ["Command", "Status"] header = ["Command", "Status"]
msg.table(data, header=header) msg.table(data, header=header)
for cmd_info in (c for c in cmd_infos if c.status != "cancelled"): for cmd_info in (c for c in cmd_infos if c.state.name != "cancelled"):
msg.divider(cmd_info.cmd_name) msg.divider(cmd_info.cmd_name)
if cmd_info.status == "not rerun": if cmd_info.state.name == "not rerun":
msg.info(f"Skipping '{cmd_info.cmd_name}': nothing changed") msg.info(f"Skipping '{cmd_info.cmd_name}': nothing changed")
else: else:
print(cmd_info.output) print(cmd_info.output)
@ -233,7 +252,7 @@ def project_run_parallel_group(
group_rc = min(process_rcs) group_rc = min(process_rcs)
if group_rc != 0: if group_rc != 0:
sys.exit(group_rc) sys.exit(group_rc)
if any(c for c in cmd_infos if c.status == "hung"): if any(c for c in cmd_infos if c.state.name == "hung"):
sys.exit(-1) sys.exit(-1)
@ -337,3 +356,13 @@ def _project_run_parallel_cmd(
"output": logfile.read(), "output": logfile.read(),
} }
) )
def _start_process(
    process: Process, proc_to_cmd_infos: Dict[Process, _ParallelCommandInfo]
) -> None:
    """Start *process* if the command it belongs to is still pending.

    The command's state is moved to "starting" and its last status time is
    set to the current time before the process is started.

    process: The process to start.
    proc_to_cmd_infos: Maps each process to the info record of its command.
    """
    info = proc_to_cmd_infos[process]
    # Commands that are no longer pending are left untouched.
    if info.state.name != "pending":
        return
    info.change_state("starting")
    info.last_status_time = int(time())
    process.start()

View File

@ -9,9 +9,9 @@ import typer
from .parallel import project_run_parallel_group from .parallel import project_run_parallel_group
from ...util import working_dir, run_command, split_command, is_cwd, join_command from ...util import working_dir, run_command, split_command, is_cwd, join_command
from ...util import SimpleFrozenList, ENV_VARS from ...util import SimpleFrozenList, ENV_VARS
from ...util import check_bool_env_var, SimpleFrozenDict, check_deps from ...util import check_bool_env_var, SimpleFrozenDict
from .._util import PROJECT_FILE, load_project_config, check_rerun, update_lockfile from .._util import PROJECT_FILE, load_project_config, check_rerun, update_lockfile
from .._util import project_cli, Arg, Opt, COMMAND, parse_config_overrides from .._util import project_cli, Arg, Opt, COMMAND, parse_config_overrides, check_deps
@project_cli.command( @project_cli.command(