From ca403f8b026ff5386d7ce36297f37358647c6441 Mon Sep 17 00:00:00 2001 From: richardpaulhudson Date: Mon, 18 Jul 2022 16:40:18 +0200 Subject: [PATCH] Seems to work, not yet tested in a structured way --- spacy/cli/project/run.py | 235 ++++++++++++++++++++++++++------------- 1 file changed, 158 insertions(+), 77 deletions(-) diff --git a/spacy/cli/project/run.py b/spacy/cli/project/run.py index 2e2535976..d9698e1de 100644 --- a/spacy/cli/project/run.py +++ b/spacy/cli/project/run.py @@ -1,14 +1,16 @@ import multiprocessing +import sys from typing import Optional, List, Dict, Sequence, Any, Iterable, Literal, Tuple, Union from time import time -from os import kill, environ -from subprocess import Popen -from tempfile import TemporaryFile +from os import kill, environ, linesep, mkdir, sep +from shutil import rmtree +from signal import SIGTERM +from subprocess import STDOUT, Popen, TimeoutExpired +from time import sleep from pathlib import Path -from multiprocessing import Process, Queue -from wasabi import msg, Printer -from wasabi.util import locale_escape, supports_ansi -import sys +from multiprocessing import Process, Manager as MultiprocessingManager, Queue +from wasabi import msg +from wasabi.util import locale_escape, supports_ansi, color import srsly import typer @@ -49,17 +51,48 @@ def project_run_cli( project_run(project_dir, subcommand, overrides=overrides, force=force, dry=dry) +DISPLAY_STATUS_COLORS = { + "pending": "yellow", + "not rerun": "blue", + "running": "green", + "succeeded": "green", + "failed": "red", + "killed": "red", + "hung": "red", + "cancelled": "red", +} + + class MultiprocessingCommandInfo: - def __init__(self, cmd_name: str, cmd: str): + def __init__(self, cmd_name: str, cmd: str, cmd_ind: int): self.cmd_name = cmd_name self.cmd = cmd + self.cmd_ind = cmd_ind self.len_os_cmds = len(cmd["script"]) self.status: Literal[ - "pending", "not rerun", "running", "hung", "succeeded", "failed", "killed" + "pending", + "not rerun", + "running", + "hung", + "succeeded", + "failed", + "killed", + "cancelled", ] = "pending" self.pid: Optional[int] = None self.last_status_time: Optional[int] = None - self.os_cmd_index: Optional[int] = None + self.os_cmd_ind: Optional[int] = None + self.rc: Optional[int] = None + self.output: Optional[str] = None + + + @property + def disp_status(self) -> str: + status = self.status + status_color = DISPLAY_STATUS_COLORS[status] + if status == "running": + status = f"{status} ({self.os_cmd_ind + 1}/{self.len_os_cmds})" + return color(status, status_color) def project_run( @@ -110,7 +143,7 @@ def project_run( assert isinstance(cmds[0], str) project_run_mult_group( project_dir, - [MultiprocessingCommandInfo(cmd, commands[cmd]) for cmd in cmds], + [MultiprocessingCommandInfo(cmd, commands[cmd], cmd_ind) for cmd_ind, cmd in enumerate(cmds)], overrides=overrides, force=force, ) @@ -143,7 +176,7 @@ def project_run( MULTIPROCESSING_GROUP_STATUS_INTERVAL = 1 -MULTIPROCESSING_GROUP_LIVENESS_INTERVAL = 5 +MULTIPROCESSING_LOGGING_DIR_NAME = "parrTmp" def project_run_mult_group( @@ -155,10 +188,10 @@ def project_run_mult_group( dry: bool = False, ) -> None: config = load_project_config(project_dir, overrides=overrides) - mult_group_status_queue = Queue() + mult_group_status_queue = MultiprocessingManager().Queue() max_parallel_processes = config.get("max_parallel_processes") check_spacy_commit = check_bool_env_var(ENV_VARS.PROJECT_USE_GIT_VERSION) - multiprocessing.set_start_method("spawn") + multiprocessing.set_start_method("spawn", force=True) DISPLAY_STATUS = sys.stdout.isatty() and supports_ansi() with working_dir(project_dir) as current_dir: for cmd_info in cmd_infos: @@ -168,23 +201,26 @@ def project_run_mult_group( err_help = "Maybe you forgot to run the 'project assets' command or a previous step?" err_kwargs = {"exits": 1} if not dry else {} msg.fail(err, err_help, **err_kwargs) - if not check_rerun( - current_dir, cmd_info.cmd, check_spacy_commit=check_spacy_commit + if ( + not check_rerun( + current_dir, cmd_info.cmd, check_spacy_commit=check_spacy_commit + ) + and not force ): cmd_info.status = "not rerun" + rmtree(MULTIPROCESSING_LOGGING_DIR_NAME, ignore_errors=True) + mkdir(MULTIPROCESSING_LOGGING_DIR_NAME) processes = [ Process( - target=project_run_mult_command, + target=project_run_mult_cmd, args=(cmd_info,), kwargs={ "dry": dry, - "cmd_ind": cmd_ind, + "current_dir": str(current_dir), "mult_group_status_queue": mult_group_status_queue, }, ) - for cmd_ind, cmd_info in enumerate( - c for c in cmd_infos if c.status != "not rerun" - ) + for cmd_info in (c for c in cmd_infos if c.status != "not rerun") ] num_processes = len(processes) if ( @@ -195,87 +231,115 @@ def project_run_mult_group( process_iterator = iter(processes) for _ in range(num_processes): next(process_iterator).start() - msg.divider("MULTIPROCESSING GROUP") + msg.divider("PARALLEL GROUP") if not DISPLAY_STATUS: print( - "parallel[" + ", ".join(cmd_info.name for cmd_info in cmd_infos) + "]" + "parallel[" + + ", ".join(cmd_info.cmd_name for cmd_info in cmd_infos) + + "]" ) first = True while any(cmd_info.status in ("pending", "running") for cmd_info in cmd_infos): try: mess: Dict[str, Union[str, int]] = mult_group_status_queue.get( - MULTIPROCESSING_GROUP_LIVENESS_INTERVAL + timeout=MULTIPROCESSING_GROUP_STATUS_INTERVAL * 20 ) - except mult_group_status_queue.Empty: + except Exception as e: for other_cmd_info in (c for c in cmd_infos if c.status == "running"): other_cmd_info.status = "hung" cmd_info = cmd_infos[mess["cmd_ind"]] - if mess["status"] in ("start", "alive"): + if mess["status"] in ("started", "alive"): cmd_info.last_status_time = int(time()) for other_cmd_info in (c for c in cmd_infos if c.status == "running"): if ( other_cmd_info.last_status_time is not None and time() - other_cmd_info.last_status_time - > MULTIPROCESSING_GROUP_LIVENESS_INTERVAL + > MULTIPROCESSING_GROUP_STATUS_INTERVAL * 20 ): other_cmd_info.status = "hung" - if mess["status"] == "start": + if mess["status"] == "started": cmd_info.status = "running" - cmd_info.os_cmd_index = mess["os_cmd_index"] - elif mess["status"] == "succeeded": - cmd_info.status = "succeeded" - if not dry: - update_lockfile(current_dir, cmd_info.cmd) - next_process = next(process_iterator, None) - if next_process is not None: - next_process.start() - elif mess[1] == "failed": - cmd_info.status = "failed" + cmd_info.os_cmd_ind = mess["os_cmd_ind"] + cmd_info.pid = mess["pid"] + if mess["status"] == "completed": + cmd_info.rc = mess["rc"] + if cmd_info.rc == 0: + cmd_info.status = "succeeded" + if not dry: + update_lockfile(current_dir, cmd_info.cmd) + next_process = next(process_iterator, None) + if next_process is not None: + next_process.start() + elif cmd_info.rc > 0: + cmd_info.status = "failed" + else: + cmd_info.status = "killed" + cmd_info.output = mess["output"] if any(c for c in cmd_infos if c.status in ("failed", "hung")): for other_cmd_info in (c for c in cmd_infos if c.status == "running"): - kill(other_cmd_info.pid, -15) - if mess["status"] in ("succeeded", "failed", "killed"): - cmd_info.output = mess["output"] + try: + kill(other_cmd_info.pid, SIGTERM) + except: + pass + for other_cmd_info in (c for c in cmd_infos if c.status == "pending"): + other_cmd_info.status = "cancelled" if mess["status"] != "alive" and DISPLAY_STATUS: if first: first = False else: - print("\033[F" * (2 + len(cmd_infos))) - data = [ - [c.cmd_name, c.status, f"{c.os_cmd_ind}/{c.len_os_cmds}"] - for c in cmd_infos - ] - header = ["Command", "Status", "OS Command"] + print("\033[2K\033[F" * (4 + len(cmd_infos))) + data = [[c.cmd_name, c.disp_status] for c in cmd_infos] + header = ["Command", "Status"] msg.table(data, header=header) - for cmd_info in cmd_infos: - print(cmd_info.output) - group_rc = max(c.rc for c in cmd_infos) - if group_rc <= 0: - group_rc = min(c.rc for c in cmd_infos) - if group_rc != 0: - sys.exit(group_rc) + for cmd_info in (c for c in cmd_infos if c.status != "cancelled"): + msg.divider(cmd_info.cmd_name) + if cmd_info.status == "not rerun": + msg.info(f"Skipping '{cmd_info.cmd_name}': nothing changed") + else: + print(cmd_info.output) + process_rcs = [c.rc for c in cmd_infos if c.rc is not None] + if len(process_rcs) > 0: + group_rc = max(c.rc for c in cmd_infos if c.rc is not None) + if group_rc <= 0: + group_rc = min(c.rc for c in cmd_infos if c.rc is not None) + if group_rc != 0: + sys.exit(group_rc) + if any(c for c in cmd_infos if c.status == "hung"): + sys.exit(-1) -def project_run_mult_command( +def project_run_mult_cmd( cmd_info: MultiprocessingCommandInfo, - *dry: bool, - cmd_ind: int, + *, + dry: bool, + current_dir: str, mult_group_status_queue: Queue, ) -> None: - printer = Printer(no_print=True) - with TemporaryFile() as logfile: - print(printer.divider(cmd_info.cmd_name), logfile) + log_file_name = sep.join( + (current_dir, MULTIPROCESSING_LOGGING_DIR_NAME, cmd_info.cmd_name + ".log") + ) + with open(log_file_name, "wb", buffering=0) as logfile: for os_cmd_ind, os_cmd in enumerate(cmd_info.cmd["script"]): command = split_command(os_cmd) if len(command) and command[0] in ("python", "python3"): command = [sys.executable, "-u", *command[1:]] elif len(command) and command[0] in ("pip", "pip3"): command = [sys.executable, "-m", "pip", *command[1:]] - print(f"Running command: {join_command(command)}", logfile) + logfile.write( + bytes( + f"Running command: {join_command(command)}" + linesep, + encoding="UTF-8", + ) + ) + sleep(2) if not dry: try: sp = Popen( - command, stdout=logfile, env=environ.copy(), encoding="utf8" + command, + stdout=logfile, + stderr=STDOUT, + env=environ.copy(), + encoding="utf8", ) except FileNotFoundError: # Indicates the *command* wasn't found, it's an error before the command @@ -287,34 +351,51 @@ def project_run_mult_command( ) from None mult_group_status_queue.put( { - "cmd_ind": cmd_ind, + "cmd_ind": cmd_info.cmd_ind, "os_cmd_ind": os_cmd_ind, - "status": "start", + "status": "started", "pid": sp.pid, } ) while True: - sp.communicate(MULTIPROCESSING_GROUP_STATUS_INTERVAL) - rc = sp.poll() - if rc == None: + try: + sp.communicate(timeout=MULTIPROCESSING_GROUP_STATUS_INTERVAL) + except TimeoutExpired: + pass + if sp.returncode == None: mult_group_status_queue.put( { - "cmd_ind": cmd_ind, + "cmd_ind": cmd_info.cmd_ind, "status": "alive", } ) else: break - if rc != 0: - status = "failed" if rc > 0 else "killed" - print(printer.divider(status.upper() + f" rc={rc}"), logfile) - mult_group_status_queue.put( - {"cmd_ind": cmd_ind, "status": status, "output": logfile.read()} - ) + if sp.returncode != 0: + if sp.returncode > 0: + logfile.write( + bytes( + linesep + f"Failed (rc={sp.returncode})" + linesep, + encoding="UTF-8", + ) + ) + else: + logfile.write( + bytes( + linesep + f"Killed (rc={sp.returncode})" + linesep, + encoding="UTF-8", + ) + ) break - mult_group_status_queue.put( - {"cmd_ind": cmd_ind, "status": "succeeded", "output": logfile.read()} - ) + with open(log_file_name, "r") as logfile: + mult_group_status_queue.put( + { + "cmd_ind": cmd_info.cmd_ind, + "status": "completed", + "rc": sp.returncode, + "output": logfile.read(), + } + ) def run_commands(