Changes based on review

richardpaulhudson 2022-05-23 10:03:59 +02:00
parent ae825686aa
commit 4daffdd653
4 changed files with 40 additions and 36 deletions

spacy/cli/_util.py

@@ -242,13 +242,13 @@ def validate_project_commands(config: Dict[str, Any]) -> None:
         if workflow_name in commands:
             err = f"Can't use workflow name '{workflow_name}': name already exists as a command"
             msg.fail(err, exits=1)
-        for step_or_list in workflow_items:
-            if isinstance(step_or_list, str):
-                verify_workflow_step(workflow_name, commands, step_or_list)
+        for workflow_item in workflow_items:
+            if isinstance(workflow_item, str):
+                verify_workflow_step(workflow_name, commands, workflow_item)
             else:
-                assert isinstance(step_or_list, list)
-                assert isinstance(step_or_list[0], str)
-                steps = cast(List[str], step_or_list)
+                assert isinstance(workflow_item, list)
+                assert isinstance(workflow_item[0], str)
+                steps = cast(List[str], workflow_item)
                 if len(steps) < 2:
                     msg.fail(
                         f"Invalid multiprocessing group within '{workflow_name}'.",
@@ -580,3 +580,15 @@ def setup_gpu(use_gpu: int, silent=None) -> None:
         local_msg.info("Using CPU")
         if has_cupy and gpu_is_available():
             local_msg.info("To switch to GPU 0, use the option: --gpu-id 0")
+
+
+def get_workflow_steps(workflow_items: List[Union[str, List[str]]]) -> List[str]:
+    steps: List[str] = []
+    for workflow_item in workflow_items:
+        if isinstance(workflow_item, str):
+            steps.append(workflow_item)
+        else:
+            assert isinstance(workflow_item, list)
+            assert isinstance(workflow_item[0], str)
+            steps.extend(workflow_item)
+    return steps
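
As a quick illustration of the new helper (hypothetical step names; the import path assumes the `_util.py` from this commit, where the nested list is a multiprocessing group):

```python
from typing import List, Union

from spacy.cli._util import get_workflow_steps  # helper added in this commit

# A workflow item is either a single step name or a multiprocessing group
# (a list of step names); the helper flattens both into one ordered step list.
items: List[Union[str, List[str]]] = ["preprocess", ["train-a", "train-b"], "evaluate"]
assert get_workflow_steps(items) == ["preprocess", "train-a", "train-b", "evaluate"]
```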

spacy/cli/project/dvc.py

@@ -6,7 +6,7 @@ from pathlib import Path
 from wasabi import msg
 
 from .._util import PROJECT_FILE, load_project_config, get_hash, project_cli
-from .._util import Arg, Opt, NAME, COMMAND
+from .._util import Arg, Opt, NAME, COMMAND, get_workflow_steps
 from ...util import working_dir, split_command, join_command, run_command
 from ...util import SimpleFrozenList
@@ -105,21 +105,15 @@ def update_dvc_config(
         dvc_config_path.unlink()
     dvc_commands = []
     config_commands = {cmd["name"]: cmd for cmd in config.get("commands", [])}
-    names = []
-    for cmdOrMultiprocessingGroup in workflows[workflow]:
-        if isinstance(cmdOrMultiprocessingGroup, str):
-            names.append(cmdOrMultiprocessingGroup)
-        else:
-            assert isinstance(cmdOrMultiprocessingGroup, list)
-            assert isinstance(cmdOrMultiprocessingGroup[0], str)
-            names.extend(cmdOrMultiprocessingGroup)
-    for name in names:
+    processed_step = False
+    for name in get_workflow_steps(workflows[workflow]):
         command = config_commands[name]
         deps = command.get("deps", [])
         outputs = command.get("outputs", [])
         outputs_no_cache = command.get("outputs_no_cache", [])
         if not deps and not outputs and not outputs_no_cache:
             continue
+        processed_step = True
         # Default to the working dir as the project path since dvc.yaml is auto-generated
         # and we don't want arbitrary paths in there
         project_cmd = ["python", "-m", NAME, "project", "run", name]
@@ -131,6 +125,11 @@ def update_dvc_config(
             dvc_cmd.append("--always-changed")
         full_cmd = [*dvc_cmd, *deps_cmd, *outputs_cmd, *outputs_nc_cmd, *project_cmd]
         dvc_commands.append(join_command(full_cmd))
+    if not processed_step:
+        msg.fail(
+            f"A DVC workflow must have at least one dependency or output",
+            exits=1,
+        )
     with working_dir(path):
         dvc_flags = {"--verbose": verbose, "--quiet": silent}
         run_dvc_commands(dvc_commands, flags=dvc_flags)
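
For context, each flattened step becomes an ordinary sequential `spacy project run` stage in the generated DVC config. A rough sketch of the command assembly with a hypothetical step name (`join_command` shell-quotes and joins the parts):

```python
from spacy.util import join_command

# Hypothetical step "train": the DVC stage wraps a plain
# "spacy project run" call, one stage per flattened step.
project_cmd = ["python", "-m", "spacy", "project", "run", "train"]
print(join_command(project_cmd))  # python -m spacy project run train
```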

spacy/cli/project/run.py

@@ -15,6 +15,7 @@ from ...util import SimpleFrozenList, is_minor_version_match, ENV_VARS
 from ...util import check_bool_env_var, SimpleFrozenDict
 from .._util import PROJECT_FILE, PROJECT_LOCK, load_project_config, get_hash
 from .._util import get_checksum, project_cli, Arg, Opt, COMMAND, parse_config_overrides
+from .._util import get_workflow_steps
 
 
 @project_cli.command(
@@ -78,11 +79,11 @@ def project_run(
     validate_subcommand(list(commands.keys()), list(workflows.keys()), subcommand)
     if subcommand in workflows:
         msg.info(f"Running workflow '{subcommand}'")
-        for cmdOrMultiprocessingGroup in workflows[subcommand]:
-            if isinstance(cmdOrMultiprocessingGroup, str):
+        for workflow_item in workflows[subcommand]:
+            if isinstance(workflow_item, str):
                 project_run(
                     project_dir,
-                    cmdOrMultiprocessingGroup,
+                    workflow_item,
                     overrides=overrides,
                     force=force,
                     dry=dry,
@@ -90,6 +91,8 @@ def project_run(
                     mult_group_mutex=mult_group_mutex,
                 )
             else:
+                assert isinstance(workflow_item, list)
+                assert isinstance(workflow_item[0], str)
                 processes = [
                     Process(
                         target=project_run,
@@ -102,7 +105,7 @@ def project_run(
                             "mult_group_mutex": mult_group_mutex,
                         },
                     )
-                    for cmd in cmdOrMultiprocessingGroup
+                    for cmd in workflow_item
                 ]
                 for process in processes:
                     process.start()
@@ -133,18 +136,6 @@ def project_run(
             update_lockfile(current_dir, cmd, mult_group_mutex=mult_group_mutex)
 
 
-def _get_workflow_steps(workflow_items: List[Union[str, List[str]]]) -> List[str]:
-    steps: List[str] = []
-    for workflow_item in workflow_items:
-        if isinstance(workflow_item, str):
-            steps.append(workflow_item)
-        else:
-            assert isinstance(workflow_item, list)
-            assert isinstance(workflow_item[0], str)
-            steps.extend(workflow_item)
-    return steps
-
-
 def print_run_help(project_dir: Path, subcommand: Optional[str] = None) -> None:
     """Simulate a CLI help prompt using the info available in the project.yml.
@@ -166,7 +157,7 @@ def print_run_help(project_dir: Path, subcommand: Optional[str] = None) -> None:
         if help_text:
             print(f"\n{help_text}\n")
     elif subcommand in workflows:
-        steps = _get_workflow_steps(workflows[subcommand])
+        steps = get_workflow_steps(workflows[subcommand])
         print(f"\nWorkflow consisting of {len(steps)} commands:")
         steps_data = [
             (f"{i + 1}. {step}", commands[step].get("help", ""))
@@ -189,7 +180,7 @@ def print_run_help(project_dir: Path, subcommand: Optional[str] = None) -> None:
         print(f"Usage: {COMMAND} project run [WORKFLOW] {project_loc}")
         msg.table(
             [
-                (name, " -> ".join(_get_workflow_steps(workflow_items)))
+                (name, " -> ".join(get_workflow_steps(workflow_items)))
                 for name, workflow_items in workflows.items()
             ]
         )
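
A self-contained sketch of the execution model used for a multiprocessing group (hypothetical step names; `run_step` stands in for the recursive `project_run` call made per group member):

```python
from multiprocessing import Process

def run_step(name: str) -> None:
    # Stand-in for the recursive project_run call per group member.
    print(f"running {name}")

if __name__ == "__main__":
    group = ["train-a", "train-b"]  # one multiprocessing group
    processes = [Process(target=run_step, args=(name,)) for name in group]
    for process in processes:
        process.start()
    for process in processes:
        # The workflow only moves on once the whole group has finished.
        process.join()
```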

website/docs/api/cli.md

@@ -1455,8 +1455,10 @@ Auto-generate [Data Version Control](https://dvc.org) (DVC) config file. Calls
 the hood to generate the `dvc.yaml`. A DVC project can only define one pipeline,
 so you need to specify one workflow defined in the
 [`project.yml`](/usage/projects#project-yml). If no workflow is specified, the
-first defined workflow is used. The DVC config will only be updated if the
-`project.yml` changed. For details, see the
+first defined workflow is used. Note that any multiprocessing groups in the
+workflow are flattened and defined for sequential execution in the DVC config,
+since DVC does not support spaCy-style multiprocessing. The DVC config will
+only be updated if the `project.yml` changed. For details, see the
 [DVC integration](/usage/projects#dvc) docs.
 
 <Infobox variant="warning">
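
To make the flattening note concrete (hypothetical step names): a workflow defined in `project.yml` as `all: [preprocess, [train-a, train-b], evaluate]` loads as the list below, and the generated DVC pipeline runs all four steps strictly in order:

```python
# The inner list is a multiprocessing group under "spacy project run",
# but the DVC config runs the flattened steps one after another.
workflow = ["preprocess", ["train-a", "train-b"], "evaluate"]
steps = [s for item in workflow for s in (item if isinstance(item, list) else [item])]
print(" -> ".join(steps))  # preprocess -> train-a -> train-b -> evaluate
```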