Seems to work, not yet tested in a structured way

This commit is contained in:
richardpaulhudson 2022-07-18 16:40:18 +02:00
parent 3c64e825d0
commit ca403f8b02

View File

@ -1,14 +1,16 @@
import multiprocessing import multiprocessing
import sys
from typing import Optional, List, Dict, Sequence, Any, Iterable, Literal, Tuple, Union from typing import Optional, List, Dict, Sequence, Any, Iterable, Literal, Tuple, Union
from time import time from time import time
from os import kill, environ from os import kill, environ, linesep, mkdir, sep
from subprocess import Popen from shutil import rmtree
from tempfile import TemporaryFile from signal import SIGTERM
from subprocess import STDOUT, Popen, TimeoutExpired
from time import sleep
from pathlib import Path from pathlib import Path
from multiprocessing import Process, Queue from multiprocessing import Process, Manager as MultiprocessingManager, Queue
from wasabi import msg, Printer from wasabi import msg
from wasabi.util import locale_escape, supports_ansi from wasabi.util import locale_escape, supports_ansi, color
import sys
import srsly import srsly
import typer import typer
@ -49,17 +51,48 @@ def project_run_cli(
project_run(project_dir, subcommand, overrides=overrides, force=force, dry=dry) project_run(project_dir, subcommand, overrides=overrides, force=force, dry=dry)
# Colour used to render each command status in the status table. Key order is
# kept as: pending, not rerun, the two "green" states, then the four "red" ones.
DISPLAY_STATUS_COLORS = {
    "pending": "yellow",
    "not rerun": "blue",
    # Running or successfully finished.
    **dict.fromkeys(("running", "succeeded"), "green"),
    # Every way a command can end without succeeding.
    **dict.fromkeys(("failed", "killed", "hung", "cancelled"), "red"),
}
class MultiprocessingCommandInfo:
    """Mutable bookkeeping record for one command in a parallel group.

    The group runner creates one instance per command and updates it as
    status messages arrive from the worker process running that command.
    """

    def __init__(self, cmd_name: str, cmd: Dict[str, Any], cmd_ind: int):
        """Initialise the record in the "pending" state.

        cmd_name: Name of the project command.
        cmd: The command's configuration; must contain a "script" entry
            holding the sequence of OS-level commands to execute.
        cmd_ind: Position of this command within its parallel group; worker
            messages use it to identify which record to update.
        """
        self.cmd_name = cmd_name
        self.cmd = cmd
        self.cmd_ind = cmd_ind
        # Number of OS-level commands that make up this project command.
        self.len_os_cmds = len(cmd["script"])
        self.status: Literal[
            "pending",
            "not rerun",
            "running",
            "hung",
            "succeeded",
            "failed",
            "killed",
            "cancelled",
        ] = "pending"
        # Process id reported by the worker when an OS-level command starts.
        self.pid: Optional[int] = None
        # Unix timestamp (whole seconds) of the last "started"/"alive" message.
        self.last_status_time: Optional[int] = None
        # Zero-based index of the OS-level command currently executing.
        self.os_cmd_ind: Optional[int] = None
        # Return code reported on completion.
        self.rc: Optional[int] = None
        # Captured log output reported on completion.
        self.output: Optional[str] = None

    @property
    def disp_status(self) -> str:
        """Return the status as a coloured string for the status table.

        While the command is running, the one-based index of the current
        OS-level command and the total count are appended, e.g.
        "running (2/3)".
        """
        status = self.status
        status_color = DISPLAY_STATUS_COLORS[status]
        # Only show progress once an index has actually been recorded;
        # `os_cmd_ind` is Optional and `None + 1` would raise a TypeError.
        if status == "running" and self.os_cmd_ind is not None:
            status = f"{status} ({self.os_cmd_ind + 1}/{self.len_os_cmds})"
        return color(status, status_color)
def project_run( def project_run(
@ -110,7 +143,7 @@ def project_run(
assert isinstance(cmds[0], str) assert isinstance(cmds[0], str)
project_run_mult_group( project_run_mult_group(
project_dir, project_dir,
[MultiprocessingCommandInfo(cmd, commands[cmd]) for cmd in cmds], [MultiprocessingCommandInfo(cmd, commands[cmd], cmd_ind) for cmd_ind, cmd in enumerate(cmds)],
overrides=overrides, overrides=overrides,
force=force, force=force,
) )
@ -143,7 +176,7 @@ def project_run(
MULTIPROCESSING_GROUP_STATUS_INTERVAL = 1 MULTIPROCESSING_GROUP_STATUS_INTERVAL = 1
MULTIPROCESSING_GROUP_LIVENESS_INTERVAL = 5 MULTIPROCESSING_LOGGING_DIR_NAME = "parrTmp"
def project_run_mult_group( def project_run_mult_group(
@ -155,10 +188,10 @@ def project_run_mult_group(
dry: bool = False, dry: bool = False,
) -> None: ) -> None:
config = load_project_config(project_dir, overrides=overrides) config = load_project_config(project_dir, overrides=overrides)
mult_group_status_queue = Queue() mult_group_status_queue = MultiprocessingManager().Queue()
max_parallel_processes = config.get("max_parallel_processes") max_parallel_processes = config.get("max_parallel_processes")
check_spacy_commit = check_bool_env_var(ENV_VARS.PROJECT_USE_GIT_VERSION) check_spacy_commit = check_bool_env_var(ENV_VARS.PROJECT_USE_GIT_VERSION)
multiprocessing.set_start_method("spawn") multiprocessing.set_start_method("spawn", force=True)
DISPLAY_STATUS = sys.stdout.isatty() and supports_ansi() DISPLAY_STATUS = sys.stdout.isatty() and supports_ansi()
with working_dir(project_dir) as current_dir: with working_dir(project_dir) as current_dir:
for cmd_info in cmd_infos: for cmd_info in cmd_infos:
@ -168,23 +201,26 @@ def project_run_mult_group(
err_help = "Maybe you forgot to run the 'project assets' command or a previous step?" err_help = "Maybe you forgot to run the 'project assets' command or a previous step?"
err_kwargs = {"exits": 1} if not dry else {} err_kwargs = {"exits": 1} if not dry else {}
msg.fail(err, err_help, **err_kwargs) msg.fail(err, err_help, **err_kwargs)
if not check_rerun( if (
not check_rerun(
current_dir, cmd_info.cmd, check_spacy_commit=check_spacy_commit current_dir, cmd_info.cmd, check_spacy_commit=check_spacy_commit
)
and not force
): ):
cmd_info.status = "not rerun" cmd_info.status = "not rerun"
rmtree(MULTIPROCESSING_LOGGING_DIR_NAME, ignore_errors=True)
mkdir(MULTIPROCESSING_LOGGING_DIR_NAME)
processes = [ processes = [
Process( Process(
target=project_run_mult_command, target=project_run_mult_cmd,
args=(cmd_info,), args=(cmd_info,),
kwargs={ kwargs={
"dry": dry, "dry": dry,
"cmd_ind": cmd_ind, "current_dir": str(current_dir),
"mult_group_status_queue": mult_group_status_queue, "mult_group_status_queue": mult_group_status_queue,
}, },
) )
for cmd_ind, cmd_info in enumerate( for cmd_info in (c for c in cmd_infos if c.status != "not rerun")
c for c in cmd_infos if c.status != "not rerun"
)
] ]
num_processes = len(processes) num_processes = len(processes)
if ( if (
@ -195,87 +231,115 @@ def project_run_mult_group(
process_iterator = iter(processes) process_iterator = iter(processes)
for _ in range(num_processes): for _ in range(num_processes):
next(process_iterator).start() next(process_iterator).start()
msg.divider("MULTIPROCESSING GROUP") msg.divider("PARALLEL GROUP")
if not DISPLAY_STATUS: if not DISPLAY_STATUS:
print( print(
"parallel[" + ", ".join(cmd_info.name for cmd_info in cmd_infos) + "]" "parallel["
+ ", ".join(cmd_info.cmd_name for cmd_info in cmd_infos)
+ "]"
) )
first = True first = True
while any(cmd_info.status in ("pending", "running") for cmd_info in cmd_infos): while any(cmd_info.status in ("pending", "running") for cmd_info in cmd_infos):
try: try:
mess: Dict[str, Union[str, int]] = mult_group_status_queue.get( mess: Dict[str, Union[str, int]] = mult_group_status_queue.get(
MULTIPROCESSING_GROUP_LIVENESS_INTERVAL timeout=MULTIPROCESSING_GROUP_STATUS_INTERVAL * 20
) )
except mult_group_status_queue.Empty: except Exception as e:
for other_cmd_info in (c for c in cmd_infos if c.status == "running"): for other_cmd_info in (c for c in cmd_infos if c.status == "running"):
other_cmd_info.status = "hung" other_cmd_info.status = "hung"
cmd_info = cmd_infos[mess["cmd_ind"]] cmd_info = cmd_infos[mess["cmd_ind"]]
if mess["status"] in ("start", "alive"): if mess["status"] in ("started", "alive"):
cmd_info.last_status_time = int(time()) cmd_info.last_status_time = int(time())
for other_cmd_info in (c for c in cmd_infos if c.status == "running"): for other_cmd_info in (c for c in cmd_infos if c.status == "running"):
if ( if (
other_cmd_info.last_status_time is not None other_cmd_info.last_status_time is not None
and time() - other_cmd_info.last_status_time and time() - other_cmd_info.last_status_time
> MULTIPROCESSING_GROUP_LIVENESS_INTERVAL > MULTIPROCESSING_GROUP_STATUS_INTERVAL * 20
): ):
other_cmd_info.status = "hung" other_cmd_info.status = "hung"
if mess["status"] == "start": if mess["status"] == "started":
cmd_info.status = "running" cmd_info.status = "running"
cmd_info.os_cmd_index = mess["os_cmd_index"] cmd_info.os_cmd_ind = mess["os_cmd_ind"]
elif mess["status"] == "succeeded": cmd_info.pid = mess["pid"]
if mess["status"] == "completed":
cmd_info.rc = mess["rc"]
if cmd_info.rc == 0:
cmd_info.status = "succeeded" cmd_info.status = "succeeded"
if not dry: if not dry:
update_lockfile(current_dir, cmd_info.cmd) update_lockfile(current_dir, cmd_info.cmd)
next_process = next(process_iterator, None) next_process = next(process_iterator, None)
if next_process is not None: if next_process is not None:
next_process.start() next_process.start()
elif mess[1] == "failed": elif cmd_info.rc > 0:
cmd_info.status = "failed" cmd_info.status = "failed"
else:
cmd_info.status = "killed"
cmd_info.output = mess["output"]
if any(c for c in cmd_infos if c.status in ("failed", "hung")): if any(c for c in cmd_infos if c.status in ("failed", "hung")):
for other_cmd_info in (c for c in cmd_infos if c.status == "running"): for other_cmd_info in (c for c in cmd_infos if c.status == "running"):
kill(other_cmd_info.pid, -15) try:
if mess["status"] in ("succeeded", "failed", "killed"): kill(other_cmd_info.pid, SIGTERM)
cmd_info.output = mess["output"] except:
pass
for other_cmd_info in (c for c in cmd_infos if c.status == "pending"):
other_cmd_info.status = "cancelled"
if mess["status"] != "alive" and DISPLAY_STATUS: if mess["status"] != "alive" and DISPLAY_STATUS:
if first: if first:
first = False first = False
else: else:
print("\033[F" * (2 + len(cmd_infos))) print("\033[2K\033[F" * (4 + len(cmd_infos)))
data = [ data = [[c.cmd_name, c.disp_status] for c in cmd_infos]
[c.cmd_name, c.status, f"{c.os_cmd_ind}/{c.len_os_cmds}"] header = ["Command", "Status"]
for c in cmd_infos
]
header = ["Command", "Status", "OS Command"]
msg.table(data, header=header) msg.table(data, header=header)
for cmd_info in cmd_infos: for cmd_info in (c for c in cmd_infos if c.status != "cancelled"):
msg.divider(cmd_info.cmd_name)
if cmd_info.status == "not rerun":
msg.info(f"Skipping '{cmd_info.cmd_name}': nothing changed")
else:
print(cmd_info.output) print(cmd_info.output)
group_rc = max(c.rc for c in cmd_infos) process_rcs = [c.rc for c in cmd_infos if c.rc is not None]
if len(process_rcs) > 0:
group_rc = max(c.rc for c in cmd_infos if c.rc is not None)
if group_rc <= 0: if group_rc <= 0:
group_rc = min(c.rc for c in cmd_infos) group_rc = min(c.rc for c in cmd_infos if c.rc is not None)
if group_rc != 0: if group_rc != 0:
sys.exit(group_rc) sys.exit(group_rc)
if any(c for c in cmd_infos if c.status == "hung"):
sys.exit(-1)
def project_run_mult_command( def project_run_mult_cmd(
cmd_info: MultiprocessingCommandInfo, cmd_info: MultiprocessingCommandInfo,
*dry: bool, *,
cmd_ind: int, dry: bool,
current_dir: str,
mult_group_status_queue: Queue, mult_group_status_queue: Queue,
) -> None: ) -> None:
printer = Printer(no_print=True) log_file_name = sep.join(
with TemporaryFile() as logfile: (current_dir, MULTIPROCESSING_LOGGING_DIR_NAME, cmd_info.cmd_name + ".log")
print(printer.divider(cmd_info.cmd_name), logfile) )
with open(log_file_name, "wb", buffering=0) as logfile:
for os_cmd_ind, os_cmd in enumerate(cmd_info.cmd["script"]): for os_cmd_ind, os_cmd in enumerate(cmd_info.cmd["script"]):
command = split_command(os_cmd) command = split_command(os_cmd)
if len(command) and command[0] in ("python", "python3"): if len(command) and command[0] in ("python", "python3"):
command = [sys.executable, "-u", *command[1:]] command = [sys.executable, "-u", *command[1:]]
elif len(command) and command[0] in ("pip", "pip3"): elif len(command) and command[0] in ("pip", "pip3"):
command = [sys.executable, "-m", "pip", *command[1:]] command = [sys.executable, "-m", "pip", *command[1:]]
print(f"Running command: {join_command(command)}", logfile) logfile.write(
bytes(
f"Running command: {join_command(command)}" + linesep,
encoding="UTF-8",
)
)
sleep(2)
if not dry: if not dry:
try: try:
sp = Popen( sp = Popen(
command, stdout=logfile, env=environ.copy(), encoding="utf8" command,
stdout=logfile,
stderr=STDOUT,
env=environ.copy(),
encoding="utf8",
) )
except FileNotFoundError: except FileNotFoundError:
# Indicates the *command* wasn't found, it's an error before the command # Indicates the *command* wasn't found, it's an error before the command
@ -287,33 +351,50 @@ def project_run_mult_command(
) from None ) from None
mult_group_status_queue.put( mult_group_status_queue.put(
{ {
"cmd_ind": cmd_ind, "cmd_ind": cmd_info.cmd_ind,
"os_cmd_ind": os_cmd_ind, "os_cmd_ind": os_cmd_ind,
"status": "start", "status": "started",
"pid": sp.pid, "pid": sp.pid,
} }
) )
while True: while True:
sp.communicate(MULTIPROCESSING_GROUP_STATUS_INTERVAL) try:
rc = sp.poll() sp.communicate(timeout=MULTIPROCESSING_GROUP_STATUS_INTERVAL)
if rc == None: except TimeoutExpired:
pass
if sp.returncode == None:
mult_group_status_queue.put( mult_group_status_queue.put(
{ {
"cmd_ind": cmd_ind, "cmd_ind": cmd_info.cmd_ind,
"status": "alive", "status": "alive",
} }
) )
else: else:
break break
if rc != 0: if sp.returncode != 0:
status = "failed" if rc > 0 else "killed" if sp.returncode > 0:
print(printer.divider(status.upper() + f" rc={rc}"), logfile) logfile.write(
mult_group_status_queue.put( bytes(
{"cmd_ind": cmd_ind, "status": status, "output": logfile.read()} linesep + f"Failed (rc={sp.returncode})" + linesep,
encoding="UTF-8",
)
)
else:
logfile.write(
bytes(
linesep + f"Killed (rc={sp.returncode})" + linesep,
encoding="UTF-8",
)
) )
break break
with open(log_file_name, "r") as logfile:
mult_group_status_queue.put( mult_group_status_queue.put(
{"cmd_ind": cmd_ind, "status": "succeeded", "output": logfile.read()} {
"cmd_ind": cmd_info.cmd_ind,
"status": "completed",
"rc": sp.returncode,
"output": logfile.read(),
}
) )