Update project CLI hashes, directories, skipping (#5741)

* Update project CLI hashes, directories, skipping

* Improve clone success message

* Remove unused context args

* Move project-specific utils to project utils

The hashing/checksum functions may not end up being general-purpose functions and are more designed for the projects, so they shouldn't live in spacy.util

* Improve run help and add workflows

* Add note re: directory checksum speed

* Fix cloning from subdirectories and output messages

* Remove hard-coded dirs
This commit is contained in:
Ines Montani 2020-07-09 23:51:18 +02:00 committed by GitHub
parent e624fcd5d9
commit a60562f208
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 103 additions and 73 deletions

View File

@ -6,9 +6,9 @@ import tqdm
import re import re
import shutil import shutil
from ...util import ensure_path, get_checksum, working_dir from ...util import ensure_path, working_dir
from .._app import project_cli, Arg from .._app import project_cli, Arg
from .util import PROJECT_FILE, load_project_config from .util import PROJECT_FILE, load_project_config, get_checksum
# TODO: find a solution for caches # TODO: find a solution for caches
@ -94,6 +94,10 @@ def fetch_asset(
if checksum == get_checksum(dest_path): if checksum == get_checksum(dest_path):
msg.good(f"Skipping download with matching checksum: {dest}") msg.good(f"Skipping download with matching checksum: {dest}")
return dest_path return dest_path
# We might as well support the user here and create parent directories in
# case the asset dir isn't listed as a dir to create in the project.yml
if not dest_path.parent.exists():
dest_path.parent.mkdir(parents=True)
with working_dir(project_path): with working_dir(project_path):
url = convert_asset_url(url) url = convert_asset_url(url)
try: try:

View File

@ -3,23 +3,12 @@ from pathlib import Path
from wasabi import msg from wasabi import msg
import subprocess import subprocess
import shutil import shutil
import re
from ... import about from ... import about
from ...util import ensure_path, run_command, make_tempdir from ...util import ensure_path, run_command, make_tempdir
from .._app import project_cli, Arg, Opt, COMMAND from .._app import project_cli, Arg, Opt, COMMAND
from .util import PROJECT_FILE
DIRS = [
"assets",
"metas",
"configs",
"packages",
"metrics",
"scripts",
"notebooks",
"training",
"corpus",
]
@project_cli.command("clone") @project_cli.command("clone")
@ -50,6 +39,7 @@ def project_clone(name: str, dest: Path, *, repo: str = about.__projects__) -> N
dest = ensure_path(dest) dest = ensure_path(dest)
check_clone(name, dest, repo) check_clone(name, dest, repo)
project_dir = dest.resolve() project_dir = dest.resolve()
repo_name = re.sub(r"(http(s?)):\/\/github.com/", "", repo)
# We're using Git and sparse checkout to only clone the files we need # We're using Git and sparse checkout to only clone the files we need
with make_tempdir() as tmp_dir: with make_tempdir() as tmp_dir:
cmd = f"git clone {repo} {tmp_dir} --no-checkout --depth 1 --config core.sparseCheckout=true" cmd = f"git clone {repo} {tmp_dir} --no-checkout --depth 1 --config core.sparseCheckout=true"
@ -64,16 +54,16 @@ def project_clone(name: str, dest: Path, *, repo: str = about.__projects__) -> N
run_command(["git", "-C", str(tmp_dir), "fetch"]) run_command(["git", "-C", str(tmp_dir), "fetch"])
run_command(["git", "-C", str(tmp_dir), "checkout"]) run_command(["git", "-C", str(tmp_dir), "checkout"])
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
err = f"Could not clone '{name}' in the repo '{repo}'." err = f"Could not clone '{name}' from repo '{repo_name}'"
msg.fail(err) msg.fail(err)
shutil.move(str(tmp_dir / Path(name).name), str(project_dir)) # We need Path(name) to make sure we also support subdirectories
msg.good(f"Cloned project '{name}' from {repo} into {project_dir}") shutil.move(str(tmp_dir / Path(name)), str(project_dir))
for sub_dir in DIRS: msg.good(f"Cloned '{name}' from {repo_name}", project_dir)
dir_path = project_dir / sub_dir if not (project_dir / PROJECT_FILE).exists():
if not dir_path.exists(): msg.warn(f"No {PROJECT_FILE} found in directory")
dir_path.mkdir(parents=True) else:
msg.good(f"Your project is now ready!", dest) msg.good(f"Your project is now ready!")
print(f"To fetch the assets, run:\n{COMMAND} project assets {dest}") print(f"To fetch the assets, run:\n{COMMAND} project assets {dest}")
def check_clone(name: str, dest: Path, repo: str) -> None: def check_clone(name: str, dest: Path, repo: str) -> None:

View File

@ -5,9 +5,9 @@ import subprocess
from pathlib import Path from pathlib import Path
from wasabi import msg from wasabi import msg
from .util import PROJECT_FILE, load_project_config from .util import PROJECT_FILE, load_project_config, get_hash
from .._app import project_cli, Arg, Opt, NAME, COMMAND from .._app import project_cli, Arg, Opt, NAME, COMMAND
from ...util import get_hash, working_dir, split_command, join_command, run_command from ...util import working_dir, split_command, join_command, run_command
DVC_CONFIG = "dvc.yaml" DVC_CONFIG = "dvc.yaml"
@ -116,6 +116,8 @@ def update_dvc_config(
outputs_cmd = [c for cl in [["-o", p] for p in outputs] for c in cl] outputs_cmd = [c for cl in [["-o", p] for p in outputs] for c in cl]
outputs_nc_cmd = [c for cl in [["-O", p] for p in outputs_no_cache] for c in cl] outputs_nc_cmd = [c for cl in [["-O", p] for p in outputs_no_cache] for c in cl]
dvc_cmd = ["run", "-n", name, "-w", str(path), "--no-exec"] dvc_cmd = ["run", "-n", name, "-w", str(path), "--no-exec"]
if command.get("no_skip"):
dvc_cmd.append("--always-changed")
full_cmd = [*dvc_cmd, *deps_cmd, *outputs_cmd, *outputs_nc_cmd, *project_cmd] full_cmd = [*dvc_cmd, *deps_cmd, *outputs_cmd, *outputs_nc_cmd, *project_cmd]
dvc_commands.append(join_command(full_cmd)) dvc_commands.append(join_command(full_cmd))
with working_dir(path): with working_dir(path):

View File

@ -1,22 +1,18 @@
from typing import Optional, List, Dict, Sequence, Any from typing import Optional, List, Dict, Sequence, Any
from pathlib import Path from pathlib import Path
from wasabi import msg from wasabi import msg
import typer
import sys import sys
import srsly import srsly
from ...util import working_dir, run_command, split_command, is_cwd, get_checksum from ...util import working_dir, run_command, split_command, is_cwd, join_command
from ...util import get_hash, join_command
from .._app import project_cli, Arg, Opt, COMMAND from .._app import project_cli, Arg, Opt, COMMAND
from .util import PROJECT_FILE, PROJECT_LOCK, load_project_config from .util import PROJECT_FILE, PROJECT_LOCK, load_project_config, get_hash
from .util import get_checksum
@project_cli.command( @project_cli.command("run")
"run", context_settings={"allow_extra_args": True, "ignore_unknown_options": True},
)
def project_run_cli( def project_run_cli(
# fmt: off # fmt: off
ctx: typer.Context,
subcommand: str = Arg(None, help=f"Name of command defined in the {PROJECT_FILE}"), subcommand: str = Arg(None, help=f"Name of command defined in the {PROJECT_FILE}"),
project_dir: Path = Arg(Path.cwd(), help="Location of project directory. Defaults to current working directory.", exists=True, file_okay=False), project_dir: Path = Arg(Path.cwd(), help="Location of project directory. Defaults to current working directory.", exists=True, file_okay=False),
force: bool = Opt(False, "--force", "-F", help="Force re-running steps, even if nothing changed"), force: bool = Opt(False, "--force", "-F", help="Force re-running steps, even if nothing changed"),
@ -32,7 +28,7 @@ def project_run_cli(
if show_help or not subcommand: if show_help or not subcommand:
print_run_help(project_dir, subcommand) print_run_help(project_dir, subcommand)
else: else:
project_run(project_dir, subcommand, *ctx.args, force=force, dry=dry) project_run(project_dir, subcommand, force=force, dry=dry)
def project_run( def project_run(
@ -73,7 +69,8 @@ def project_run(
else: else:
msg.divider(subcommand) msg.divider(subcommand)
run_commands(cmd["script"], variables, dry=dry) run_commands(cmd["script"], variables, dry=dry)
update_lockfile(current_dir, cmd, variables) if not dry:
update_lockfile(current_dir, cmd, variables)
def print_run_help(project_dir: Path, subcommand: Optional[str] = None) -> None: def print_run_help(project_dir: Path, subcommand: Optional[str] = None) -> None:
@ -87,19 +84,35 @@ def print_run_help(project_dir: Path, subcommand: Optional[str] = None) -> None:
config = load_project_config(project_dir) config = load_project_config(project_dir)
config_commands = config.get("commands", []) config_commands = config.get("commands", [])
commands = {cmd["name"]: cmd for cmd in config_commands} commands = {cmd["name"]: cmd for cmd in config_commands}
workflows = config.get("workflows", {})
project_loc = "" if is_cwd(project_dir) else project_dir project_loc = "" if is_cwd(project_dir) else project_dir
if subcommand: if subcommand:
validate_subcommand(commands.keys(), subcommand) validate_subcommand(commands.keys(), workflows.keys(), subcommand)
print(f"Usage: {COMMAND} project run {subcommand} {project_loc}") print(f"Usage: {COMMAND} project run {subcommand} {project_loc}")
help_text = commands[subcommand].get("help") if subcommand in commands:
if help_text: help_text = commands[subcommand].get("help")
msg.text(f"\n{help_text}\n") if help_text:
print(f"\n{help_text}\n")
elif subcommand in workflows:
steps = workflows[subcommand]
print(f"\nWorkflow consisting of {len(steps)} commands:")
steps_data = [
(f"{i + 1}. {step}", commands[step].get("help", ""))
for i, step in enumerate(steps)
]
msg.table(steps_data)
help_cmd = f"{COMMAND} project run [COMMAND] {project_loc} --help"
print(f"For command details, run: {help_cmd}")
else: else:
print(f"\nAvailable commands in {PROJECT_FILE}") print("")
print(f"Usage: {COMMAND} project run [COMMAND] {project_loc}") if config_commands:
msg.table([(cmd["name"], cmd.get("help", "")) for cmd in config_commands]) print(f"Available commands in {PROJECT_FILE}")
msg.text(f"Run all commands defined in the 'run' block of the {PROJECT_FILE}:") print(f"Usage: {COMMAND} project run [COMMAND] {project_loc}")
print(f"{COMMAND} project run {project_loc}") msg.table([(cmd["name"], cmd.get("help", "")) for cmd in config_commands])
if workflows:
print(f"Available workflows in {PROJECT_FILE}")
print(f"Usage: {COMMAND} project run [WORKFLOW] {project_loc}")
msg.table([(name, " -> ".join(steps)) for name, steps in workflows.items()])
def run_commands( def run_commands(
@ -179,6 +192,9 @@ def check_rerun(
if command["name"] not in data: # We don't have info about this command if command["name"] not in data: # We don't have info about this command
return True return True
entry = data[command["name"]] entry = data[command["name"]]
# Always run commands with no outputs (otherwise they'd always be skipped)
if not entry.get("outs", []):
return True
# If the entry in the lockfile matches the lockfile entry that would be # If the entry in the lockfile matches the lockfile entry that would be
# generated from the current command, we don't rerun because it means that # generated from the current command, we don't rerun because it means that
# all inputs/outputs, hashes and scripts are the same and nothing changed # all inputs/outputs, hashes and scripts are the same and nothing changed

View File

@ -1,7 +1,8 @@
from typing import Dict, Any from typing import Dict, Any, Union
from pathlib import Path from pathlib import Path
from wasabi import msg from wasabi import msg
import srsly import srsly
import hashlib
from ...schemas import ProjectConfigSchema, validate from ...schemas import ProjectConfigSchema, validate
@ -11,7 +12,8 @@ PROJECT_LOCK = "project.lock"
def load_project_config(path: Path) -> Dict[str, Any]: def load_project_config(path: Path) -> Dict[str, Any]:
"""Load the project.yml file from a directory and validate it. """Load the project.yml file from a directory and validate it. Also make
sure that all directories defined in the config exist.
path (Path): The path to the project directory. path (Path): The path to the project directory.
RETURNS (Dict[str, Any]): The loaded project.yml. RETURNS (Dict[str, Any]): The loaded project.yml.
@ -28,6 +30,11 @@ def load_project_config(path: Path) -> Dict[str, Any]:
if errors: if errors:
msg.fail(invalid_err, "\n".join(errors), exits=1) msg.fail(invalid_err, "\n".join(errors), exits=1)
validate_project_commands(config) validate_project_commands(config)
# Make sure directories defined in config exist
for subdir in config.get("directories", []):
dir_path = path / subdir
if not dir_path.exists():
dir_path.mkdir(parents=True)
return config return config
@ -55,3 +62,32 @@ def validate_project_commands(config: Dict[str, Any]) -> None:
f"section of the {PROJECT_FILE}.", f"section of the {PROJECT_FILE}.",
exits=1, exits=1,
) )
def get_hash(data) -> str:
    """Return a stable MD5 hash for any JSON-serializable object.
    data: The data to hash.
    RETURNS (str): The hash.
    """
    # sort_keys=True makes the serialization (and therefore the hash) stable
    serialized = srsly.json_dumps(data, sort_keys=True)
    return hashlib.md5(serialized.encode("utf8")).hexdigest()
def get_checksum(path: Union[Path, str]) -> str:
    """Get the checksum for a file or directory given its file path. If a
    directory path is provided, this uses all files in that directory.
    path (Union[Path, str]): The file or directory path.
    RETURNS (str): The checksum.
    """
    path = Path(path)
    if path.is_file():
        return hashlib.md5(Path(path).read_bytes()).hexdigest()
    if path.is_dir():
        # TODO: this is currently pretty slow
        checksum = hashlib.md5()
        sub_files = [fp for fp in path.rglob("*") if fp.is_file()]
        # Sort so the directory hash doesn't depend on filesystem order
        for sub_file in sorted(sub_files):
            checksum.update(sub_file.read_bytes())
        return checksum.hexdigest()
    raise ValueError(f"Can't get checksum for {path}: not a file or directory")

View File

@ -232,9 +232,10 @@ class ProjectConfigCommand(BaseModel):
name: StrictStr = Field(..., title="Name of command") name: StrictStr = Field(..., title="Name of command")
help: Optional[StrictStr] = Field(None, title="Command description") help: Optional[StrictStr] = Field(None, title="Command description")
script: List[StrictStr] = Field([], title="List of CLI commands to run, in order") script: List[StrictStr] = Field([], title="List of CLI commands to run, in order")
deps: List[StrictStr] = Field([], title="Data Version Control dependencies") deps: List[StrictStr] = Field([], title="File dependencies required by this command")
outputs: List[StrictStr] = Field([], title="Data Version Control outputs") outputs: List[StrictStr] = Field([], title="Outputs produced by this command")
outputs_no_cache: List[StrictStr] = Field([], title="Data Version Control outputs (no cache)") outputs_no_cache: List[StrictStr] = Field([], title="Outputs not tracked by DVC (DVC only)")
no_skip: bool = Field(False, title="Never skip this command, even if nothing changed")
# fmt: on # fmt: on
class Config: class Config:

View File

@ -20,7 +20,6 @@ import subprocess
from contextlib import contextmanager from contextlib import contextmanager
import tempfile import tempfile
import shutil import shutil
import hashlib
import shlex import shlex
try: try:
@ -511,25 +510,6 @@ def make_tempdir():
warnings.warn(Warnings.W091.format(dir=d, msg=e)) warnings.warn(Warnings.W091.format(dir=d, msg=e))
def get_hash(data) -> str:
    """Compute the MD5 hash of a JSON-serializable object.
    data: The data to hash.
    RETURNS (str): The hash.
    """
    # Keys are sorted so equal objects always serialize (and hash) identically
    payload = srsly.json_dumps(data, sort_keys=True).encode("utf8")
    digest = hashlib.md5(payload)
    return digest.hexdigest()
def get_checksum(path: Union[Path, str]) -> str:
    """Compute the MD5 checksum of a file, given its path.
    path (Union[Path, str]): The file path.
    RETURNS (str): The checksum.
    """
    file_bytes = Path(path).read_bytes()
    return hashlib.md5(file_bytes).hexdigest()
def is_cwd(path: Union[Path, str]) -> bool: def is_cwd(path: Union[Path, str]) -> bool:
"""Check whether a path is the current working directory. """Check whether a path is the current working directory.
@ -756,12 +736,12 @@ def minibatch_by_padded_size(docs, size, buffer=256, discard_oversize=False):
pass pass
else: else:
yield subbatch yield subbatch
def _batch_by_length(seqs, max_words): def _batch_by_length(seqs, max_words):
"""Given a list of sequences, return a batched list of indices into the """Given a list of sequences, return a batched list of indices into the
list, where the batches are grouped by length, in descending order. list, where the batches are grouped by length, in descending order.
Batches may be at most max_words in size, defined as max sequence length * size. Batches may be at most max_words in size, defined as max sequence length * size.
""" """
# Use negative index so we can get sort by position ascending. # Use negative index so we can get sort by position ascending.
@ -785,6 +765,7 @@ def _batch_by_length(seqs, max_words):
batches.reverse() batches.reverse()
return batches return batches
def minibatch_by_words(docs, size, tolerance=0.2, discard_oversize=False): def minibatch_by_words(docs, size, tolerance=0.2, discard_oversize=False):
"""Create minibatches of roughly a given number of words. If any examples """Create minibatches of roughly a given number of words. If any examples
are longer than the specified batch length, they will appear in a batch by are longer than the specified batch length, they will appear in a batch by