diff --git a/spacy/cli/_util.py b/spacy/cli/_util.py index dd1a735d9..0d15ba82b 100644 --- a/spacy/cli/_util.py +++ b/spacy/cli/_util.py @@ -672,6 +672,15 @@ def update_lockfile( srsly.write_yaml(lock_path, data) +def check_deps(cmd: Dict, cmd_name: str, project_dir: Path, dry: bool): + for dep in cmd.get("deps", []): + if not (project_dir / dep).exists(): + err = f"Missing dependency specified by command '{cmd_name}': {dep}" + err_help = "Maybe you forgot to run the 'project assets' command or a previous step?" + err_kwargs = {"exits": 1} if not dry else {} + msg.fail(err, err_help, **err_kwargs) + + def _get_lock_entry(project_dir: Path, command: Dict[str, Any]) -> Dict[str, Any]: """Get a lockfile entry for a given command. An entry includes the command, the script (command steps) and a list of dependencies and outputs with @@ -711,12 +720,3 @@ def _get_fileinfo( md5 = get_checksum(file_path) if file_path.exists() else None data.append({"path": path, "md5": md5}) return data - - -def check_deps(cmd: Dict, cmd_name: str, project_dir: Path, dry: bool): - for dep in cmd.get("deps", []): - if not (project_dir / dep).exists(): - err = f"Missing dependency specified by command '{cmd_name}': {dep}" - err_help = "Maybe you forgot to run the 'project assets' command or a previous step?" - err_kwargs = {"exits": 1} if not dry else {} - msg.fail(err, err_help, **err_kwargs) diff --git a/spacy/cli/project/parallel.py b/spacy/cli/project/parallel.py index e0d52251d..f7cdfbc52 100644 --- a/spacy/cli/project/parallel.py +++ b/spacy/cli/project/parallel.py @@ -11,8 +11,8 @@ from wasabi import msg from wasabi.util import color, supports_ansi from .._util import check_rerun, check_deps, update_lockfile, load_project_config -from ...util import SimpleFrozenDict, working_dir -from ...util import check_bool_env_var, ENV_VARS, split_command, join_command +from ...util import SimpleFrozenDict, working_dir, split_command, join_command +from ...util import check_bool_env_var, ENV_VARS from ...errors import Errors """ @@ -20,7 +20,7 @@ Permits the execution of a parallel command group. The main process starts a worker process for each spaCy projects command in the parallel group. Each spaCy projects command consists of n OS level commands. The worker process starts -subprocesses for these OS level commands in order, and sends status information about their +subprocesses for these OS level commands serially, and sends status information about their execution back to the main process. The main process maintains a state machine for each spaCy projects command/worker process. The meaning @@ -33,7 +33,7 @@ as 'failed/terminated' on Windows systems. set_start_method("spawn", force=True) # How often the worker processes managing the commands in a parallel group -# send keepalive messages to the main processes +# send keepalive messages to the main process PARALLEL_GROUP_STATUS_INTERVAL = 1 # The dirname where the temporary logfiles for a parallel group are written @@ -52,6 +52,7 @@ class _ParallelCommandState: self, name: str, display_color: str, + # transitions: the names of states that can legally follow this state transitions: Optional[List[str]] = None, ): self.name = name @@ -95,14 +96,14 @@ class _ParallelCommandInfo: state_dict = {state.name: state for state in _PARALLEL_COMMAND_STATES} - def __init__(self, cmd_name: str, cmd: Dict, cmd_ind: int): + def __init__(self, cmd_name: str, cmd: Dict, cmd_index: int): self.cmd_name = cmd_name self.cmd = cmd - self.cmd_ind = cmd_ind + self.cmd_index = cmd_index self.state = _ParallelCommandInfo.state_dict["pending"] self.pid: Optional[int] = None self.last_keepalive_time: Optional[int] = None - self.os_cmd_ind: Optional[int] = None + self.running_os_cmd_index: Optional[int] = None self.rc: Optional[int] = None self.console_output: Optional[str] = None @@ -115,16 +116,18 @@ class _ParallelCommandInfo: @property def state_repr(self) -> str: state_str = self.state.name - if state_str == "running" and self.os_cmd_ind is not None: + if state_str == "running" and self.running_os_cmd_index is not None: number_of_os_cmds = len(self.cmd["script"]) - state_str = f"{state_str} ({self.os_cmd_ind + 1}/{number_of_os_cmds})" + state_str = ( + f"{state_str} ({self.running_os_cmd_index + 1}/{number_of_os_cmds})" + ) elif state_str in ("failed", "terminated") and os.name == "nt": # not used at present as the status table isn't displayed on Windows, # but may be relevant in the future if Windows becomes able to handle # ANSI commands as standard state_str = "failed/terminated" - # we know we have ANSI commands because otherwise - # the status table would not be displayed in the first place + # we know ANSI commands are available because otherwise + # the status table would not be being displayed in the first place return color(state_str, self.state.display_color) @@ -148,12 +151,12 @@ def project_run_parallel_group( """ config = load_project_config(project_dir, overrides=overrides) commands = {cmd["name"]: cmd for cmd in config.get("commands", [])} + check_spacy_commit = check_bool_env_var(ENV_VARS.PROJECT_USE_GIT_VERSION) parallel_group_status_queue = Manager().Queue() max_parallel_processes = config.get("max_parallel_processes") - check_spacy_commit = check_bool_env_var(ENV_VARS.PROJECT_USE_GIT_VERSION) cmd_infos = [ - _ParallelCommandInfo(cmd_name, commands[cmd_name], cmd_ind) - for cmd_ind, cmd_name in enumerate(cmd_names) + _ParallelCommandInfo(cmd_name, commands[cmd_name], cmd_index) + for cmd_index, cmd_name in enumerate(cmd_names) ] with working_dir(project_dir) as current_dir: @@ -202,7 +205,8 @@ def project_run_parallel_group( divider_parallel_descriptor[: (MAX_WIDTH_DIVIDER - 3)] + "..." ) msg.divider(divider_parallel_descriptor) - if not DISPLAY_STATUS_TABLE: + if not DISPLAY_STATUS_TABLE and len(parallel_descriptor) > MAX_WIDTH_DIVIDER: + # reprint the descriptor if it was too long and had to be cut short print(parallel_descriptor) msg.info( @@ -228,7 +232,7 @@ def project_run_parallel_group( c for c in cmd_infos if c.state.name == "pending" ): other_cmd_info.change_state("cancelled") - cmd_info = cmd_infos[cast(int, mess["cmd_ind"])] + cmd_info = cmd_infos[cast(int, mess["cmd_index"])] if mess["type"] in ("started", "keepalive"): cmd_info.last_keepalive_time = int(time()) for other_cmd_info in ( @@ -243,7 +247,7 @@ def project_run_parallel_group( other_cmd_info.change_state("hung") if mess["type"] == "started": cmd_info.change_state("running") - cmd_info.os_cmd_ind = cast(int, mess["os_cmd_ind"]) + cmd_info.running_os_cmd_index = cast(int, mess["os_cmd_index"]) cmd_info.pid = cast(int, mess["pid"]) if mess["type"] == "completed": cmd_info.rc = cast(int, mess["rc"]) @@ -269,6 +273,8 @@ def project_run_parallel_group( try: os.kill(cast(int, other_cmd_info.pid), SIGTERM) except: + # there could be a race condition between the process completing + # and the message arriving to notify about it pass for other_cmd_info in ( c for c in cmd_infos if c.state.name == "pending" @@ -320,7 +326,7 @@ def _project_run_parallel_cmd( file_not_found = False # buffering=0: make sure output is not lost if a subprocess is terminated with open(log_file_name, "wb", buffering=0) as logfile: - for os_cmd_ind, os_cmd in enumerate(cmd_info.cmd["script"]): + for os_cmd_index, os_cmd in enumerate(cmd_info.cmd["script"]): command = split_command(os_cmd) if len(command) and command[0] in ("python", "python3"): # -u: prevent buffering within Python @@ -359,8 +365,8 @@ def _project_run_parallel_cmd( parallel_group_status_queue.put( { "type": "started", - "cmd_ind": cmd_info.cmd_ind, - "os_cmd_ind": os_cmd_ind, + "cmd_index": cmd_info.cmd_index, + "os_cmd_index": os_cmd_index, "pid": sp.pid, } ) @@ -373,7 +379,7 @@ def _project_run_parallel_cmd( parallel_group_status_queue.put( { "type": "keepalive", - "cmd_ind": cmd_info.cmd_ind, + "cmd_index": cmd_info.cmd_index, } ) else: @@ -398,8 +404,8 @@ def _project_run_parallel_cmd( parallel_group_status_queue.put( { "type": "started", - "cmd_ind": cmd_info.cmd_ind, - "os_cmd_ind": os_cmd_ind, + "cmd_index": cmd_info.cmd_index, + "os_cmd_index": os_cmd_index, "pid": -1, } ) @@ -414,7 +420,7 @@ def _project_run_parallel_cmd( parallel_group_status_queue.put( { "type": "completed", - "cmd_ind": cmd_info.cmd_ind, + "cmd_index": cmd_info.cmd_index, "rc": rc, "console_output": logfile.read(), } diff --git a/spacy/cli/project/run.py b/spacy/cli/project/run.py index 359c71979..31d921067 100644 --- a/spacy/cli/project/run.py +++ b/spacy/cli/project/run.py @@ -165,7 +165,7 @@ def print_run_help(project_dir: Path, subcommand: Optional[str] = None) -> None: ] else: steps_data = [ - (f"{i + 1}.", f"{step[1]}", commands[step[1]].get("help", "")) + (f"{i + 1}. {step[1]}", commands[step[1]].get("help", "")) for i, step in enumerate(steps) ] msg.table(steps_data)