Initial changes based on review comments

This commit is contained in:
richardpaulhudson 2022-07-22 22:38:42 +02:00
parent 1650912bc1
commit fd8dbfd5b5
3 changed files with 50 additions and 55 deletions

View File

@ -105,7 +105,6 @@ def update_dvc_config(
dvc_config_path.unlink()
dvc_commands = []
config_commands = {cmd["name"]: cmd for cmd in config.get("commands", [])}
processed_step = False
for name in workflows[workflow]:
if isinstance(name, dict) and "parallel" in name:
msg.fail(
@ -118,7 +117,6 @@ def update_dvc_config(
outputs_no_cache = command.get("outputs_no_cache", [])
if not deps and not outputs and not outputs_no_cache:
continue
processed_step = True
# Default to the working dir as the project path since dvc.yaml is auto-generated
# and we don't want arbitrary paths in there
project_cmd = ["python", "-m", NAME, "project", "run", name]
@ -130,7 +128,7 @@ def update_dvc_config(
dvc_cmd.append("--always-changed")
full_cmd = [*dvc_cmd, *deps_cmd, *outputs_cmd, *outputs_nc_cmd, *project_cmd]
dvc_commands.append(join_command(full_cmd))
if not processed_step:
if len(dvc_commands) == 0:
msg.fail(
f"A DVC workflow must have at least one dependency or output",
exits=1,

View File

@ -8,6 +8,7 @@ from multiprocessing.context import SpawnProcess
from shutil import rmtree
from signal import SIGTERM
from subprocess import STDOUT, Popen, TimeoutExpired
from dataclasses import dataclass, field
from wasabi import msg
from wasabi.util import color, supports_ansi
@ -48,54 +49,47 @@ MAX_WIDTH_DIVIDER = 60
DISPLAY_STATUS_TABLE = sys.stdout.isatty() and supports_ansi()
@dataclass
class _ParallelCommandState:
def __init__(
self,
name: str,
display_color: str,
# transitions: the names of states that can legally follow this state
transitions: Optional[List[str]] = None,
):
self.name = name
self.display_color = display_color
if transitions is not None:
self.transitions = transitions
else:
self.transitions = []
_PARALLEL_COMMAND_STATES = (
# The command has not yet been run
_ParallelCommandState("pending", "yellow", ["starting", "cancelled", "not rerun"]),
# The command was not run because its settings, inputs and outputs had not changed
# since a previous successful run
_ParallelCommandState("not rerun", "blue"),
# The main process has spawned a worker process for the command but not yet received
# an acknowledgement
_ParallelCommandState("starting", "green", ["running", "failed", "hung"]),
# The command is running
_ParallelCommandState(
"running", "green", ["running", "succeeded", "failed", "terminated", "hung"]
),
# The command succeeded (rc=0)
_ParallelCommandState("succeeded", "green"),
# The command failed (rc>0)
_ParallelCommandState("failed", "red"),
# The command was terminated (rc<0), usually but not necessarily by the main process because
# another command within the same parallel group failed
_ParallelCommandState("terminated", "red"),
# The main process would expect the worker process to be running, but the worker process has
# not communicated with the main process for an extended period of time
_ParallelCommandState("hung", "red"),
# The main process did not attempt to execute the command because another command in the same
# parallel group had already failed
_ParallelCommandState("cancelled", "red"),
)
name: str
display_color: str
# transitions: the names of states that can legally follow this state
transitions: List = field(default_factory=list)
class _ParallelCommandInfo:
state_dict = {state.name: state for state in _PARALLEL_COMMAND_STATES}
STATES = (
# The command has not yet been run
_ParallelCommandState(
"pending", "yellow", ["starting", "cancelled", "not rerun"]
),
# The command was not run because its settings, inputs and outputs had not changed
# since a previous successful run
_ParallelCommandState("not rerun", "blue"),
# The main process has spawned a worker process for the command but not yet received
# an acknowledgement
_ParallelCommandState("starting", "green", ["running", "failed", "hung"]),
# The command is running
_ParallelCommandState(
"running", "green", ["running", "succeeded", "failed", "terminated", "hung"]
),
# The command succeeded (rc=0)
_ParallelCommandState("succeeded", "green"),
# The command failed (rc>0)
_ParallelCommandState("failed", "red"),
# The command was terminated (rc<0), usually but not necessarily by the main process because
# another command within the same parallel group failed
_ParallelCommandState("terminated", "red"),
# The main process would expect the worker process to be running, but the worker process has
# not communicated with the main process for an extended period of time
_ParallelCommandState("hung", "red"),
# The main process did not attempt to execute the command because another command in the same
# parallel group had already failed
_ParallelCommandState("cancelled", "red"),
)
state_dict = {state.name: state for state in STATES}
def __init__(self, cmd_name: str, cmd: Dict, cmd_index: int):
self.cmd_name = cmd_name
@ -109,9 +103,10 @@ class _ParallelCommandInfo:
self.console_output: Optional[str] = None
def change_state(self, new_state: str):
assert new_state in self.state.transitions, (
"Illegal transition from " + self.state.name + " to " + new_state + "."
)
if new_state not in self.state.transitions:
raise RuntimeError(
Errors.E1037.format(old_state=self.state.name, new_state=new_state)
)
self.state = _ParallelCommandInfo.state_dict[new_state]
@property
@ -211,7 +206,7 @@ def project_run_parallel_group(
"Temporary logs are being written to "
+ os.sep.join((str(current_dir), PARALLEL_LOGGING_DIR_NAME))
)
first = True
status_table_not_yet_displayed = True
while any(
cmd_info.state.name in ("pending", "starting", "running")
for cmd_info in cmd_infos
@ -270,18 +265,19 @@ def project_run_parallel_group(
c for c in cmd_infos if c.state.name == "running"
):
try:
os.kill(cast(int, other_cmd_info.pid), SIGTERM)
os.kill(other_cmd_info.pid, SIGTERM)
except:
# there could be a race condition between the process completing
# and the message arriving to notify about it
# the subprocess the main process is trying to kill could already
# have completed, and the message from the worker process notifying
# the main process about this could still be in the queue
pass
for other_cmd_info in (
c for c in cmd_infos if c.state.name == "pending"
):
other_cmd_info.change_state("cancelled")
if mess["type"] != "keepalive" and DISPLAY_STATUS_TABLE:
if first:
first = False
if status_table_not_yet_displayed:
status_table_not_yet_displayed = False
else:
# overwrite the existing status table
print("\033[2K\033[F" * (4 + len(cmd_infos)))

View File

@ -913,6 +913,7 @@ class Errors(metaclass=ErrorsWithCodes):
E1034 = ("Node index {i} out of bounds ({length})")
E1035 = ("Token index {i} out of bounds ({length})")
E1036 = ("Cannot index into NoneNode")
E1037 = ("Illegal transition from {old_state} to {new_state}.")
# Deprecated model shortcuts, only used in errors and warnings