Final touches

richardpaulhudson 2022-07-20 17:54:05 +02:00
parent 4614a89bcc
commit 4902dd68be
3 changed files with 40 additions and 34 deletions

Changed file 1 of 3

@@ -672,6 +672,15 @@ def update_lockfile(
srsly.write_yaml(lock_path, data)
def check_deps(cmd: Dict, cmd_name: str, project_dir: Path, dry: bool):
for dep in cmd.get("deps", []):
if not (project_dir / dep).exists():
err = f"Missing dependency specified by command '{cmd_name}': {dep}"
err_help = "Maybe you forgot to run the 'project assets' command or a previous step?"
err_kwargs = {"exits": 1} if not dry else {}
msg.fail(err, err_help, **err_kwargs)
def _get_lock_entry(project_dir: Path, command: Dict[str, Any]) -> Dict[str, Any]:
"""Get a lockfile entry for a given command. An entry includes the command,
the script (command steps) and a list of dependencies and outputs with
@@ -711,12 +720,3 @@ def _get_fileinfo(
md5 = get_checksum(file_path) if file_path.exists() else None
data.append({"path": path, "md5": md5})
return data
def check_deps(cmd: Dict, cmd_name: str, project_dir: Path, dry: bool):
for dep in cmd.get("deps", []):
if not (project_dir / dep).exists():
err = f"Missing dependency specified by command '{cmd_name}': {dep}"
err_help = "Maybe you forgot to run the 'project assets' command or a previous step?"
err_kwargs = {"exits": 1} if not dry else {}
msg.fail(err, err_help, **err_kwargs)

Changed file 2 of 3

@@ -11,8 +11,8 @@ from wasabi import msg
from wasabi.util import color, supports_ansi
from .._util import check_rerun, check_deps, update_lockfile, load_project_config
from ...util import SimpleFrozenDict, working_dir
from ...util import check_bool_env_var, ENV_VARS, split_command, join_command
from ...util import SimpleFrozenDict, working_dir, split_command, join_command
from ...util import check_bool_env_var, ENV_VARS
from ...errors import Errors
"""
@@ -20,7 +20,7 @@ Permits the execution of a parallel command group.
The main process starts a worker process for each spaCy projects command in the parallel group.
Each spaCy projects command consists of n OS level commands. The worker process starts
subprocesses for these OS level commands in order, and sends status information about their
subprocesses for these OS level commands serially, and sends status information about their
execution back to the main process.
The main process maintains a state machine for each spaCy projects command/worker process. The meaning
@@ -33,7 +33,7 @@ as 'failed/terminated' on Windows systems.
set_start_method("spawn", force=True)
# How often the worker processes managing the commands in a parallel group
# send keepalive messages to the main processes
# send keepalive messages to the main process
PARALLEL_GROUP_STATUS_INTERVAL = 1
# The dirname where the temporary logfiles for a parallel group are written
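To make the protocol described in the module docstring above concrete, here is a minimal, self-contained sketch, not the actual spaCy implementation: names such as demo_worker, run_group and KEEPALIVE_INTERVAL are illustrative only. Each worker process runs its OS-level commands serially and reports "started", "keepalive" and "completed" messages over a shared queue; the main process treats a worker whose keepalives stop arriving as hung.

import subprocess
from multiprocessing import Manager, Process
from queue import Empty
from time import time

KEEPALIVE_INTERVAL = 1  # seconds; analogous to PARALLEL_GROUP_STATUS_INTERVAL above
HUNG_TIMEOUT = 10 * KEEPALIVE_INTERVAL


def demo_worker(cmd_index, os_cmds, status_queue):
    # Run the OS-level commands of one projects command serially, reporting
    # each state change back to the main process via the shared queue.
    rc = 0
    for os_cmd_index, os_cmd in enumerate(os_cmds):
        sp = subprocess.Popen(os_cmd, shell=True)
        status_queue.put(
            {
                "type": "started",
                "cmd_index": cmd_index,
                "os_cmd_index": os_cmd_index,
                "pid": sp.pid,
            }
        )
        while True:
            try:
                rc = sp.wait(timeout=KEEPALIVE_INTERVAL)
                break
            except subprocess.TimeoutExpired:
                status_queue.put({"type": "keepalive", "cmd_index": cmd_index})
        if rc != 0:
            break
    status_queue.put({"type": "completed", "cmd_index": cmd_index, "rc": rc})


def run_group(scripts):
    # scripts: one list of OS-level command strings per projects command
    status_queue = Manager().Queue()
    last_keepalive = {}
    workers = []
    for cmd_index, os_cmds in enumerate(scripts):
        worker = Process(target=demo_worker, args=(cmd_index, os_cmds, status_queue))
        worker.start()
        workers.append(worker)
        last_keepalive[cmd_index] = time()
    while last_keepalive:
        try:
            mess = status_queue.get(timeout=KEEPALIVE_INTERVAL)
        except Empty:
            mess = None
        if mess is not None:
            if mess["type"] == "completed":
                last_keepalive.pop(mess["cmd_index"], None)
                print(f"command {mess['cmd_index']} completed with rc {mess['rc']}")
            else:
                last_keepalive[mess["cmd_index"]] = time()
        # a worker whose keepalives have stopped arriving is presumed hung
        for cmd_index, seen in list(last_keepalive.items()):
            if time() - seen > HUNG_TIMEOUT:
                print(f"command {cmd_index} appears to be hung")
                last_keepalive.pop(cmd_index)
    for worker in workers:
        worker.join()


if __name__ == "__main__":
    run_group([["echo one", "echo two"], ["echo three"]])

The code changed in this commit additionally maintains a per-command state machine, cancels pending commands on failure and relays console output, but the three message types shown here ("started", "keepalive", "completed") match the ones sent in the diff below.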
@@ -52,6 +52,7 @@ class _ParallelCommandState:
self,
name: str,
display_color: str,
# transitions: the names of states that can legally follow this state
transitions: Optional[List[str]] = None,
):
self.name = name
@@ -95,14 +96,14 @@ class _ParallelCommandInfo:
state_dict = {state.name: state for state in _PARALLEL_COMMAND_STATES}
def __init__(self, cmd_name: str, cmd: Dict, cmd_ind: int):
def __init__(self, cmd_name: str, cmd: Dict, cmd_index: int):
self.cmd_name = cmd_name
self.cmd = cmd
self.cmd_ind = cmd_ind
self.cmd_index = cmd_index
self.state = _ParallelCommandInfo.state_dict["pending"]
self.pid: Optional[int] = None
self.last_keepalive_time: Optional[int] = None
self.os_cmd_ind: Optional[int] = None
self.running_os_cmd_index: Optional[int] = None
self.rc: Optional[int] = None
self.console_output: Optional[str] = None
@@ -115,16 +116,18 @@ class _ParallelCommandInfo:
@property
def state_repr(self) -> str:
state_str = self.state.name
if state_str == "running" and self.os_cmd_ind is not None:
if state_str == "running" and self.running_os_cmd_index is not None:
number_of_os_cmds = len(self.cmd["script"])
state_str = f"{state_str} ({self.os_cmd_ind + 1}/{number_of_os_cmds})"
state_str = (
f"{state_str} ({self.running_os_cmd_index + 1}/{number_of_os_cmds})"
)
elif state_str in ("failed", "terminated") and os.name == "nt":
# not used at present as the status table isn't displayed on Windows,
# but may be relevant in the future if Windows becomes able to handle
# ANSI commands as standard
state_str = "failed/terminated"
# we know we have ANSI commands because otherwise
# the status table would not be displayed in the first place
# we know ANSI commands are available because otherwise
# the status table would not be displayed in the first place
return color(state_str, self.state.display_color)
@@ -148,12 +151,12 @@ def project_run_parallel_group(
"""
config = load_project_config(project_dir, overrides=overrides)
commands = {cmd["name"]: cmd for cmd in config.get("commands", [])}
check_spacy_commit = check_bool_env_var(ENV_VARS.PROJECT_USE_GIT_VERSION)
parallel_group_status_queue = Manager().Queue()
max_parallel_processes = config.get("max_parallel_processes")
check_spacy_commit = check_bool_env_var(ENV_VARS.PROJECT_USE_GIT_VERSION)
cmd_infos = [
_ParallelCommandInfo(cmd_name, commands[cmd_name], cmd_ind)
for cmd_ind, cmd_name in enumerate(cmd_names)
_ParallelCommandInfo(cmd_name, commands[cmd_name], cmd_index)
for cmd_index, cmd_name in enumerate(cmd_names)
]
with working_dir(project_dir) as current_dir:
@@ -202,7 +205,8 @@ def project_run_parallel_group(
divider_parallel_descriptor[: (MAX_WIDTH_DIVIDER - 3)] + "..."
)
msg.divider(divider_parallel_descriptor)
if not DISPLAY_STATUS_TABLE:
if not DISPLAY_STATUS_TABLE and len(parallel_descriptor) > MAX_WIDTH_DIVIDER:
# reprint the descriptor if it was too long and had to be cut short
print(parallel_descriptor)
msg.info(
@@ -228,7 +232,7 @@ def project_run_parallel_group(
c for c in cmd_infos if c.state.name == "pending"
):
other_cmd_info.change_state("cancelled")
cmd_info = cmd_infos[cast(int, mess["cmd_ind"])]
cmd_info = cmd_infos[cast(int, mess["cmd_index"])]
if mess["type"] in ("started", "keepalive"):
cmd_info.last_keepalive_time = int(time())
for other_cmd_info in (
@@ -243,7 +247,7 @@ def project_run_parallel_group(
other_cmd_info.change_state("hung")
if mess["type"] == "started":
cmd_info.change_state("running")
cmd_info.os_cmd_ind = cast(int, mess["os_cmd_ind"])
cmd_info.running_os_cmd_index = cast(int, mess["os_cmd_index"])
cmd_info.pid = cast(int, mess["pid"])
if mess["type"] == "completed":
cmd_info.rc = cast(int, mess["rc"])
@@ -269,6 +273,8 @@ def project_run_parallel_group(
try:
os.kill(cast(int, other_cmd_info.pid), SIGTERM)
except:
# there could be a race condition between the process completing
# and the message arriving to notify about it
pass
for other_cmd_info in (
c for c in cmd_infos if c.state.name == "pending"
@@ -320,7 +326,7 @@ def _project_run_parallel_cmd(
file_not_found = False
# buffering=0: make sure output is not lost if a subprocess is terminated
with open(log_file_name, "wb", buffering=0) as logfile:
for os_cmd_ind, os_cmd in enumerate(cmd_info.cmd["script"]):
for os_cmd_index, os_cmd in enumerate(cmd_info.cmd["script"]):
command = split_command(os_cmd)
if len(command) and command[0] in ("python", "python3"):
# -u: prevent buffering within Python
@@ -359,8 +365,8 @@ def _project_run_parallel_cmd(
parallel_group_status_queue.put(
{
"type": "started",
"cmd_ind": cmd_info.cmd_ind,
"os_cmd_ind": os_cmd_ind,
"cmd_index": cmd_info.cmd_index,
"os_cmd_index": os_cmd_index,
"pid": sp.pid,
}
)
@@ -373,7 +379,7 @@ def _project_run_parallel_cmd(
parallel_group_status_queue.put(
{
"type": "keepalive",
"cmd_ind": cmd_info.cmd_ind,
"cmd_index": cmd_info.cmd_index,
}
)
else:
@@ -398,8 +404,8 @@ def _project_run_parallel_cmd(
parallel_group_status_queue.put(
{
"type": "started",
"cmd_ind": cmd_info.cmd_ind,
"os_cmd_ind": os_cmd_ind,
"cmd_index": cmd_info.cmd_index,
"os_cmd_index": os_cmd_index,
"pid": -1,
}
)
@@ -414,7 +420,7 @@ def _project_run_parallel_cmd(
parallel_group_status_queue.put(
{
"type": "completed",
"cmd_ind": cmd_info.cmd_ind,
"cmd_index": cmd_info.cmd_index,
"rc": rc,
"console_output": logfile.read(),
}

Changed file 3 of 3

@@ -165,7 +165,7 @@ def print_run_help(project_dir: Path, subcommand: Optional[str] = None) -> None:
]
else:
steps_data = [
(f"{i + 1}.", f"{step[1]}", commands[step[1]].get("help", ""))
(f"{i + 1}. {step[1]}", commands[step[1]].get("help", ""))
for i, step in enumerate(steps)
]
msg.table(steps_data)
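
For context, a small illustrative sketch of the change above: the step number and step name are now combined into a single cell, so each table row has two columns instead of three. The step and help texts here are hypothetical, not taken from any real project.yml.

from wasabi import msg

# hypothetical workflow steps; as in the code above, step[1] holds the command name
steps = [("workflow", "preprocess"), ("workflow", "train"), ("workflow", "evaluate")]
commands = {
    "preprocess": {"help": "Convert the raw data"},
    "train": {"help": "Train the pipeline"},
    "evaluate": {"help": "Evaluate on the dev corpus"},
}
steps_data = [
    (f"{i + 1}. {step[1]}", commands[step[1]].get("help", ""))
    for i, step in enumerate(steps)
]
msg.table(steps_data)
# prints an aligned two-column table along the lines of:
# 1. preprocess   Convert the raw data
# 2. train        Train the pipeline
# 3. evaluate     Evaluate on the dev corpus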