diff --git a/spacy/cli/project/document.py b/spacy/cli/project/document.py
index b6a0ba20e..3c36d4cab 100644
--- a/spacy/cli/project/document.py
+++ b/spacy/cli/project/document.py
@@ -15,7 +15,8 @@ Commands are only re-run if their inputs have changed."""
 INTRO_WORKFLOWS = f"""The following workflows are defined by the project. They
 can be executed using [`spacy project run [name]`]({DOCS_URL}/api/cli#project-run)
 and will run the specified commands in order. Commands grouped within square brackets
-are run in parallel. Commands are only re-run if their inputs have changed."""
+preceded by 'parallel' are run in parallel. Commands are only re-run if their inputs
+have changed."""
 INTRO_ASSETS = f"""The following assets are defined by the project. They can
 be fetched by running [`spacy project assets`]({DOCS_URL}/api/cli#project-assets)
 in the project directory."""
@@ -77,7 +78,7 @@ def project_document(
                 rendered_steps.append(md.code(step))
             else:
                 rendered_steps.append(
-                    "["
+                    "parallel["
                     + ", ".join(md.code(p_step) for p_step in step["parallel"])
                     + "]"
                 )
diff --git a/spacy/cli/project/parallel.py b/spacy/cli/project/parallel.py
index ab206d9e5..0ebc24945 100644
--- a/spacy/cli/project/parallel.py
+++ b/spacy/cli/project/parallel.py
@@ -1,4 +1,4 @@
-from typing import Any, List, Literal, Optional, Dict, Union, cast, Tuple
+from typing import Any, List, Optional, Dict, Union, cast
 import sys
 from pathlib import Path
 from time import time
@@ -15,6 +15,19 @@ from ...util import SimpleFrozenDict, working_dir
 from ...util import check_bool_env_var, ENV_VARS, split_command, join_command
 from ...errors import Errors
 
+"""
+Permits the execution of a parallel command group.
+
+The main process starts a worker process for each spaCy projects command in the parallel group.
+Each spaCy projects command consists of n OS level commands. The worker process starts
+subprocesses for these OS level commands in order, and sends status information about their
+execution back to the main process.
+
+The main process maintains a state machine for each spaCy projects command/worker process. The meaning
+of the states is documented alongside the _PARALLEL_COMMAND_STATES code.
+""" + +# Use spawn to create worker processes on all OSs for consistency set_start_method("spawn", force=True) # How often the worker processes managing the commands in a parallel group @@ -30,34 +43,48 @@ MAX_WIDTH_DIVIDER = 60 # Whether or not to display realtime status information DISPLAY_STATUS_TABLE = sys.stdout.isatty() and supports_ansi() - + class _ParallelCommandState: def __init__( self, name: str, display_color: str, - transitions: Optional[Tuple[str]] = None, + transitions: Optional[List[str]] = None, ): self.name = name self.display_color = display_color if transitions is not None: self.transitions = transitions else: - self.transitions = None + self.transitions = [] _PARALLEL_COMMAND_STATES = ( - _ParallelCommandState("pending", "yellow", ("starting", "cancelled", "not rerun")), + # The command has not yet been run + _ParallelCommandState("pending", "yellow", ["starting", "cancelled", "not rerun"]), + # The command was not run because its settings, inputs and outputs had not changed + # since a previous successful run _ParallelCommandState("not rerun", "blue"), - _ParallelCommandState("starting", "green", ("running", "hung")), + # The main process has spawned a worker process for the command but not yet received + # an acknowledgement + _ParallelCommandState("starting", "green", ["running", "hung"]), + # The command is running _ParallelCommandState( - "running", "green", ("running", "succeeded", "failed", "killed", "hung") + "running", "green", ["running", "succeeded", "failed", "terminated", "hung"] ), + # The command succeeded (rc=0) _ParallelCommandState("succeeded", "green"), + # The command failed (rc>0) _ParallelCommandState("failed", "red"), - _ParallelCommandState("killed", "red"), + # The command was terminated (rc<0), usually but not necessarily by the main process because + # another command within the same parallel group failed + _ParallelCommandState("terminated", "red"), + # The main process would expect the worker process to be running, but the worker process has + # not communicated with the main process for an extended period of time _ParallelCommandState("hung", "red"), + # The main process did not attempt to execute the command because another command in the same + # parallel group had already failed _ParallelCommandState("cancelled", "red"), ) @@ -70,10 +97,9 @@ class _ParallelCommandInfo: self.cmd_name = cmd_name self.cmd = cmd self.cmd_ind = cmd_ind - self.len_os_cmds = len(cmd["script"]) self.state = _ParallelCommandInfo.state_dict["pending"] self.pid: Optional[int] = None - self.last_status_time: Optional[int] = None + self.last_keepalive_time: Optional[int] = None self.os_cmd_ind: Optional[int] = None self.rc: Optional[int] = None self.output: Optional[str] = None @@ -85,10 +111,11 @@ class _ParallelCommandInfo: self.state = _ParallelCommandInfo.state_dict[new_state] @property - def disp_state(self) -> str: + def state_repr(self) -> str: state_str = self.state.name if state_str == "running" and self.os_cmd_ind is not None: - state_str = f"{state_str} ({self.os_cmd_ind + 1}/{self.len_os_cmds})" + number_of_os_cmds = len(self.cmd["script"]) + state_str = f"{state_str} ({self.os_cmd_ind + 1}/{number_of_os_cmds})" return color(state_str, self.state.display_color) @@ -102,7 +129,7 @@ def project_run_parallel_group( ) -> None: """Run a parallel group of commands. Note that because of the challenges of managing parallel output streams it is not possible to specify a value for 'capture'. 
-    with parallel groups 'capture==True' for stdout and 'capture==False' for stderr.
+    with parallel groups 'capture==True'.
 
     project_dir (Path): Path to project directory.
     cmd_infos: a list of objects containing information about the commands to run.
@@ -162,13 +189,18 @@ def project_run_parallel_group(
         "parallel[" + ", ".join(cmd_info.cmd_name for cmd_info in cmd_infos) + "]"
     )
     if len(divider_parallel_descriptor) > MAX_WIDTH_DIVIDER:
-        divider_parallel_descriptor = divider_parallel_descriptor[:(MAX_WIDTH_DIVIDER-3)] + "..."
+        divider_parallel_descriptor = (
+            divider_parallel_descriptor[: (MAX_WIDTH_DIVIDER - 3)] + "..."
+        )
     msg.divider(divider_parallel_descriptor)
     if not DISPLAY_STATUS_TABLE:
         print(parallel_descriptor)
-    else:
-        first = True
+    msg.info(
+        "Temporary logs are being written to "
+        + sep.join((str(current_dir), PARALLEL_LOGGING_DIR_NAME))
+    )
+    first = True
     while any(
         cmd_info.state.name in ("pending", "starting", "running")
         for cmd_info in cmd_infos
@@ -177,9 +209,10 @@ def project_run_parallel_group(
             mess: Dict[str, Union[str, int]] = parallel_group_status_queue.get(
                 timeout=PARALLEL_GROUP_STATUS_INTERVAL * 20
             )
-        except Exception as e:
+        except Exception:
+            # No more messages are being received: the whole group has hung
             for other_cmd_info in (
-                c for c in cmd_infos if c.state.name == "running"
+                c for c in cmd_infos if c.state.name in ("starting", "running")
             ):
                 other_cmd_info.change_state("hung")
             for other_cmd_info in (
@@ -187,22 +220,23 @@ def project_run_parallel_group(
             ):
                 other_cmd_info.change_state("cancelled")
         cmd_info = cmd_infos[cast(int, mess["cmd_ind"])]
-        if mess["status"] in ("started", "alive"):
-            cmd_info.last_status_time = int(time())
+        if mess["type"] in ("started", "keepalive"):
+            cmd_info.last_keepalive_time = int(time())
             for other_cmd_info in (
                 c for c in cmd_infos if c.state.name in ("starting", "running")
             ):
                 if (
-                    other_cmd_info.last_status_time is not None
-                    and time() - other_cmd_info.last_status_time
+                    other_cmd_info.last_keepalive_time is not None
+                    and time() - other_cmd_info.last_keepalive_time
                     > PARALLEL_GROUP_STATUS_INTERVAL * 20
                 ):
+                    # a specific command has hung
                     other_cmd_info.change_state("hung")
-        if mess["status"] == "started":
+        if mess["type"] == "started":
             cmd_info.change_state("running")
             cmd_info.os_cmd_ind = cast(int, mess["os_cmd_ind"])
             cmd_info.pid = cast(int, mess["pid"])
-        if mess["status"] == "completed":
+        if mess["type"] == "completed":
             cmd_info.rc = cast(int, mess["rc"])
             if cmd_info.rc == 0:
                 cmd_info.change_state("succeeded")
@@ -214,11 +248,12 @@ def project_run_parallel_group(
             elif cmd_info.rc > 0:
                 cmd_info.change_state("failed")
             else:
-                cmd_info.change_state("killed")
+                cmd_info.change_state("terminated")
             cmd_info.output = cast(str, mess["output"])
         if any(
-            c for c in cmd_infos if c.state.name in ("failed", "killed", "hung")
+            c for c in cmd_infos if c.state.name in ("failed", "terminated", "hung")
         ):
+            # a command in the group hasn't succeeded, so terminate/cancel the rest
             for other_cmd_info in (
                 c for c in cmd_infos if c.state.name == "running"
             ):
@@ -230,12 +265,13 @@ def project_run_parallel_group(
                 c for c in cmd_infos if c.state.name == "pending"
             ):
                 other_cmd_info.change_state("cancelled")
-        if mess["status"] != "alive" and DISPLAY_STATUS_TABLE:
+        if mess["type"] != "keepalive" and DISPLAY_STATUS_TABLE:
             if first:
                 first = False
             else:
+                # overwrite the existing status table
                 print("\033[2K\033[F" * (4 + len(cmd_infos)))
-            data = [[c.cmd_name, c.disp_state] for c in cmd_infos]
+            data = [[c.cmd_name, c.state_repr] for c in cmd_infos]
             header = ["Command", "Status"]
             msg.table(data, header=header)
@@ -248,11 +284,9 @@ def project_run_parallel_group(
     process_rcs = [c.rc for c in cmd_infos if c.rc is not None]
     if len(process_rcs) > 0:
         group_rc = max(process_rcs)
-        if group_rc <= 0:
-            group_rc = min(process_rcs)
-        if group_rc != 0:
+        if group_rc > 0:
             sys.exit(group_rc)
-    if any(c for c in cmd_infos if c.state.name == "hung"):
+    if any(c for c in cmd_infos if c.state.name in ("hung", "terminated")):
         sys.exit(-1)
@@ -263,21 +297,31 @@ def _project_run_parallel_cmd(
     current_dir: str,
     parallel_group_status_queue: Queue,
 ) -> None:
+    """Run a single spaCy projects command as a worker process.
+
+    Communicates with the main process via queue messages that consist of
+    dictionaries whose type is determined by the entry 'type'. Possible
+    values of 'type' are 'started', 'completed' and 'keepalive'. Each dictionary
+    type contains different additional fields."""
+    # we can use the command name as a unique log filename because a parallel
+    # group is not allowed to contain the same command more than once
     log_file_name = sep.join(
         (current_dir, PARALLEL_LOGGING_DIR_NAME, cmd_info.cmd_name + ".log")
     )
     file_not_found = False
+    # buffering=0: make sure output is not lost if a subprocess is terminated
     with open(log_file_name, "wb", buffering=0) as logfile:
         for os_cmd_ind, os_cmd in enumerate(cmd_info.cmd["script"]):
             command = split_command(os_cmd)
             if len(command) and command[0] in ("python", "python3"):
+                # -u: prevent buffering within Python
                 command = [sys.executable, "-u", *command[1:]]
             elif len(command) and command[0] in ("pip", "pip3"):
                 command = [sys.executable, "-m", "pip", *command[1:]]
             logfile.write(
                 bytes(
                     f"Running command: {join_command(command)}" + linesep,
-                    encoding="UTF-8",
+                    encoding="utf8",
                 )
             )
             if not dry:
@@ -305,22 +349,22 @@ def _project_run_parallel_cmd(
                     break
                 parallel_group_status_queue.put(
                     {
+                        "type": "started",
                         "cmd_ind": cmd_info.cmd_ind,
                         "os_cmd_ind": os_cmd_ind,
-                        "status": "started",
                         "pid": sp.pid,
                     }
                 )
                 while True:
                     try:
-                        sp.communicate(timeout=PARALLEL_GROUP_STATUS_INTERVAL)
+                        sp.wait(timeout=PARALLEL_GROUP_STATUS_INTERVAL)
                     except TimeoutExpired:
                         pass
                     if sp.returncode == None:
                         parallel_group_status_queue.put(
                             {
+                                "type": "keepalive",
                                 "cmd_ind": cmd_info.cmd_ind,
-                                "status": "alive",
                             }
                         )
                     else:
@@ -336,7 +380,7 @@ def _project_run_parallel_cmd(
                 else:
                     logfile.write(
                         bytes(
-                            linesep + f"Killed (rc={sp.returncode})" + linesep,
+                            linesep + f"Terminated (rc={sp.returncode})" + linesep,
                             encoding="UTF-8",
                         )
                     )
@@ -350,8 +394,8 @@ def _project_run_parallel_cmd(
             rc = sp.returncode
             parallel_group_status_queue.put(
                 {
+                    "type": "completed",
                     "cmd_ind": cmd_info.cmd_ind,
-                    "status": "completed",
                     "rc": rc,
                     "output": logfile.read(),
                 }
             )
@@ -364,5 +408,5 @@ def _start_process(
     cmd_info = proc_to_cmd_infos[process]
     if cmd_info.state.name == "pending":
         cmd_info.change_state("starting")
-        cmd_info.last_status_time = int(time())
+        cmd_info.last_keepalive_time = int(time())
     process.start()
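The comments added around `_PARALLEL_COMMAND_STATES` describe a per-command state machine whose `transitions` lists name the states a command may move to next. The following standalone Python sketch is not part of the patch: all class and variable names are illustrative, and unlike the patch's own `change_state` (which simply assigns the new state) this sketch actively rejects transitions that are not declared on the current state, so illegal sequences fail fast.

```python
from typing import Dict, List, Optional


class CommandState:
    """Illustrative stand-in for _ParallelCommandState: a named state plus
    the names of the states it may legally transition to."""

    def __init__(self, name: str, transitions: Optional[List[str]] = None):
        self.name = name
        self.transitions = transitions if transitions is not None else []


# A cut-down version of the state machine documented in the patch.
_STATES: Dict[str, CommandState] = {
    s.name: s
    for s in (
        CommandState("pending", ["starting", "cancelled", "not rerun"]),
        CommandState("not rerun"),
        CommandState("starting", ["running", "hung"]),
        CommandState("running", ["running", "succeeded", "failed", "terminated", "hung"]),
        CommandState("succeeded"),
        CommandState("failed"),
        CommandState("terminated"),
        CommandState("hung"),
        CommandState("cancelled"),
    )
}


class CommandInfo:
    """Tracks the current state of one command in a parallel group."""

    def __init__(self, cmd_name: str):
        self.cmd_name = cmd_name
        self.state = _STATES["pending"]

    def change_state(self, new_state: str) -> None:
        # Refuse transitions that are not declared on the current state.
        if new_state not in self.state.transitions:
            raise ValueError(
                f"{self.cmd_name}: illegal transition {self.state.name} -> {new_state}"
            )
        self.state = _STATES[new_state]


if __name__ == "__main__":
    info = CommandInfo("train")
    info.change_state("starting")
    info.change_state("running")
    info.change_state("succeeded")
    print(info.state.name)  # succeeded
```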
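The worker docstring added in the patch describes a queue protocol with three message types ('started', 'keepalive', 'completed'), and the main loop treats a long silence on the queue as the whole group hanging. The sketch below is a standalone, simplified illustration of that protocol using `multiprocessing`; it is not the patch's code: `STATUS_INTERVAL` stands in for `PARALLEL_GROUP_STATUS_INTERVAL`, and the workers just sleep instead of running OS-level commands.

```python
from multiprocessing import Process, Queue
from queue import Empty
import time

STATUS_INTERVAL = 1  # seconds; stands in for PARALLEL_GROUP_STATUS_INTERVAL


def worker(cmd_ind: int, status_queue: "Queue") -> None:
    # Announce the command has started, then keep the main process informed.
    status_queue.put({"type": "started", "cmd_ind": cmd_ind, "pid": None})
    for _ in range(3):  # pretend the OS-level command takes a while
        time.sleep(STATUS_INTERVAL)
        status_queue.put({"type": "keepalive", "cmd_ind": cmd_ind})
    status_queue.put({"type": "completed", "cmd_ind": cmd_ind, "rc": 0})


if __name__ == "__main__":
    status_queue: "Queue" = Queue()
    procs = [Process(target=worker, args=(i, status_queue)) for i in range(2)]
    for p in procs:
        p.start()
    remaining = len(procs)
    while remaining:
        try:
            # The patch uses PARALLEL_GROUP_STATUS_INTERVAL * 20 as its timeout.
            mess = status_queue.get(timeout=STATUS_INTERVAL * 20)
        except Empty:
            print("no messages received for a long time: treating the group as hung")
            break
        if mess["type"] == "completed":
            remaining -= 1
            print(f"command {mess['cmd_ind']} finished with rc={mess['rc']}")
    for p in procs:
        p.join()
```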