Improvements / corrections

richardpaulhudson 2022-07-19 17:10:59 +02:00
parent 2c1f58e74f
commit 012578f0f6
2 changed files with 86 additions and 41 deletions


@@ -15,7 +15,8 @@ Commands are only re-run if their inputs have changed."""
INTRO_WORKFLOWS = f"""The following workflows are defined by the project. They
can be executed using [`spacy project run [name]`]({DOCS_URL}/api/cli#project-run)
and will run the specified commands in order. Commands grouped within square brackets
are run in parallel. Commands are only re-run if their inputs have changed."""
preceded by 'parallel' are run in parallel. Commands are only re-run if their inputs
have changed."""
INTRO_ASSETS = f"""The following assets are defined by the project. They can
be fetched by running [`spacy project assets`]({DOCS_URL}/api/cli#project-assets)
in the project directory."""
@@ -77,7 +78,7 @@ def project_document(
rendered_steps.append(md.code(step))
else:
rendered_steps.append(
"["
"parallel["
+ ", ".join(md.code(p_step) for p_step in step["parallel"])
+ "]"
)
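
To make the rendering change above concrete, here is a brief standalone sketch (illustrative only, not part of this commit; the workflow and command names are invented) of how a step list containing a parallel group might be turned into its documented form:

# Hypothetical step list for a workflow, as it might look once project.yml has been parsed;
# a plain string is a normal command, a {"parallel": [...]} mapping is a parallel group.
steps = ["preprocess", {"parallel": ["train", "evaluate"]}, "package"]

rendered_steps = []
for step in steps:
    if isinstance(step, str):
        rendered_steps.append(f"`{step}`")  # stands in for md.code(step)
    else:
        rendered_steps.append(
            "parallel[" + ", ".join(f"`{p}`" for p in step["parallel"]) + "]"
        )
print(rendered_steps)
# ['`preprocess`', 'parallel[`train`, `evaluate`]', '`package`']

The remainder of the diff is the second changed file, which implements the parallel execution itself.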


@@ -1,4 +1,4 @@
from typing import Any, List, Literal, Optional, Dict, Union, cast, Tuple
from typing import Any, List, Optional, Dict, Union, cast
import sys
from pathlib import Path
from time import time
@@ -15,6 +15,19 @@ from ...util import SimpleFrozenDict, working_dir
from ...util import check_bool_env_var, ENV_VARS, split_command, join_command
from ...errors import Errors
"""
Permits the execution of a parallel command group.
The main process starts a worker process for each spaCy projects command in the parallel group.
Each spaCy projects command consists of one or more OS-level commands. The worker process starts
subprocesses for these OS-level commands in order and sends status information about their
execution back to the main process.
The main process maintains a state machine for each spaCy projects command/worker process. The
meaning of the states is documented alongside the _PARALLEL_COMMAND_STATES code.
"""
# Use spawn to create worker processes on all OSs for consistency
set_start_method("spawn", force=True)
# How often the worker processes managing the commands in a parallel group
@@ -30,34 +43,48 @@ MAX_WIDTH_DIVIDER = 60
# Whether or not to display realtime status information
DISPLAY_STATUS_TABLE = sys.stdout.isatty() and supports_ansi()
class _ParallelCommandState:
def __init__(
self,
name: str,
display_color: str,
transitions: Optional[Tuple[str]] = None,
transitions: Optional[List[str]] = None,
):
self.name = name
self.display_color = display_color
if transitions is not None:
self.transitions = transitions
else:
self.transitions = None
self.transitions = []
_PARALLEL_COMMAND_STATES = (
_ParallelCommandState("pending", "yellow", ("starting", "cancelled", "not rerun")),
# The command has not yet been run
_ParallelCommandState("pending", "yellow", ["starting", "cancelled", "not rerun"]),
# The command was not run because its settings, inputs and outputs had not changed
# since a previous successful run
_ParallelCommandState("not rerun", "blue"),
_ParallelCommandState("starting", "green", ("running", "hung")),
# The main process has spawned a worker process for the command but not yet received
# an acknowledgement
_ParallelCommandState("starting", "green", ["running", "hung"]),
# The command is running
_ParallelCommandState(
"running", "green", ("running", "succeeded", "failed", "killed", "hung")
"running", "green", ["running", "succeeded", "failed", "terminated", "hung"]
),
# The command succeeded (rc=0)
_ParallelCommandState("succeeded", "green"),
# The command failed (rc>0)
_ParallelCommandState("failed", "red"),
_ParallelCommandState("killed", "red"),
# The command was terminated (rc<0), usually but not necessarily by the main process because
# another command within the same parallel group failed
_ParallelCommandState("terminated", "red"),
# The main process would expect the worker process to be running, but the worker process has
# not communicated with the main process for an extended period of time
_ParallelCommandState("hung", "red"),
# The main process did not attempt to execute the command because another command in the same
# parallel group had already failed
_ParallelCommandState("cancelled", "red"),
)
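
Read together, the transition lists above define a small state machine. The following sketch (illustrative only; the helper function is invented, not part of this commit) spells out the transitions that matter most:

# Allowed transitions as listed above; states without outgoing transitions
# ("not rerun", "succeeded", "failed", "terminated", "hung", "cancelled") are terminal.
_TRANSITIONS = {
    "pending": ["starting", "cancelled", "not rerun"],
    "starting": ["running", "hung"],
    "running": ["running", "succeeded", "failed", "terminated", "hung"],
}

def _can_transition(current: str, new: str) -> bool:
    return new in _TRANSITIONS.get(current, [])

# Happy path: pending -> starting -> running -> succeeded
assert _can_transition("pending", "starting")
assert _can_transition("running", "succeeded")
# If one command fails, its running siblings are terminated and pending ones cancelled
assert _can_transition("running", "terminated")
assert _can_transition("pending", "cancelled")
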
@@ -70,10 +97,9 @@ class _ParallelCommandInfo:
self.cmd_name = cmd_name
self.cmd = cmd
self.cmd_ind = cmd_ind
self.len_os_cmds = len(cmd["script"])
self.state = _ParallelCommandInfo.state_dict["pending"]
self.pid: Optional[int] = None
self.last_status_time: Optional[int] = None
self.last_keepalive_time: Optional[int] = None
self.os_cmd_ind: Optional[int] = None
self.rc: Optional[int] = None
self.output: Optional[str] = None
@@ -85,10 +111,11 @@ class _ParallelCommandInfo:
self.state = _ParallelCommandInfo.state_dict[new_state]
@property
def disp_state(self) -> str:
def state_repr(self) -> str:
state_str = self.state.name
if state_str == "running" and self.os_cmd_ind is not None:
state_str = f"{state_str} ({self.os_cmd_ind + 1}/{self.len_os_cmds})"
number_of_os_cmds = len(self.cmd["script"])
state_str = f"{state_str} ({self.os_cmd_ind + 1}/{number_of_os_cmds})"
return color(state_str, self.state.display_color)
@@ -102,7 +129,7 @@ def project_run_parallel_group(
) -> None:
"""Run a parallel group of commands. Note that because of the challenges of managing
parallel output streams it is not possible to specify a value for 'capture'. Essentially,
with parallel groups 'capture==True' for stdout and 'capture==False' for stderr.
with parallel groups 'capture==True'.
project_dir (Path): Path to project directory.
cmd_infos: a list of objects containing information about the commands to run.
@@ -162,13 +189,18 @@ def project_run_parallel_group(
"parallel[" + ", ".join(cmd_info.cmd_name for cmd_info in cmd_infos) + "]"
)
if len(divider_parallel_descriptor) > MAX_WIDTH_DIVIDER:
divider_parallel_descriptor = divider_parallel_descriptor[:(MAX_WIDTH_DIVIDER-3)] + "..."
divider_parallel_descriptor = (
divider_parallel_descriptor[: (MAX_WIDTH_DIVIDER - 3)] + "..."
)
msg.divider(divider_parallel_descriptor)
if not DISPLAY_STATUS_TABLE:
print(parallel_descriptor)
else:
first = True
msg.info(
"Temporary logs are being written to "
+ sep.join((str(current_dir), PARALLEL_LOGGING_DIR_NAME))
)
first = True
while any(
cmd_info.state.name in ("pending", "starting", "running")
for cmd_info in cmd_infos
@@ -177,9 +209,10 @@ def project_run_parallel_group(
mess: Dict[str, Union[str, int]] = parallel_group_status_queue.get(
timeout=PARALLEL_GROUP_STATUS_INTERVAL * 20
)
except Exception as e:
except Exception:
# No more messages are being received: the whole group has hung
for other_cmd_info in (
c for c in cmd_infos if c.state.name == "running"
c for c in cmd_infos if c.state.name in ("starting", "running")
):
other_cmd_info.change_state("hung")
for other_cmd_info in (
@@ -187,22 +220,23 @@ def project_run_parallel_group(
):
other_cmd_info.change_state("cancelled")
cmd_info = cmd_infos[cast(int, mess["cmd_ind"])]
if mess["status"] in ("started", "alive"):
cmd_info.last_status_time = int(time())
if mess["type"] in ("started", "keepalive"):
cmd_info.last_keepalive_time = int(time())
for other_cmd_info in (
c for c in cmd_infos if c.state.name in ("starting", "running")
):
if (
other_cmd_info.last_status_time is not None
and time() - other_cmd_info.last_status_time
other_cmd_info.last_keepalive_time is not None
and time() - other_cmd_info.last_keepalive_time
> PARALLEL_GROUP_STATUS_INTERVAL * 20
):
# a specific command has hung
other_cmd_info.change_state("hung")
if mess["status"] == "started":
if mess["type"] == "started":
cmd_info.change_state("running")
cmd_info.os_cmd_ind = cast(int, mess["os_cmd_ind"])
cmd_info.pid = cast(int, mess["pid"])
if mess["status"] == "completed":
if mess["type"] == "completed":
cmd_info.rc = cast(int, mess["rc"])
if cmd_info.rc == 0:
cmd_info.change_state("succeeded")
@@ -214,11 +248,12 @@ def project_run_parallel_group(
elif cmd_info.rc > 0:
cmd_info.change_state("failed")
else:
cmd_info.change_state("killed")
cmd_info.change_state("terminated")
cmd_info.output = cast(str, mess["output"])
if any(
c for c in cmd_infos if c.state.name in ("failed", "killed", "hung")
c for c in cmd_infos if c.state.name in ("failed", "terminated", "hung")
):
# a command in the group hasn't succeeded, so terminate/cancel the rest
for other_cmd_info in (
c for c in cmd_infos if c.state.name == "running"
):
@@ -230,12 +265,13 @@ def project_run_parallel_group(
c for c in cmd_infos if c.state.name == "pending"
):
other_cmd_info.change_state("cancelled")
if mess["status"] != "alive" and DISPLAY_STATUS_TABLE:
if mess["type"] != "keepalive" and DISPLAY_STATUS_TABLE:
if first:
first = False
else:
# overwrite the existing status table
print("\033[2K\033[F" * (4 + len(cmd_infos)))
data = [[c.cmd_name, c.disp_state] for c in cmd_infos]
data = [[c.cmd_name, c.state_repr] for c in cmd_infos]
header = ["Command", "Status"]
msg.table(data, header=header)
@@ -248,11 +284,9 @@ def project_run_parallel_group(
process_rcs = [c.rc for c in cmd_infos if c.rc is not None]
if len(process_rcs) > 0:
group_rc = max(process_rcs)
if group_rc <= 0:
group_rc = min(process_rcs)
if group_rc != 0:
if group_rc > 0:
sys.exit(group_rc)
if any(c for c in cmd_infos if c.state.name == "hung"):
if any(c for c in cmd_infos if c.state.name in ("hung", "terminated")):
sys.exit(-1)
@@ -263,21 +297,31 @@ def _project_run_parallel_cmd(
current_dir: str,
parallel_group_status_queue: Queue,
) -> None:
"""Run a single spaCy projects command as a worker process.
Communicates with the main process via queue messages that consist of
dictionaries whose type is determined by the entry 'type'. Possible
values of 'type' are 'started', 'completed' and 'keepalive'. Each dictionary
type contains different additional fields."""
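
For illustration, the three message shapes, pieced together from the queue.put() calls elsewhere in this diff, look roughly as follows (the values shown are invented):

# 'started': a subprocess for one of the command's OS-level commands has been launched
{"type": "started", "cmd_ind": 0, "os_cmd_ind": 1, "pid": 12345}
# 'keepalive': the current subprocess is still running
{"type": "keepalive", "cmd_ind": 0}
# 'completed': the spaCy projects command has finished; 'output' holds its captured log
{"type": "completed", "cmd_ind": 0, "rc": 0, "output": "..."}
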
# we can use the command name as a unique log filename because a parallel
# group is not allowed to contain the same command more than once
log_file_name = sep.join(
(current_dir, PARALLEL_LOGGING_DIR_NAME, cmd_info.cmd_name + ".log")
)
file_not_found = False
# buffering=0: make sure output is not lost if a subprocess is terminated
with open(log_file_name, "wb", buffering=0) as logfile:
for os_cmd_ind, os_cmd in enumerate(cmd_info.cmd["script"]):
command = split_command(os_cmd)
if len(command) and command[0] in ("python", "python3"):
# -u: prevent buffering within Python
command = [sys.executable, "-u", *command[1:]]
elif len(command) and command[0] in ("pip", "pip3"):
command = [sys.executable, "-m", "pip", *command[1:]]
logfile.write(
bytes(
f"Running command: {join_command(command)}" + linesep,
encoding="UTF-8",
encoding="utf8",
)
)
if not dry:
@@ -305,22 +349,22 @@ def _project_run_parallel_cmd(
break
parallel_group_status_queue.put(
{
"type": "started",
"cmd_ind": cmd_info.cmd_ind,
"os_cmd_ind": os_cmd_ind,
"status": "started",
"pid": sp.pid,
}
)
while True:
try:
sp.communicate(timeout=PARALLEL_GROUP_STATUS_INTERVAL)
sp.wait(timeout=PARALLEL_GROUP_STATUS_INTERVAL)
except TimeoutExpired:
pass
if sp.returncode == None:
parallel_group_status_queue.put(
{
"type": "keepalive",
"cmd_ind": cmd_info.cmd_ind,
"status": "alive",
}
)
else:
@@ -336,7 +380,7 @@ def _project_run_parallel_cmd(
else:
logfile.write(
bytes(
linesep + f"Killed (rc={sp.returncode})" + linesep,
linesep + f"Terminated (rc={sp.returncode})" + linesep,
encoding="UTF-8",
)
)
@@ -350,8 +394,8 @@ def _project_run_parallel_cmd(
rc = sp.returncode
parallel_group_status_queue.put(
{
"type": "completed",
"cmd_ind": cmd_info.cmd_ind,
"status": "completed",
"rc": rc,
"output": logfile.read(),
}
@@ -364,5 +408,5 @@ def _start_process(
cmd_info = proc_to_cmd_infos[process]
if cmd_info.state.name == "pending":
cmd_info.change_state("starting")
cmd_info.last_status_time = int(time())
cmd_info.last_keepalive_time = int(time())
process.start()