diff --git a/spacy/cli/project/dvc.py b/spacy/cli/project/dvc.py index 404cb189b..7ea4d3d8c 100644 --- a/spacy/cli/project/dvc.py +++ b/spacy/cli/project/dvc.py @@ -105,7 +105,6 @@ def update_dvc_config( dvc_config_path.unlink() dvc_commands = [] config_commands = {cmd["name"]: cmd for cmd in config.get("commands", [])} - processed_step = False for name in workflows[workflow]: if isinstance(name, dict) and "parallel" in name: msg.fail( @@ -118,7 +117,6 @@ def update_dvc_config( outputs_no_cache = command.get("outputs_no_cache", []) if not deps and not outputs and not outputs_no_cache: continue - processed_step = True # Default to the working dir as the project path since dvc.yaml is auto-generated # and we don't want arbitrary paths in there project_cmd = ["python", "-m", NAME, "project", "run", name] @@ -130,7 +128,7 @@ def update_dvc_config( dvc_cmd.append("--always-changed") full_cmd = [*dvc_cmd, *deps_cmd, *outputs_cmd, *outputs_nc_cmd, *project_cmd] dvc_commands.append(join_command(full_cmd)) - if not processed_step: + if len(dvc_commands) == 0: msg.fail( f"A DVC workflow must have at least one dependency or output", exits=1, diff --git a/spacy/cli/project/parallel.py b/spacy/cli/project/parallel.py index b06e48b59..62382a2aa 100644 --- a/spacy/cli/project/parallel.py +++ b/spacy/cli/project/parallel.py @@ -8,6 +8,7 @@ from multiprocessing.context import SpawnProcess from shutil import rmtree from signal import SIGTERM from subprocess import STDOUT, Popen, TimeoutExpired +from dataclasses import dataclass, field from wasabi import msg from wasabi.util import color, supports_ansi @@ -48,54 +49,47 @@ MAX_WIDTH_DIVIDER = 60 DISPLAY_STATUS_TABLE = sys.stdout.isatty() and supports_ansi() +@dataclass class _ParallelCommandState: - def __init__( - self, - name: str, - display_color: str, - # transitions: the names of states that can legally follow this state - transitions: Optional[List[str]] = None, - ): - self.name = name - self.display_color = display_color - if transitions is not None: - self.transitions = transitions - else: - self.transitions = [] - - -_PARALLEL_COMMAND_STATES = ( - # The command has not yet been run - _ParallelCommandState("pending", "yellow", ["starting", "cancelled", "not rerun"]), - # The command was not run because its settings, inputs and outputs had not changed - # since a previous successful run - _ParallelCommandState("not rerun", "blue"), - # The main process has spawned a worker process for the command but not yet received - # an acknowledgement - _ParallelCommandState("starting", "green", ["running", "failed", "hung"]), - # The command is running - _ParallelCommandState( - "running", "green", ["running", "succeeded", "failed", "terminated", "hung"] - ), - # The command succeeded (rc=0) - _ParallelCommandState("succeeded", "green"), - # The command failed (rc>0) - _ParallelCommandState("failed", "red"), - # The command was terminated (rc<0), usually but not necessarily by the main process because - # another command within the same parallel group failed - _ParallelCommandState("terminated", "red"), - # The main process would expect the worker process to be running, but the worker process has - # not communicated with the main process for an extended period of time - _ParallelCommandState("hung", "red"), - # The main process did not attempt to execute the command because another command in the same - # parallel group had already failed - _ParallelCommandState("cancelled", "red"), -) + name: str + display_color: str + # transitions: the names of states that can legally follow this state + transitions: List = field(default_factory=list) class _ParallelCommandInfo: - state_dict = {state.name: state for state in _PARALLEL_COMMAND_STATES} + STATES = ( + # The command has not yet been run + _ParallelCommandState( + "pending", "yellow", ["starting", "cancelled", "not rerun"] + ), + # The command was not run because its settings, inputs and outputs had not changed + # since a previous successful run + _ParallelCommandState("not rerun", "blue"), + # The main process has spawned a worker process for the command but not yet received + # an acknowledgement + _ParallelCommandState("starting", "green", ["running", "failed", "hung"]), + # The command is running + _ParallelCommandState( + "running", "green", ["running", "succeeded", "failed", "terminated", "hung"] + ), + # The command succeeded (rc=0) + _ParallelCommandState("succeeded", "green"), + # The command failed (rc>0) + _ParallelCommandState("failed", "red"), + # The command was terminated (rc<0), usually but not necessarily by the main process because + # another command within the same parallel group failed + _ParallelCommandState("terminated", "red"), + # The main process would expect the worker process to be running, but the worker process has + # not communicated with the main process for an extended period of time + _ParallelCommandState("hung", "red"), + # The main process did not attempt to execute the command because another command in the same + # parallel group had already failed + _ParallelCommandState("cancelled", "red"), + ) + + state_dict = {state.name: state for state in STATES} def __init__(self, cmd_name: str, cmd: Dict, cmd_index: int): self.cmd_name = cmd_name @@ -109,9 +103,10 @@ class _ParallelCommandInfo: self.console_output: Optional[str] = None def change_state(self, new_state: str): - assert new_state in self.state.transitions, ( - "Illegal transition from " + self.state.name + " to " + new_state + "." - ) + if new_state not in self.state.transitions: + raise RuntimeError( + Errors.E1037.format(old_state=self.state.name, new_state=new_state) + ) self.state = _ParallelCommandInfo.state_dict[new_state] @property @@ -211,7 +206,7 @@ def project_run_parallel_group( "Temporary logs are being written to " + os.sep.join((str(current_dir), PARALLEL_LOGGING_DIR_NAME)) ) - first = True + status_table_not_yet_displayed = True while any( cmd_info.state.name in ("pending", "starting", "running") for cmd_info in cmd_infos @@ -270,18 +265,19 @@ def project_run_parallel_group( c for c in cmd_infos if c.state.name == "running" ): try: - os.kill(cast(int, other_cmd_info.pid), SIGTERM) + os.kill(other_cmd_info.pid, SIGTERM) except: - # there could be a race condition between the process completing - # and the message arriving to notify about it + # the subprocess the main process is trying to kill could already + # have completed, and the message from the worker process notifying + # the main process about this could still be in the queue pass for other_cmd_info in ( c for c in cmd_infos if c.state.name == "pending" ): other_cmd_info.change_state("cancelled") if mess["type"] != "keepalive" and DISPLAY_STATUS_TABLE: - if first: - first = False + if status_table_not_yet_displayed: + status_table_not_yet_displayed = False else: # overwrite the existing status table print("\033[2K\033[F" * (4 + len(cmd_infos))) diff --git a/spacy/errors.py b/spacy/errors.py index b01afcb80..f8eb50101 100644 --- a/spacy/errors.py +++ b/spacy/errors.py @@ -913,6 +913,7 @@ class Errors(metaclass=ErrorsWithCodes): E1034 = ("Node index {i} out of bounds ({length})") E1035 = ("Token index {i} out of bounds ({length})") E1036 = ("Cannot index into NoneNode") + E1037 = ("Illegal transition from {old_state} to {new_state}.") # Deprecated model shortcuts, only used in errors and warnings