diff --git a/spacy/cli/_util.py b/spacy/cli/_util.py
index 91d9f2811..3a64e3211 100644
--- a/spacy/cli/_util.py
+++ b/spacy/cli/_util.py
@@ -157,6 +157,7 @@ def load_project_config(
         print("\n".join(errors))
         sys.exit(1)
     validate_project_version(config)
+    validate_max_parallel_processes(config)
     validate_project_commands(config)
     # Make sure directories defined in config exist
     for subdir in config.get("directories", []):
@@ -199,7 +200,7 @@ def substitute_project_variables(
 
 
 def validate_project_version(config: Dict[str, Any]) -> None:
-    """If the project defines a compatible spaCy version range, chec that it's
+    """If the project defines a compatible spaCy version range, check that it's
     compatible with the current version of spaCy.
 
     config (Dict[str, Any]): The loaded config.
@@ -215,6 +216,21 @@ def validate_project_version(config: Dict[str, Any]) -> None:
         msg.fail(err, exits=1)
 
 
+def validate_max_parallel_processes(config: Dict[str, Any]) -> None:
+    """If the project defines a maximum number of parallel processes, check that the
+    value is within the permitted range.
+
+    config (Dict[str, Any]): The loaded config.
+    """
+    max_parallel_processes = config.get("max_parallel_processes", None)
+    if max_parallel_processes is not None and max_parallel_processes < 2:
+        err = (
+            f"The {PROJECT_FILE} specifies a value for max_parallel_processes ({max_parallel_processes}) "
+            f"that is less than 2."
+        )
+        msg.fail(err, exits=1)
+
+
 def verify_workflow_step(workflow_name: str, commands: List[str], step: str) -> None:
     if step not in commands:
         msg.fail(
@@ -246,18 +262,20 @@ def validate_project_commands(config: Dict[str, Any]) -> None:
             if isinstance(workflow_item, str):
                 verify_workflow_step(workflow_name, commands, workflow_item)
             else:
-                assert isinstance(workflow_item, list)
-                assert isinstance(workflow_item[0], str)
-                steps = cast(List[str], workflow_item)
+                assert isinstance(workflow_item, dict)
+                assert len(workflow_item) == 1
+                steps_list = workflow_item["parallel"]
+                assert isinstance(steps_list[0], str)
+                steps = cast(List[str], steps_list)
                 if len(steps) < 2:
                     msg.fail(
-                        f"Invalid multiprocessing group within '{workflow_name}'.",
-                        f"A multiprocessing group must reference at least two commands.",
+                        f"Invalid parallel group within '{workflow_name}'.",
+                        f"A parallel group must reference at least two commands.",
                         exits=1,
                     )
                 if len(steps) != len(set(steps)):
                     msg.fail(
-                        f"A multiprocessing group within '{workflow_name}' contains a command more than once.",
+                        f"A parallel group within '{workflow_name}' contains a command more than once.",
                         f"This is not permitted because it is then not possible to determine when to rerun.",
                         exits=1,
                     )
@@ -580,15 +598,3 @@ def setup_gpu(use_gpu: int, silent=None) -> None:
         local_msg.info("Using CPU")
         if has_cupy and gpu_is_available():
             local_msg.info("To switch to GPU 0, use the option: --gpu-id 0")
-
-
-def get_workflow_steps(workflow_items: List[Union[str, List[str]]]) -> List[str]:
-    steps: List[str] = []
-    for workflow_item in workflow_items:
-        if isinstance(workflow_item, str):
-            steps.append(workflow_item)
-        else:
-            assert isinstance(workflow_item, list)
-            assert isinstance(workflow_item[0], str)
-            steps.extend(workflow_item)
-    return steps
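The two config-level checks added here are easy to exercise in isolation. The following is a minimal sketch with hypothetical helper names, raising a plain `ValueError` instead of wasabi's `msg.fail(..., exits=1)`, of the rules that `validate_max_parallel_processes` and `validate_project_commands` now enforce:

```python
from typing import Any, Dict, List


def check_max_parallel_processes(config: Dict[str, Any]) -> None:
    # max_parallel_processes is optional, but if present it must be >= 2:
    # a cap of 0 or 1 would make a parallel group pointless.
    value = config.get("max_parallel_processes")
    if value is not None and value < 2:
        raise ValueError(f"max_parallel_processes must be at least 2, got {value}")


def check_parallel_group(workflow_name: str, steps: List[str]) -> None:
    # A parallel group must reference at least two commands ...
    if len(steps) < 2:
        raise ValueError(f"Invalid parallel group within '{workflow_name}'")
    # ... and may not list the same command twice, since it would then be
    # impossible to determine when that command needs to be rerun.
    if len(steps) != len(set(steps)):
        raise ValueError(f"Duplicate command in parallel group within '{workflow_name}'")


check_max_parallel_processes({"max_parallel_processes": 4})  # passes
check_parallel_group("all", ["command2", "command3"])  # passes
```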
in step) + "]" + "[" + + ", ".join(md.code(p_step) for p_step in step["parallel"]) + + "]" ) data.append([md.code(n), " → ".join(rendered_steps)]) if data: diff --git a/spacy/cli/project/dvc.py b/spacy/cli/project/dvc.py index 8796bb112..404cb189b 100644 --- a/spacy/cli/project/dvc.py +++ b/spacy/cli/project/dvc.py @@ -6,7 +6,7 @@ from pathlib import Path from wasabi import msg from .._util import PROJECT_FILE, load_project_config, get_hash, project_cli -from .._util import Arg, Opt, NAME, COMMAND, get_workflow_steps +from .._util import Arg, Opt, NAME, COMMAND from ...util import working_dir, split_command, join_command, run_command from ...util import SimpleFrozenList @@ -106,7 +106,12 @@ def update_dvc_config( dvc_commands = [] config_commands = {cmd["name"]: cmd for cmd in config.get("commands", [])} processed_step = False - for name in get_workflow_steps(workflows[workflow]): + for name in workflows[workflow]: + if isinstance(name, dict) and "parallel" in name: + msg.fail( + f"A DVC workflow may not contain parallel groups", + exits=1, + ) command = config_commands[name] deps = command.get("deps", []) outputs = command.get("outputs", []) diff --git a/spacy/cli/project/run.py b/spacy/cli/project/run.py index 4d8a5a15a..0170a60b6 100644 --- a/spacy/cli/project/run.py +++ b/spacy/cli/project/run.py @@ -1,6 +1,6 @@ from typing import Optional, List, Dict, Sequence, Any, Iterable, Union, Tuple from pathlib import Path -from multiprocessing import Process, Lock +from multiprocessing import Process, Lock, Queue from multiprocessing.synchronize import Lock as Lock_t from wasabi import msg from wasabi.util import locale_escape @@ -15,7 +15,6 @@ from ...util import SimpleFrozenList, is_minor_version_match, ENV_VARS from ...util import check_bool_env_var, SimpleFrozenDict from .._util import PROJECT_FILE, PROJECT_LOCK, load_project_config, get_hash from .._util import get_checksum, project_cli, Arg, Opt, COMMAND, parse_config_overrides -from .._util import get_workflow_steps @project_cli.command( @@ -54,6 +53,7 @@ def project_run( dry: bool = False, capture: bool = False, mult_group_mutex: Optional[Lock_t] = None, + completion_queue: Optional[Queue] = None, ) -> None: """Run a named script defined in the project.yml. 
diff --git a/spacy/cli/project/run.py b/spacy/cli/project/run.py
index 4d8a5a15a..0170a60b6 100644
--- a/spacy/cli/project/run.py
+++ b/spacy/cli/project/run.py
@@ -1,6 +1,6 @@
 from typing import Optional, List, Dict, Sequence, Any, Iterable, Union, Tuple
 from pathlib import Path
-from multiprocessing import Process, Lock
+from multiprocessing import Process, Lock, Queue
 from multiprocessing.synchronize import Lock as Lock_t
 from wasabi import msg
 from wasabi.util import locale_escape
@@ -15,7 +15,6 @@ from ...util import SimpleFrozenList, is_minor_version_match, ENV_VARS
 from ...util import check_bool_env_var, SimpleFrozenDict
 from .._util import PROJECT_FILE, PROJECT_LOCK, load_project_config, get_hash
 from .._util import get_checksum, project_cli, Arg, Opt, COMMAND, parse_config_overrides
-from .._util import get_workflow_steps
 
 
 @project_cli.command(
@@ -54,6 +53,7 @@ def project_run(
     dry: bool = False,
     capture: bool = False,
     mult_group_mutex: Optional[Lock_t] = None,
+    completion_queue: Optional[Queue] = None,
 ) -> None:
     """Run a named script defined in the project.yml. If the script is part
     of the default pipeline (defined in the "run" section), DVC is used to
@@ -76,6 +76,7 @@ def project_run(
     config = load_project_config(project_dir, overrides=overrides)
     commands = {cmd["name"]: cmd for cmd in config.get("commands", [])}
     workflows = config.get("workflows", {})
+    max_parallel_processes = config.get("max_parallel_processes")
     validate_subcommand(list(commands.keys()), list(workflows.keys()), subcommand)
     if subcommand in workflows:
         msg.info(f"Running workflow '{subcommand}'")
@@ -91,8 +92,11 @@ def project_run(
                     mult_group_mutex=mult_group_mutex,
                 )
             else:
-                assert isinstance(workflow_item, list)
-                assert isinstance(workflow_item[0], str)
+                assert isinstance(workflow_item, dict)
+                assert len(workflow_item) == 1
+                steps_list = workflow_item["parallel"]
+                assert isinstance(steps_list[0], str)
+                completion_queue = Queue(len(steps_list))
                 processes = [
                     Process(
                         target=project_run,
@@ -103,14 +107,26 @@ def project_run(
                             "dry": dry,
                             "capture": capture,
                             "mult_group_mutex": mult_group_mutex,
+                            "completion_queue": completion_queue,
                         },
                     )
-                    for cmd in workflow_item
+                    for cmd in steps_list
                 ]
-                for process in processes:
-                    process.start()
-                for process in processes:
-                    process.join()
+                num_processes = len(processes)
+                if (
+                    max_parallel_processes is not None
+                    and max_parallel_processes < num_processes
+                ):
+                    num_processes = max_parallel_processes
+                process_iterator = iter(processes)
+                for _ in range(num_processes):
+                    next(process_iterator).start()
+                for _ in range(len(steps_list)):
+                    completion_queue.get()
+                    next_process = next(process_iterator, None)
+                    if next_process is not None:
+                        next_process.start()
+
     else:
         cmd = commands[subcommand]
         for dep in cmd.get("deps", []):
@@ -134,6 +150,8 @@ def project_run(
         run_commands(cmd["script"], dry=dry, capture=capture)
         if not dry:
             update_lockfile(current_dir, cmd, mult_group_mutex=mult_group_mutex)
+        if completion_queue is not None:
+            completion_queue.put(None)
 
 
 def print_run_help(project_dir: Path, subcommand: Optional[str] = None) -> None:
@@ -157,12 +175,36 @@ def print_run_help(project_dir: Path, subcommand: Optional[str] = None) -> None:
         if help_text:
             print(f"\n{help_text}\n")
     elif subcommand in workflows:
-        steps = get_workflow_steps(workflows[subcommand])
+        steps: List[Tuple[str, str]] = []
+        contains_parallel = False
+        for workflow_item in workflows[subcommand]:
+            if isinstance(workflow_item, str):
+                steps.append((" ", workflow_item))
+            else:
+                contains_parallel = True
+                assert isinstance(workflow_item, dict)
+                assert len(workflow_item) == 1
+                steps_list = workflow_item["parallel"]
+                assert isinstance(steps_list[0], str)
+                for i, step in enumerate(steps_list):
+                    if i == 0:
+                        parallel_char = "╔"
+                    elif i + 1 == len(steps_list):
+                        parallel_char = "╚"
+                    else:
+                        parallel_char = "║"
+                    steps.append((parallel_char, step))
         print(f"\nWorkflow consisting of {len(steps)} commands:")
-        steps_data = [
-            (f"{i + 1}. {step}", commands[step].get("help", ""))
-            for i, step in enumerate(steps)
-        ]
+        if contains_parallel:
+            steps_data = [
+                (f"{i + 1}. {step[0]} {step[1]}", commands[step[1]].get("help", ""))
+                for i, step in enumerate(steps)
+            ]
+        else:
+            steps_data = [
+                (f"{i + 1}. {step[1]}", commands[step[1]].get("help", ""))
+                for i, step in enumerate(steps)
+            ]
         msg.table(steps_data)
         help_cmd = f"{COMMAND} project run [COMMAND] {project_loc} --help"
         print(f"For command details, run: {help_cmd}")
@@ -180,9 +222,16 @@ def print_run_help(project_dir: Path, subcommand: Optional[str] = None) -> None:
         print(f"Usage: {COMMAND} project run [WORKFLOW] {project_loc}")
         table_entries: List[Tuple[str, str]] = []
         for name, workflow_items in workflows.items():
-            table_entries.append(
-                (name, " -> ".join(get_workflow_steps(workflow_items)))
-            )
+            descriptions: List[str] = []
+            for workflow_item in workflow_items:
+                if isinstance(workflow_item, str):
+                    descriptions.append(workflow_item)
+                else:
+                    assert isinstance(workflow_item, dict)
+                    assert len(workflow_item) == 1
+                    steps_list = workflow_item["parallel"]
+                    descriptions.append("parallel[" + ", ".join(steps_list) + "]")
+            table_entries.append((name, " -> ".join(descriptions)))
         msg.table(table_entries)
 
 
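The scheduling scheme introduced above is worth reading in isolation: start at most `max_parallel_processes` workers, then launch one further worker each time a finished process posts a token to the shared completion queue. A self-contained sketch — the `worker` function is hypothetical; the real code spawns `project_run` itself as the process target:

```python
from multiprocessing import Process, Queue


def worker(name: str, completion_queue: Queue) -> None:
    print(f"running {name}")
    completion_queue.put(None)  # signal completion, freeing up a slot


def run_parallel_group(steps, max_parallel_processes=None):
    # Sized so that no put() can ever block, even before anything consumes.
    completion_queue = Queue(len(steps))
    processes = [Process(target=worker, args=(s, completion_queue)) for s in steps]
    num_to_start = len(processes)
    if max_parallel_processes is not None and max_parallel_processes < num_to_start:
        num_to_start = max_parallel_processes
    process_iterator = iter(processes)
    for _ in range(num_to_start):
        next(process_iterator).start()
    # Each of the len(steps) completion tokens frees a slot for the next
    # unstarted process, so at most num_to_start processes run at once.
    for _ in range(len(steps)):
        completion_queue.get()
        next_process = next(process_iterator, None)
        if next_process is not None:
            next_process.start()
    for process in processes:
        process.join()


if __name__ == "__main__":
    run_parallel_group(["commandA", "commandB", "commandC"], max_parallel_processes=2)
```

The final join is only for tidiness in this sketch; the patch itself relies on the completion tokens alone to know when all grouped steps have finished.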
diff --git a/website/docs/usage/projects.md b/website/docs/usage/projects.md
index 839d589b0..4c40f04fd 100644
--- a/website/docs/usage/projects.md
+++ b/website/docs/usage/projects.md
@@ -153,9 +153,7 @@ script).
 > all:
 >   - preprocess
 >   - train
->   -
->     - multiprocessingGroupCommand1
->     - multiprocessingGroupCommand2
+>   - parallel: [parallelCommand1, parallelCommand2]
 >   - package
 > ```
 
@@ -172,8 +170,8 @@ $ python -m spacy project run all
 ```
 
 Sometimes it makes sense to execute two or more commands in parallel. A group
-of commands executed at once is known as a multiprocessing group; a multiprocessing group
-is defined by indenting the commands it contains. You are responsible for making sure that no
+of commands executed in parallel is defined using the `parallel` keyword mapping to
+the commands specified as a list. You are responsible for making sure that no
 deadlocks, race conditions or other issues can arise from the parallel execution.
 
 Using the expected [dependencies and outputs](#deps-outputs) defined in the
@@ -239,7 +237,7 @@ pipelines.
 | `env` | A dictionary of variables, mapped to the names of environment variables that will be read in when running the project. For example, `${env.name}` will use the value of the environment variable defined as `name`. |
 | `directories` | An optional list of [directories](#project-files) that should be created in the project for assets, training outputs, metrics etc. spaCy will make sure that these directories always exist. |
 | `assets` | A list of assets that can be fetched with the [`project assets`](/api/cli#project-assets) command. `url` defines a URL or local path, `dest` is the destination file relative to the project directory, and an optional `checksum` ensures that an error is raised if the file's checksum doesn't match. Instead of `url`, you can also provide a `git` block with the keys `repo`, `branch` and `path`, to download from a Git repo. |
-| `workflows` | A dictionary of workflow names, mapped to a list of command names, to execute in order. Nested lists represent groups of commands to execute concurrently. Workflows can be run with the [`project run`](/api/cli#project-run) command. |
+| `workflows` | A dictionary of workflow names, mapped to a list of command names, to execute in order. The `parallel` keyword mapping to a list of command names specifies parallel execution. Workflows can be run with the [`project run`](/api/cli#project-run) command. |
 | `commands` | A list of named commands. A command can define an optional help message (shown in the CLI when the user adds `--help`) and the `script`, a list of commands to run. The `deps` and `outputs` let you define the created file the command depends on and produces, respectively. This lets spaCy determine whether a command needs to be re-run because its dependencies or outputs changed. Commands can be run as part of a workflow, or separately with the [`project run`](/api/cli#project-run) command. |
 | `spacy_version` | Optional spaCy version range like `>=3.0.0,<3.1.0` that the project is compatible with. If it's loaded with an incompatible version, an error is raised when the project is loaded. |
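Putting it all together, the behavior documented above can be reproduced end to end, much as the new tests do: write a `project.yml` containing a parallel group plus a `max_parallel_processes` cap, then run the workflow. An illustrative sketch — the directory and command names are hypothetical, and `touch` assumes a POSIX shell, as in the tests:

```python
from pathlib import Path

import srsly
from spacy.cli.project.run import project_run

project_dir = Path("demo_project")
project_dir.mkdir(exist_ok=True)
project = {
    "max_parallel_processes": 2,  # at most two of the grouped commands at once
    "commands": [
        {"name": "commandA", "script": [f"touch {project_dir / 'A'}"]},
        {"name": "commandB", "script": [f"touch {project_dir / 'B'}"]},
        {"name": "commandC", "script": [f"touch {project_dir / 'C'}"]},
    ],
    "workflows": {"all": [{"parallel": ["commandA", "commandB", "commandC"]}]},
}
srsly.write_yaml(project_dir / "project.yml", project)
project_run(project_dir, "all")
```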