mirror of
https://github.com/explosion/spaCy.git
synced 2025-01-14 19:46:26 +03:00
34146750d4
We don't want to break backwards compatibility too much but we also want to provide the best possible UX
248 lines
10 KiB
Python
248 lines
10 KiB
Python
from typing import Optional, List, Dict, Sequence, Any, Iterable
|
|
from pathlib import Path
|
|
from wasabi import msg
|
|
import sys
|
|
import srsly
|
|
|
|
from ...util import working_dir, run_command, split_command, is_cwd, join_command
|
|
from ...util import SimpleFrozenList
|
|
from .._util import PROJECT_FILE, PROJECT_LOCK, load_project_config, get_hash
|
|
from .._util import get_checksum, project_cli, Arg, Opt, COMMAND
|
|
|
|
|
|
@project_cli.command("run")
|
|
def project_run_cli(
|
|
# fmt: off
|
|
subcommand: str = Arg(None, help=f"Name of command defined in the {PROJECT_FILE}"),
|
|
project_dir: Path = Arg(Path.cwd(), help="Location of project directory. Defaults to current working directory.", exists=True, file_okay=False),
|
|
force: bool = Opt(False, "--force", "-F", help="Force re-running steps, even if nothing changed"),
|
|
dry: bool = Opt(False, "--dry", "-D", help="Perform a dry run and don't execute scripts"),
|
|
show_help: bool = Opt(False, "--help", help="Show help message and available subcommands")
|
|
# fmt: on
|
|
):
|
|
"""Run a named command or workflow defined in the project.yml. If a workflow
|
|
name is specified, all commands in the workflow are run, in order. If
|
|
commands define dependencies and/or outputs, they will only be re-run if
|
|
state has changed.
|
|
"""
|
|
if show_help or not subcommand:
|
|
print_run_help(project_dir, subcommand)
|
|
else:
|
|
project_run(project_dir, subcommand, force=force, dry=dry)
|
|
|
|
|
|
def project_run(
|
|
project_dir: Path, subcommand: str, *, force: bool = False, dry: bool = False
|
|
) -> None:
|
|
"""Run a named script defined in the project.yml. If the script is part
|
|
of the default pipeline (defined in the "run" section), DVC is used to
|
|
execute the command, so it can determine whether to rerun it. It then
|
|
calls into "exec" to execute it.
|
|
|
|
project_dir (Path): Path to project directory.
|
|
subcommand (str): Name of command to run.
|
|
force (bool): Force re-running, even if nothing changed.
|
|
dry (bool): Perform a dry run and don't execute commands.
|
|
"""
|
|
config = load_project_config(project_dir)
|
|
commands = {cmd["name"]: cmd for cmd in config.get("commands", [])}
|
|
workflows = config.get("workflows", {})
|
|
validate_subcommand(commands.keys(), workflows.keys(), subcommand)
|
|
if subcommand in workflows:
|
|
msg.info(f"Running workflow '{subcommand}'")
|
|
for cmd in workflows[subcommand]:
|
|
project_run(project_dir, cmd, force=force, dry=dry)
|
|
else:
|
|
cmd = commands[subcommand]
|
|
for dep in cmd.get("deps", []):
|
|
if not (project_dir / dep).exists():
|
|
err = f"Missing dependency specified by command '{subcommand}': {dep}"
|
|
err_kwargs = {"exits": 1} if not dry else {}
|
|
msg.fail(err, **err_kwargs)
|
|
with working_dir(project_dir) as current_dir:
|
|
rerun = check_rerun(current_dir, cmd)
|
|
if not rerun and not force:
|
|
msg.info(f"Skipping '{cmd['name']}': nothing changed")
|
|
else:
|
|
msg.divider(subcommand)
|
|
run_commands(cmd["script"], dry=dry)
|
|
if not dry:
|
|
update_lockfile(current_dir, cmd)
|
|
|
|
|
|
def print_run_help(project_dir: Path, subcommand: Optional[str] = None) -> None:
|
|
"""Simulate a CLI help prompt using the info available in the project.yml.
|
|
|
|
project_dir (Path): The project directory.
|
|
subcommand (Optional[str]): The subcommand or None. If a subcommand is
|
|
provided, the subcommand help is shown. Otherwise, the top-level help
|
|
and a list of available commands is printed.
|
|
"""
|
|
config = load_project_config(project_dir)
|
|
config_commands = config.get("commands", [])
|
|
commands = {cmd["name"]: cmd for cmd in config_commands}
|
|
workflows = config.get("workflows", {})
|
|
project_loc = "" if is_cwd(project_dir) else project_dir
|
|
if subcommand:
|
|
validate_subcommand(commands.keys(), workflows.keys(), subcommand)
|
|
print(f"Usage: {COMMAND} project run {subcommand} {project_loc}")
|
|
if subcommand in commands:
|
|
help_text = commands[subcommand].get("help")
|
|
if help_text:
|
|
print(f"\n{help_text}\n")
|
|
elif subcommand in workflows:
|
|
steps = workflows[subcommand]
|
|
print(f"\nWorkflow consisting of {len(steps)} commands:")
|
|
steps_data = [
|
|
(f"{i + 1}. {step}", commands[step].get("help", ""))
|
|
for i, step in enumerate(steps)
|
|
]
|
|
msg.table(steps_data)
|
|
help_cmd = f"{COMMAND} project run [COMMAND] {project_loc} --help"
|
|
print(f"For command details, run: {help_cmd}")
|
|
else:
|
|
print("")
|
|
title = config.get("title")
|
|
if title:
|
|
print(f"{title}\n")
|
|
if config_commands:
|
|
print(f"Available commands in {PROJECT_FILE}")
|
|
print(f"Usage: {COMMAND} project run [COMMAND] {project_loc}")
|
|
msg.table([(cmd["name"], cmd.get("help", "")) for cmd in config_commands])
|
|
if workflows:
|
|
print(f"Available workflows in {PROJECT_FILE}")
|
|
print(f"Usage: {COMMAND} project run [WORKFLOW] {project_loc}")
|
|
msg.table([(name, " -> ".join(steps)) for name, steps in workflows.items()])
|
|
|
|
|
|
def run_commands(
|
|
commands: Iterable[str] = SimpleFrozenList(),
|
|
silent: bool = False,
|
|
dry: bool = False,
|
|
) -> None:
|
|
"""Run a sequence of commands in a subprocess, in order.
|
|
|
|
commands (List[str]): The string commands.
|
|
silent (bool): Don't print the commands.
|
|
dry (bool): Perform a dry run and don't execut anything.
|
|
"""
|
|
for command in commands:
|
|
command = split_command(command)
|
|
# Not sure if this is needed or a good idea. Motivation: users may often
|
|
# use commands in their config that reference "python" and we want to
|
|
# make sure that it's always executing the same Python that spaCy is
|
|
# executed with and the pip in the same env, not some other Python/pip.
|
|
# Also ensures cross-compatibility if user 1 writes "python3" (because
|
|
# that's how it's set up on their system), and user 2 without the
|
|
# shortcut tries to re-run the command.
|
|
if len(command) and command[0] in ("python", "python3"):
|
|
command[0] = sys.executable
|
|
elif len(command) and command[0] in ("pip", "pip3"):
|
|
command = [sys.executable, "-m", "pip", *command[1:]]
|
|
if not silent:
|
|
print(f"Running command: {join_command(command)}")
|
|
if not dry:
|
|
run_command(command)
|
|
|
|
|
|
def validate_subcommand(
|
|
commands: Sequence[str], workflows: Sequence[str], subcommand: str
|
|
) -> None:
|
|
"""Check that a subcommand is valid and defined. Raises an error otherwise.
|
|
|
|
commands (Sequence[str]): The available commands.
|
|
subcommand (str): The subcommand.
|
|
"""
|
|
if not commands and not workflows:
|
|
msg.fail(f"No commands or workflows defined in {PROJECT_FILE}", exits=1)
|
|
if subcommand not in commands and subcommand not in workflows:
|
|
help_msg = []
|
|
if commands:
|
|
help_msg.append(f"Available commands: {', '.join(commands)}")
|
|
if workflows:
|
|
help_msg.append(f"Available workflows: {', '.join(workflows)}")
|
|
msg.fail(
|
|
f"Can't find command or workflow '{subcommand}' in {PROJECT_FILE}",
|
|
". ".join(help_msg),
|
|
exits=1,
|
|
)
|
|
|
|
|
|
def check_rerun(project_dir: Path, command: Dict[str, Any]) -> bool:
|
|
"""Check if a command should be rerun because its settings or inputs/outputs
|
|
changed.
|
|
|
|
project_dir (Path): The current project directory.
|
|
command (Dict[str, Any]): The command, as defined in the project.yml.
|
|
RETURNS (bool): Whether to re-run the command.
|
|
"""
|
|
lock_path = project_dir / PROJECT_LOCK
|
|
if not lock_path.exists(): # We don't have a lockfile, run command
|
|
return True
|
|
data = srsly.read_yaml(lock_path)
|
|
if command["name"] not in data: # We don't have info about this command
|
|
return True
|
|
entry = data[command["name"]]
|
|
# Always run commands with no outputs (otherwise they'd always be skipped)
|
|
if not entry.get("outs", []):
|
|
return True
|
|
# If the entry in the lockfile matches the lockfile entry that would be
|
|
# generated from the current command, we don't rerun because it means that
|
|
# all inputs/outputs, hashes and scripts are the same and nothing changed
|
|
return get_hash(get_lock_entry(project_dir, command)) != get_hash(entry)
|
|
|
|
|
|
def update_lockfile(project_dir: Path, command: Dict[str, Any]) -> None:
|
|
"""Update the lockfile after running a command. Will create a lockfile if
|
|
it doesn't yet exist and will add an entry for the current command, its
|
|
script and dependencies/outputs.
|
|
|
|
project_dir (Path): The current project directory.
|
|
command (Dict[str, Any]): The command, as defined in the project.yml.
|
|
"""
|
|
lock_path = project_dir / PROJECT_LOCK
|
|
if not lock_path.exists():
|
|
srsly.write_yaml(lock_path, {})
|
|
data = {}
|
|
else:
|
|
data = srsly.read_yaml(lock_path)
|
|
data[command["name"]] = get_lock_entry(project_dir, command)
|
|
srsly.write_yaml(lock_path, data)
|
|
|
|
|
|
def get_lock_entry(project_dir: Path, command: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Get a lockfile entry for a given command. An entry includes the command,
|
|
the script (command steps) and a list of dependencies and outputs with
|
|
their paths and file hashes, if available. The format is based on the
|
|
dvc.lock files, to keep things consistent.
|
|
|
|
project_dir (Path): The current project directory.
|
|
command (Dict[str, Any]): The command, as defined in the project.yml.
|
|
RETURNS (Dict[str, Any]): The lockfile entry.
|
|
"""
|
|
deps = get_fileinfo(project_dir, command.get("deps", []))
|
|
outs = get_fileinfo(project_dir, command.get("outputs", []))
|
|
outs_nc = get_fileinfo(project_dir, command.get("outputs_no_cache", []))
|
|
return {
|
|
"cmd": f"{COMMAND} run {command['name']}",
|
|
"script": command["script"],
|
|
"deps": deps,
|
|
"outs": [*outs, *outs_nc],
|
|
}
|
|
|
|
|
|
def get_fileinfo(project_dir: Path, paths: List[str]) -> List[Dict[str, str]]:
|
|
"""Generate the file information for a list of paths (dependencies, outputs).
|
|
Includes the file path and the file's checksum.
|
|
|
|
project_dir (Path): The current project directory.
|
|
paths (List[str]): The file paths.
|
|
RETURNS (List[Dict[str, str]]): The lockfile entry for a file.
|
|
"""
|
|
data = []
|
|
for path in paths:
|
|
file_path = project_dir / path
|
|
md5 = get_checksum(file_path) if file_path.exists() else None
|
|
data.append({"path": path, "md5": md5})
|
|
return data
|