From 4daffdd65392194c231f2270ec387b4a17cdfc69 Mon Sep 17 00:00:00 2001 From: richardpaulhudson Date: Mon, 23 May 2022 10:03:59 +0200 Subject: [PATCH] Changes based on review --- spacy/cli/_util.py | 24 ++++++++++++++++++------ spacy/cli/project/dvc.py | 19 +++++++++---------- spacy/cli/project/run.py | 27 +++++++++------------------ website/docs/api/cli.md | 6 ++++-- 4 files changed, 40 insertions(+), 36 deletions(-) diff --git a/spacy/cli/_util.py b/spacy/cli/_util.py index b0e6dd4d6..91d9f2811 100644 --- a/spacy/cli/_util.py +++ b/spacy/cli/_util.py @@ -242,13 +242,13 @@ def validate_project_commands(config: Dict[str, Any]) -> None: if workflow_name in commands: err = f"Can't use workflow name '{workflow_name}': name already exists as a command" msg.fail(err, exits=1) - for step_or_list in workflow_items: - if isinstance(step_or_list, str): - verify_workflow_step(workflow_name, commands, step_or_list) + for workflow_item in workflow_items: + if isinstance(workflow_item, str): + verify_workflow_step(workflow_name, commands, workflow_item) else: - assert isinstance(step_or_list, list) - assert isinstance(step_or_list[0], str) - steps = cast(List[str], step_or_list) + assert isinstance(workflow_item, list) + assert isinstance(workflow_item[0], str) + steps = cast(List[str], workflow_item) if len(steps) < 2: msg.fail( f"Invalid multiprocessing group within '{workflow_name}'.", @@ -580,3 +580,15 @@ def setup_gpu(use_gpu: int, silent=None) -> None: local_msg.info("Using CPU") if has_cupy and gpu_is_available(): local_msg.info("To switch to GPU 0, use the option: --gpu-id 0") + + +def get_workflow_steps(workflow_items: List[Union[str, List[str]]]) -> List[str]: + steps: List[str] = [] + for workflow_item in workflow_items: + if isinstance(workflow_item, str): + steps.append(workflow_item) + else: + assert isinstance(workflow_item, list) + assert isinstance(workflow_item[0], str) + steps.extend(workflow_item) + return steps diff --git a/spacy/cli/project/dvc.py b/spacy/cli/project/dvc.py index 3069b01ea..8796bb112 100644 --- a/spacy/cli/project/dvc.py +++ b/spacy/cli/project/dvc.py @@ -6,7 +6,7 @@ from pathlib import Path from wasabi import msg from .._util import PROJECT_FILE, load_project_config, get_hash, project_cli -from .._util import Arg, Opt, NAME, COMMAND +from .._util import Arg, Opt, NAME, COMMAND, get_workflow_steps from ...util import working_dir, split_command, join_command, run_command from ...util import SimpleFrozenList @@ -105,21 +105,15 @@ def update_dvc_config( dvc_config_path.unlink() dvc_commands = [] config_commands = {cmd["name"]: cmd for cmd in config.get("commands", [])} - names = [] - for cmdOrMultiprocessingGroup in workflows[workflow]: - if isinstance(cmdOrMultiprocessingGroup, str): - names.append(cmdOrMultiprocessingGroup) - else: - assert isinstance(cmdOrMultiprocessingGroup, list) - assert isinstance(cmdOrMultiprocessingGroup[0], str) - names.extend(cmdOrMultiprocessingGroup) - for name in names: + processed_step = False + for name in get_workflow_steps(workflows[workflow]): command = config_commands[name] deps = command.get("deps", []) outputs = command.get("outputs", []) outputs_no_cache = command.get("outputs_no_cache", []) if not deps and not outputs and not outputs_no_cache: continue + processed_step = True # Default to the working dir as the project path since dvc.yaml is auto-generated # and we don't want arbitrary paths in there project_cmd = ["python", "-m", NAME, "project", "run", name] @@ -131,6 +125,11 @@ def update_dvc_config( dvc_cmd.append("--always-changed") full_cmd = [*dvc_cmd, *deps_cmd, *outputs_cmd, *outputs_nc_cmd, *project_cmd] dvc_commands.append(join_command(full_cmd)) + if not processed_step: + msg.fail( + f"A DVC workflow must have at least one dependency or output", + exits=1, + ) with working_dir(path): dvc_flags = {"--verbose": verbose, "--quiet": silent} run_dvc_commands(dvc_commands, flags=dvc_flags) diff --git a/spacy/cli/project/run.py b/spacy/cli/project/run.py index 8486a002e..941b9fa9d 100644 --- a/spacy/cli/project/run.py +++ b/spacy/cli/project/run.py @@ -15,6 +15,7 @@ from ...util import SimpleFrozenList, is_minor_version_match, ENV_VARS from ...util import check_bool_env_var, SimpleFrozenDict from .._util import PROJECT_FILE, PROJECT_LOCK, load_project_config, get_hash from .._util import get_checksum, project_cli, Arg, Opt, COMMAND, parse_config_overrides +from .._util import get_workflow_steps @project_cli.command( @@ -78,11 +79,11 @@ def project_run( validate_subcommand(list(commands.keys()), list(workflows.keys()), subcommand) if subcommand in workflows: msg.info(f"Running workflow '{subcommand}'") - for cmdOrMultiprocessingGroup in workflows[subcommand]: - if isinstance(cmdOrMultiprocessingGroup, str): + for workflow_item in workflows[subcommand]: + if isinstance(workflow_item, str): project_run( project_dir, - cmdOrMultiprocessingGroup, + workflow_item, overrides=overrides, force=force, dry=dry, @@ -90,6 +91,8 @@ def project_run( mult_group_mutex=mult_group_mutex, ) else: + assert isinstance(workflow_item, list) + assert isinstance(workflow_item[0], str) processes = [ Process( target=project_run, @@ -102,7 +105,7 @@ def project_run( "mult_group_mutex": mult_group_mutex, }, ) - for cmd in cmdOrMultiprocessingGroup + for cmd in workflow_item ] for process in processes: process.start() @@ -133,18 +136,6 @@ def project_run( update_lockfile(current_dir, cmd, mult_group_mutex=mult_group_mutex) -def _get_workflow_steps(workflow_items: List[Union[str, List[str]]]) -> List[str]: - steps: List[str] = [] - for workflow_item in workflow_items: - if isinstance(workflow_item, str): - steps.append(workflow_item) - else: - assert isinstance(workflow_item, list) - assert isinstance(workflow_item[0], str) - steps.extend(workflow_item) - return steps - - def print_run_help(project_dir: Path, subcommand: Optional[str] = None) -> None: """Simulate a CLI help prompt using the info available in the project.yml. @@ -166,7 +157,7 @@ def print_run_help(project_dir: Path, subcommand: Optional[str] = None) -> None: if help_text: print(f"\n{help_text}\n") elif subcommand in workflows: - steps = _get_workflow_steps(workflows[subcommand]) + steps = get_workflow_steps(workflows[subcommand]) print(f"\nWorkflow consisting of {len(steps)} commands:") steps_data = [ (f"{i + 1}. {step}", commands[step].get("help", "")) @@ -189,7 +180,7 @@ def print_run_help(project_dir: Path, subcommand: Optional[str] = None) -> None: print(f"Usage: {COMMAND} project run [WORKFLOW] {project_loc}") msg.table( [ - (name, " -> ".join(_get_workflow_steps(workflow_items))) + (name, " -> ".join(get_workflow_steps(workflow_items))) for name, workflow_items in workflows.items() ] ) diff --git a/website/docs/api/cli.md b/website/docs/api/cli.md index e801ff0a6..87e8e8a74 100644 --- a/website/docs/api/cli.md +++ b/website/docs/api/cli.md @@ -1455,8 +1455,10 @@ Auto-generate [Data Version Control](https://dvc.org) (DVC) config file. Calls the hood to generate the `dvc.yaml`. A DVC project can only define one pipeline, so you need to specify one workflow defined in the [`project.yml`](/usage/projects#project-yml). If no workflow is specified, the -first defined workflow is used. The DVC config will only be updated if the -`project.yml` changed. For details, see the +first defined workflow is used. Note that any multiprocessing groups in the spaCy +config file will be flattened out and defined for sequential execution in the DVC config file +as DVC does not support multiprocessing in the same way as spaCy. The DVC config will only be updated +if the `project.yml` changed. For details, see the [DVC integration](/usage/projects#dvc) docs.