from typing import List, Dict, Any, Optional, Sequence
import typer
import srsly
from pathlib import Path
from wasabi import msg
import subprocess
import shlex
import os
import re
import shutil
import sys
import murmurhash
import hashlib

from ._app import app, Arg, Opt, COMMAND, NAME
from .. import about
from ..schemas import ProjectConfigSchema, validate
from ..util import ensure_path, run_command, make_tempdir, working_dir


# Name of the user-maintained project config and of the DVC config that is
# generated from it.
CONFIG_FILE = "project.yml"
DVC_CONFIG = "dvc.yaml"
# Sub-directories that are created in every freshly cloned project.
DIRS = [
    "assets",
    "metas",
    "configs",
    "packages",
    "metrics",
    "scripts",
    "notebooks",
    "training",
    "corpus",
]
# NOTE: the TORCH_HOME entry is a plain string, or None if the environment
# variable isn't set, so consumers need to handle a missing value.
CACHES = [
    Path.home() / ".torch",
    Path.home() / ".caches" / "torch",
    os.environ.get("TORCH_HOME"),
    Path.home() / ".keras",
]
DVC_CONFIG_COMMENT = """# This file is auto-generated by spaCy based on your project.yml. Do not edit
# it directly and edit the project.yml instead and re-run the project."""


project_cli = typer.Typer(help="Command-line interface for spaCy projects")


@project_cli.callback(invoke_without_command=True)
def callback(ctx: typer.Context):
    """This runs before every project command and ensures DVC is installed and
    everything is up to date.
    """
    try:
        subprocess.run(["dvc", "--version"], stdout=subprocess.DEVNULL)
    except (OSError, subprocess.SubprocessError):
        # Raised if the "dvc" executable can't be found or launched
        msg.fail(
            "spaCy projects require DVC (Data Version Control) and the 'dvc' command",
            "You can install the Python package from pip (pip install dvc) or "
            "conda (conda install -c conda-forge dvc). For more details, see the "
            "documentation: https://dvc.org/doc/install",
            exits=1,
        )


@project_cli.command("clone")
def project_clone_cli(
    # fmt: off
    name: str = Arg(..., help="The name of the template to fetch"),
    dest: Path = Arg(Path.cwd(), help="Where to download and work. Defaults to current working directory.", exists=False),
    repo: str = Opt(about.__projects__, "--repo", "-r", help="The repository to look in."),
    git: bool = Opt(False, "--git", "-G", help="Initialize project as a Git repo"),
    no_init: bool = Opt(False, "--no-init", "-NI", help="Don't initialize the project with DVC"),
    verbose: bool = Opt(False, "--verbose", "-V", help="Show detailed information")
    # fmt: on
):
    """Clone a project template from a repository."""
    project_clone(
        name, dest, repo=repo, git=git, no_init=no_init, verbose=verbose, silent=True
    )


def project_clone(
    name: str,
    dest: Path,
    *,
    repo: str = about.__projects__,
    git: bool = False,
    no_init: bool = False,
    silent: bool = False,
    verbose: bool = False,
) -> None:
    """Clone a project template via "dvc get" and set up its directories.

    name: Name (sub-directory) of the template within the repo.
    dest: Directory to create. Must not exist yet, its parent must exist.
    repo: Repository to fetch the template from.
    git: Initialize the project as a Git repo (passed on to project_init).
    no_init: Skip the DVC initialization of the cloned project.
    silent: Pass --quiet to DVC.
    verbose: Pass --verbose to DVC.
    """
    dest = ensure_path(dest)
    check_clone_dest(dest)
    # When cloning a subdirectory with DVC, it will create a folder of that name
    # within the destination dir, so we use a tempdir and then copy it into the
    # parent directory to create the cloned directory
    dest = dest.resolve()
    with make_tempdir() as tmp_dir:
        cmd = ["dvc", "get", repo, name, "-o", str(tmp_dir)]
        if verbose:
            cmd.append("--verbose")
        if silent:
            cmd.append("--quiet")
        print(" ".join(cmd))
        run_command(cmd)
        shutil.move(str(tmp_dir / Path(name).name), str(dest))
    msg.good(f"Cloned project '{name}' from {repo}")
    # Make sure all default project directories are available
    for sub_dir in DIRS:
        (dest / sub_dir).mkdir(parents=True, exist_ok=True)
    if not no_init:
        project_init(dest, git=git, silent=silent)
    msg.good("Your project is now ready!", dest)
    print(f"To fetch the assets, run:\npython -m {NAME} project assets {dest}")


@project_cli.command("init")
def project_init_cli(
    path: Path = Arg(..., help="Path to cloned project", exists=True, file_okay=False),
    git: bool = Opt(False, "--git", "-G", help="Initialize project as a Git repo"),
):
    """Initialize a project directory with DVC and Git (optional). This should
    typically be taken care of automatically when you run the "project clone"
    command.
    """
    project_init(path, git=git, silent=True)


def project_init(
    dest: Path, *, git: bool = False, silent: bool = False, analytics: bool = False
) -> None:
    """Initialize DVC (and optionally Git) in a project directory and generate
    the DVC config from the project config.

    dest: The project directory.
    git: Run "git init" first. Otherwise DVC is initialized with --no-scm.
    silent: Pass --quiet to "dvc init".
    analytics: Keep DVC analytics enabled. Disabled by default.
    """
    with working_dir(dest):
        # TODO: check that .dvc exists in other commands?
        init_cmd = ["dvc", "init"]
        if silent:
            init_cmd.append("--quiet")
        if not git:
            init_cmd.append("--no-scm")
        if git:
            run_command(["git", "init"])
        run_command(init_cmd)
        if not analytics:
            # TODO: find a better solution for this?
            run_command(["dvc", "config", "core.analytics", "false"])
    # Load the config relative to the original working directory, so a relative
    # project path isn't applied a second time
    config = load_project_config(dest)
    with msg.loading("Updating DVC config..."):
        updated = update_dvc_config(dest, config, silent=True)
    if updated:
        msg.good(f"Updated DVC config from {CONFIG_FILE}")


@project_cli.command("assets")
def project_assets_cli(
    # fmt: off
    path: Path = Arg(..., help="Path to cloned project", exists=True, file_okay=False),
    # fmt: on
):
    """Use Data Version Control to get the assets for the project."""
    project_assets(path)


def project_assets(project_path: Path) -> None:
    """Fetch all assets listed in the project config.

    project_path: The project directory containing the project config.
    """
    project_path = ensure_path(project_path)
    config = load_project_config(project_path)
    with msg.loading("Updating DVC config..."):
        updated = update_dvc_config(project_path, config, silent=True)
    if updated:
        msg.good(f"Updated DVC config from changed {CONFIG_FILE}")
    assets = config.get("assets", [])
    if not assets:
        msg.warn(f"No assets specified in {CONFIG_FILE}", exits=0)
    msg.info(f"Fetching {len(assets)} asset(s)")
    variables = config.get("variables", {})
    for asset in assets:
        # URLs and destinations can refer to the project variables
        url = asset["url"].format(**variables)
        dest = asset["dest"].format(**variables)
        fetch_asset(project_path, url, dest, asset.get("checksum"))


def fetch_asset(
    project_path: Path, url: str, dest: Path, checksum: Optional[str] = None
) -> None:
    """Download a single asset and add it to DVC.

    project_path: The project directory.
    url: The URL to download the asset from.
    dest: Destination of the asset, relative to the project directory.
    checksum: Optional expected MD5 checksum. If the destination already
        exists and matches the checksum, the download is skipped.
    """
    check_asset(url)
    # Resolve the path, since the commands below are executed from within the
    # project directory and a relative project path would be applied twice
    dest_path = (project_path / dest).resolve()
    if dest_path.exists() and checksum:
        # If there's already a file, check for checksum
        # TODO: add support for caches
        if checksum == get_checksum(dest_path):
            msg.good(f"Skipping download with matching checksum: {dest}")
            return
    with working_dir(project_path):
        try:
            dvc_cmd = ["dvc", "get-url", url, str(dest_path)]
            # If this fails, we don't want to output an error or info message
            out = subprocess.check_output(dvc_cmd, stderr=subprocess.DEVNULL)
            # check_output returns bytes, so decode it before printing
            out_text = out.decode("utf8", errors="replace").strip()
            if out_text:
                print(out_text)
        except subprocess.CalledProcessError:
            # TODO: Can we read out Weak ETags error?
            # TODO: replace curl
            run_command(["curl", url, "--output", str(dest_path)])
        run_command(["dvc", "add", str(dest_path)])
    msg.good(f"Fetched asset {dest}")


@project_cli.command(
    "run-all",
    context_settings={"allow_extra_args": True, "ignore_unknown_options": True},
)
def project_run_all_cli(
    # fmt: off
    ctx: typer.Context,
    project_dir: Path = Arg(..., help="Location of project directory", exists=True, file_okay=False),
    show_help: bool = Opt(False, "--help", help="Show help message and available subcommands")
    # fmt: on
):
    """Run all commands. Additional arguments are passed to dvc repro."""
    if show_help:
        print_run_help(project_dir)
    else:
        project_run_all(project_dir, *ctx.args)


def project_run_all(project_dir: Path, *dvc_args) -> None:
    """Update the DVC config if needed and run "dvc repro" in the project.

    project_dir: The project directory.
    *dvc_args: Additional arguments passed on to "dvc repro".
    """
    config = load_project_config(project_dir)
    with msg.loading("Updating DVC config..."):
        updated = update_dvc_config(project_dir, config, silent=True)
    if updated:
        msg.good(f"Updated DVC config from changed {CONFIG_FILE}")
    dvc_cmd = ["dvc", "repro", *dvc_args]
    with working_dir(project_dir):
        run_command(dvc_cmd)


@project_cli.command(
    "run",
    context_settings={"allow_extra_args": True, "ignore_unknown_options": True},
)
def project_run_cli(
    # fmt: off
    ctx: typer.Context,
    project_dir: Path = Arg(..., help="Location of project directory", exists=True, file_okay=False),
    subcommand: str = Arg(None, help="Name of command defined in project config"),
    show_help: bool = Opt(False, "--help", help="Show help message and available subcommands")
    # fmt: on
):
    """Run scripts defined in the project."""
    if show_help or not subcommand:
        print_run_help(project_dir, subcommand)
    else:
        project_run(project_dir, subcommand, *ctx.args)


def print_run_help(project_dir: Path, subcommand: Optional[str] = None) -> None:
    """Simulate a CLI help prompt using the info available in the project config."""
    config = load_project_config(project_dir)
    config_commands = config.get("commands", [])
    commands = {cmd["name"]: cmd for cmd in config_commands}
    if subcommand:
        # Help for one specific command
        if subcommand not in commands:
            msg.fail(f"Can't find command '{subcommand}' in project config", exits=1)
        print(f"Usage: {COMMAND} project run {project_dir} {subcommand}")
        help_text = commands[subcommand].get("help")
        if help_text:
            msg.text(f"\n{help_text}\n")
    else:
        # Overview of all commands defined in the project config
        print(f"\nAvailable commands in {CONFIG_FILE}")
        print(f"Usage: {COMMAND} project run {project_dir} [COMMAND]")
        msg.table([(cmd["name"], cmd.get("help", "")) for cmd in config_commands])


def project_run(project_dir: Path, subcommand: str, *dvc_args) -> None:
    """Run a single command defined in the project config.

    Commands listed under "run" in the config are executed via "dvc repro".
    All other commands are executed directly, after checking that their
    dependencies exist.

    project_dir: The project directory.
    subcommand: Name of the command to run.
    *dvc_args: Additional arguments passed on to "dvc repro".
    """
    config = load_project_config(project_dir)
    with msg.loading("Updating DVC config..."):
        updated = update_dvc_config(project_dir, config, silent=True)
    if updated:
        msg.good(f"Updated DVC config from changed {CONFIG_FILE}")
    config_commands = config.get("commands", [])
    variables = config.get("variables", {})
    commands = {cmd["name"]: cmd for cmd in config_commands}
    if subcommand not in commands:
        msg.fail(f"Can't find command '{subcommand}' in project config", exits=1)
    if subcommand in config.get("run", []):
        # This is one of the pipeline commands tracked in DVC
        dvc_cmd = ["dvc", "repro", subcommand, *dvc_args]
        # Run from within the project, just like project_run_all does, so DVC
        # operates on the project's config
        with working_dir(project_dir):
            run_command(dvc_cmd)
    else:
        cmd = commands[subcommand]
        # Deps in non-DVC commands aren't tracked, but if they're defined,
        # make sure they exist before running the command
        for dep in cmd.get("deps", []):
            if not (project_dir / dep).exists():
                err = f"Missing dependency specified by command '{subcommand}': {dep}"
                msg.fail(err, exits=1)
        with working_dir(project_dir):
            run_commands(cmd["script"], variables)


@project_cli.command("exec")
def project_exec_cli(
    # fmt: off
    project_dir: Path = Arg(..., help="Location of project directory", exists=True, file_okay=False),
    subcommand: str = Arg(..., help="Name of command defined in project config"),
    # fmt: on
):
    """Internals"""
    project_exec(project_dir, subcommand)


def project_exec(project_dir: Path, subcommand: str) -> None:
    """Execute the script of a command from within the project directory.
    This is what the commands in the generated DVC config call.

    project_dir: The project directory.
    subcommand: Name of the command whose script should be executed.
    """
    config = load_project_config(project_dir)
    config_commands = config.get("commands", [])
    variables = config.get("variables", {})
    commands = {cmd["name"]: cmd for cmd in config_commands}
    if subcommand not in commands:
        # Fail with a readable message instead of a KeyError
        msg.fail(f"Can't find command '{subcommand}' in project config", exits=1)
    with working_dir(project_dir):
        run_commands(commands[subcommand]["script"], variables)


@project_cli.command("update-dvc")
def project_update_dvc_cli(
    # fmt: off
    project_dir: Path = Arg(..., help="Location of project directory", exists=True, file_okay=False),
    verbose: bool = Opt(False, "--verbose", "-V", help="Print more info"),
    force: bool = Opt(False, "--force", "-F", help="Force update DVC config"),
    # fmt: on
):
    """Update the auto-generated DVC config based on the project config."""
    config = load_project_config(project_dir)
    updated = update_dvc_config(project_dir, config, verbose=verbose, force=force)
    if updated:
        msg.good(f"Updated DVC config from {CONFIG_FILE}")
    else:
        msg.info(f"No changes found in {CONFIG_FILE}, no update needed")


app.add_typer(project_cli, name="project")


def load_project_config(path: Path) -> Dict[str, Any]:
    """Load and validate the project config. Exits with an error message if
    the file is missing or doesn't match the schema.

    path: The project directory containing the config file.
    Returns the loaded config.
    """
    config_path = path / CONFIG_FILE
    if not config_path.exists():
        msg.fail("Can't find project config", config_path, exits=1)
    config = srsly.read_yaml(config_path)
    errors = validate(ProjectConfigSchema, config)
    if errors:
        msg.fail(f"Invalid project config in {CONFIG_FILE}", "\n".join(errors), exits=1)
    return config


def update_dvc_config(
    path: Path,
    config: Dict[str, Any],
    verbose: bool = False,
    silent: bool = False,
    force: bool = False,
) -> bool:
    """Re-run the DVC commands in dry mode and update dvc.yaml file in the
    project directory. The file is auto-generated based on the config.

    path: The project directory.
    config: The loaded project config.
    verbose: Pass --verbose to "dvc run".
    silent: Pass --quiet to "dvc run".
    force: Re-generate the file even if the config hash hasn't changed.
    Returns whether the DVC config was changed. If the config defines no
    pipeline commands with deps or outputs, no file is generated and the
    return value only reflects whether an outdated file was removed.
    """
    config_hash = get_hash(config)
    dvc_config_path = path / DVC_CONFIG
    removed_stale = False
    if dvc_config_path.exists():
        # Check if the file was generated using the current config, if not, redo
        with dvc_config_path.open("r", encoding="utf8") as f:
            # The first line of a generated file is "# " plus the config hash
            ref_hash = f.readline().strip().replace("# ", "")
        if ref_hash == config_hash and not force:
            return False  # Nothing has changed in project config, don't need to update
        dvc_config_path.unlink()
        removed_stale = True
    variables = config.get("variables", {})
    commands = []
    # We only want to include commands that are part of the main list of "run"
    # commands in project.yml and should be run in sequence
    config_commands = {cmd["name"]: cmd for cmd in config.get("commands", [])}
    for name in config.get("run", []):
        if name not in config_commands:
            msg.fail(f"Can't find command '{name}' in project config", exits=1)
        command = config_commands[name]
        deps = command.get("deps", [])
        outputs = command.get("outputs", [])
        outputs_no_cache = command.get("outputs_no_cache", [])
        if not deps and not outputs and not outputs_no_cache:
            continue
        # Default to "." as the project path since dvc.yaml is auto-generated
        # and we don't want arbitrary paths in there
        project_cmd = ["python", "-m", NAME, "project", "exec", ".", name]
        # Flatten the (flag, path) pairs into one list of arguments
        deps_cmd = [c for cl in [["-d", p] for p in deps] for c in cl]
        outputs_cmd = [c for cl in [["-o", p] for p in outputs] for c in cl]
        outputs_nc_cmd = [c for cl in [["-O", p] for p in outputs_no_cache] for c in cl]
        dvc_cmd = ["dvc", "run", "-n", name, "-w", str(path), "--no-exec"]
        if verbose:
            dvc_cmd.append("--verbose")
        if silent:
            dvc_cmd.append("--quiet")
        full_cmd = [*dvc_cmd, *deps_cmd, *outputs_cmd, *outputs_nc_cmd, *project_cmd]
        commands.append(" ".join(full_cmd))
    with working_dir(path):
        run_commands(commands, variables, silent=True)
    if not dvc_config_path.exists():
        # No stage was generated, so there's no file to add the hash to.
        # Opening it with "r+" would raise a FileNotFoundError.
        return removed_stale
    # Prepend the config hash and the auto-generated notice to the file
    with dvc_config_path.open("r+", encoding="utf8") as f:
        content = f.read()
        f.seek(0, 0)
        f.write(f"# {config_hash}\n{DVC_CONFIG_COMMENT}\n{content}")
    return True


def run_commands(
    commands: Sequence[str] = (),
    variables: Optional[Dict[str, str]] = None,
    silent: bool = False,
) -> None:
    """Run a sequence of commands in a subprocess, in order.

    commands: The string commands. Each is formatted with the variables and
        then split into arguments with shlex.
    variables: Values for the "{name}" placeholders in the commands.
    silent: Don't print the commands before running them.
    """
    if variables is None:
        variables = {}
    for command in commands:
        # Substitute variables, e.g. "./{NAME}.json"
        command = command.format(**variables)
        command = shlex.split(command)
        # TODO: is this needed / a good idea?
        if command and command[0] == "python":
            # Use the interpreter that's currently running
            command[0] = sys.executable
        if not silent:
            print(" ".join(command))
        run_command(command)


def check_asset(url: str) -> None:
    """Print a warning if the asset URL looks like a regular GitHub page URL.

    url: The asset URL to check.
    """
    # If the asset URL is a regular GitHub URL it's likely a mistake
    # TODO: support loading from GitHub URLs? Automatically convert to raw?
    if re.match(r"https?://github\.com", url):
        msg.warn(
            "Downloading from a regular GitHub URL. This will only download "
            "the source of the page, not the actual file. If you want to "
            "download the raw file, click on 'Download' on the GitHub page "
            "and copy the raw.githubusercontent.com URL instead."
        )
        # url.replace("github.com", "raw.githubusercontent.com").replace("/blob/", "/").replace("/tree/", "/")


def check_clone_dest(dest: Path) -> None:
    """Check and validate that the destination path can be used to clone."""
    if not dest:
        msg.fail(f"Not a valid directory to clone project: {dest}", exits=1)
    if dest.exists():
        # Directory already exists (not allowed, clone needs to create it)
        msg.fail(f"Can't clone project, directory already exists: {dest}", exits=1)
    if not dest.parent.exists():
        # We're not creating parents, parent dir should exist
        msg.fail(
            f"Can't clone project, parent directory doesn't exist: {dest.parent}",
            exits=1,
        )


def get_hash(data) -> str:
    """Get a hash of JSON-serializable data. The data is dumped with sorted
    keys, so the key order doesn't affect the result.

    data: The data to hash.
    Returns the murmurhash of the serialized data, as a string.
    """
    return str(murmurhash.hash(srsly.json_dumps(data, sort_keys=True)))


def get_checksum(path: Path) -> str:
    """Get the MD5 checksum of a file.

    path: The file to hash.
    Returns the hex digest of the file contents.
    """
    md5 = hashlib.md5()
    with path.open("rb") as f:
        # Assets can be large, so hash in chunks instead of loading the whole
        # file into memory
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            md5.update(chunk)
    return md5.hexdigest()