from typing import Optional, Union, Any, Dict, List, Tuple
import shutil
from pathlib import Path
from wasabi import Printer, MarkdownRenderer, get_raw_input
import srsly
import sys
from ._util import app, Arg, Opt, string_to_list, WHEEL_SUFFIX, SDIST_SUFFIX
from ..schemas import validate, ModelMetaSchema
from .. import util
from .. import about
@app.command("package")
def package_cli(
# fmt: off
input_dir: Path = Arg(..., help="Directory with pipeline data", exists=True, file_okay=False),
output_dir: Path = Arg(..., help="Output parent directory", exists=True, file_okay=False),
code_paths: str = Opt("", "--code", "-c", help="Comma-separated paths to Python file with additional code (registered functions) to be included in the package"),
meta_path: Optional[Path] = Opt(None, "--meta-path", "--meta", "-m", help="Path to meta.json", exists=True, dir_okay=False),
create_meta: bool = Opt(False, "--create-meta", "-c", "-C", help="Create meta.json, even if one exists"),
name: Optional[str] = Opt(None, "--name", "-n", help="Package name to override meta"),
version: Optional[str] = Opt(None, "--version", "-v", help="Package version to override meta"),
build: str = Opt("sdist", "--build", "-b", help="Comma-separated formats to build: sdist and/or wheel, or none."),
force: bool = Opt(False, "--force", "-f", "-F", help="Force overwriting existing data in output directory"),
# fmt: on
):
"""
Generate an installable Python package for a pipeline. Includes binary data,
meta and required installation files. A new directory will be created in the
specified output directory, and the data will be copied over. If
--create-meta is set and a meta.json already exists in the output directory,
the existing values will be used as the defaults in the command-line prompt.
After packaging, "python setup.py sdist" is run in the package directory,
which will create a .tar.gz archive that can be installed via "pip install".
If additional code files are provided (e.g. Python files containing custom
registered functions like pipeline components), they are copied into the
package and imported in the __init__.py.
DOCS: https://spacy.io/api/cli#package
"""
create_sdist, create_wheel = get_build_formats(string_to_list(build))
code_paths = [Path(p.strip()) for p in string_to_list(code_paths)]
package(
input_dir,
output_dir,
meta_path=meta_path,
code_paths=code_paths,
name=name,
version=version,
create_meta=create_meta,
create_sdist=create_sdist,
create_wheel=create_wheel,
force=force,
silent=False,
)
def package(
input_dir: Path,
output_dir: Path,
meta_path: Optional[Path] = None,
code_paths: List[Path] = [],
name: Optional[str] = None,
version: Optional[str] = None,
create_meta: bool = False,
create_sdist: bool = True,
create_wheel: bool = False,
force: bool = False,
silent: bool = True,
) -> None:
msg = Printer(no_print=silent, pretty=not silent)
input_path = util.ensure_path(input_dir)
output_path = util.ensure_path(output_dir)
meta_path = util.ensure_path(meta_path)
if create_wheel and not has_wheel():
err = "Generating a binary .whl file requires wheel to be installed"
msg.fail(err, "pip install wheel", exits=1)
if not input_path or not input_path.exists():
msg.fail("Can't locate pipeline data", input_path, exits=1)
if not output_path or not output_path.exists():
msg.fail("Output directory not found", output_path, exits=1)
if create_sdist or create_wheel:
opts = ["sdist" if create_sdist else "", "wheel" if create_wheel else ""]
msg.info(f"Building package artifacts: {', '.join(opt for opt in opts if opt)}")
for code_path in code_paths:
if not code_path.exists():
msg.fail("Can't find code file", code_path, exits=1)
# Import the code here so it's available when model is loaded (via
# get_meta helper). Also verifies that everything works
util.import_file(code_path.stem, code_path)
if code_paths:
msg.good(f"Including {len(code_paths)} Python module(s) with custom code")
if meta_path and not meta_path.exists():
msg.fail("Can't find pipeline meta.json", meta_path, exits=1)
meta_path = meta_path or input_dir / "meta.json"
if not meta_path.exists() or not meta_path.is_file():
msg.fail("Can't load pipeline meta.json", meta_path, exits=1)
meta = srsly.read_json(meta_path)
meta = get_meta(input_dir, meta)
if name is not None:
meta["name"] = name
if version is not None:
meta["version"] = version
if not create_meta: # only print if user doesn't want to overwrite
msg.good("Loaded meta.json from file", meta_path)
else:
meta = generate_meta(meta, msg)
errors = validate(ModelMetaSchema, meta)
if errors:
msg.fail("Invalid pipeline meta.json")
print("\n".join(errors))
sys.exit(1)
model_name = meta["name"]
if not model_name.startswith(meta["lang"] + "_"):
model_name = f"{meta['lang']}_{model_name}"
model_name_v = model_name + "-" + meta["version"]
main_path = output_dir / model_name_v
package_path = main_path / model_name
if package_path.exists():
if force:
shutil.rmtree(str(package_path))
else:
msg.fail(
"Package directory already exists",
"Please delete the directory and try again, or use the "
"`--force` flag to overwrite existing directories.",
exits=1,
)
Path.mkdir(package_path, parents=True)
shutil.copytree(str(input_dir), str(package_path / model_name_v))
for file_name in FILENAMES_DOCS:
file_path = package_path / model_name_v / file_name
if file_path.exists():
shutil.copy(str(file_path), str(main_path))
readme_path = main_path / "README.md"
if not readme_path.exists():
readme = generate_readme(meta)
create_file(readme_path, readme)
create_file(package_path / model_name_v / "README.md", readme)
imports = []
for code_path in code_paths:
imports.append(code_path.stem)
shutil.copy(str(code_path), str(package_path))
create_file(main_path / "meta.json", srsly.json_dumps(meta, indent=2))
create_file(main_path / "setup.py", TEMPLATE_SETUP)
create_file(main_path / "MANIFEST.in", TEMPLATE_MANIFEST)
init_py = TEMPLATE_INIT.format(
imports="\n".join(f"from . import {m}" for m in imports)
)
create_file(package_path / "__init__.py", init_py)
msg.good(f"Successfully created package '{model_name_v}'", main_path)
if create_sdist:
with util.working_dir(main_path):
util.run_command([sys.executable, "setup.py", "sdist"], capture=False)
zip_file = main_path / "dist" / f"{model_name_v}{SDIST_SUFFIX}"
msg.good(f"Successfully created zipped Python package", zip_file)
if create_wheel:
with util.working_dir(main_path):
util.run_command([sys.executable, "setup.py", "bdist_wheel"], capture=False)
wheel = main_path / "dist" / f"{model_name_v}{WHEEL_SUFFIX}"
msg.good(f"Successfully created binary wheel", wheel)
def has_wheel() -> bool:
try:
import wheel # noqa: F401
return True
except ImportError:
return False
def get_build_formats(formats: List[str]) -> Tuple[bool, bool]:
supported = ["sdist", "wheel", "none"]
for form in formats:
if form not in supported:
msg = Printer()
err = f"Unknown build format: {form}. Supported: {', '.join(supported)}"
msg.fail(err, exits=1)
if not formats or "none" in formats:
return (False, False)
return ("sdist" in formats, "wheel" in formats)
def create_file(file_path: Path, contents: str) -> None:
file_path.touch()
file_path.open("w", encoding="utf-8").write(contents)
def get_meta(
model_path: Union[str, Path], existing_meta: Dict[str, Any]
) -> Dict[str, Any]:
meta = {
"lang": "en",
"name": "pipeline",
"version": "0.0.0",
"description": "",
"author": "",
"email": "",
"url": "",
"license": "MIT",
}
meta.update(existing_meta)
nlp = util.load_model_from_path(Path(model_path))
meta["spacy_version"] = util.get_model_version_range(about.__version__)
meta["vectors"] = {
"width": nlp.vocab.vectors_length,
"vectors": len(nlp.vocab.vectors),
"keys": nlp.vocab.vectors.n_keys,
"name": nlp.vocab.vectors.name,
}
if about.__title__ != "spacy":
meta["parent_package"] = about.__title__
return meta
def generate_meta(existing_meta: Dict[str, Any], msg: Printer) -> Dict[str, Any]:
meta = existing_meta or {}
settings = [
("lang", "Pipeline language", meta.get("lang", "en")),
("name", "Pipeline name", meta.get("name", "pipeline")),
("version", "Package version", meta.get("version", "0.0.0")),
("description", "Package description", meta.get("description", None)),
("author", "Author", meta.get("author", None)),
("email", "Author email", meta.get("email", None)),
("url", "Author website", meta.get("url", None)),
("license", "License", meta.get("license", "MIT")),
]
msg.divider("Generating meta.json")
msg.text(
"Enter the package settings for your pipeline. The following information "
"will be read from your pipeline data: pipeline, vectors."
)
for setting, desc, default in settings:
response = get_raw_input(desc, default)
meta[setting] = default if response == "" and default else response
return meta
def generate_readme(meta: Dict[str, Any]) -> str:
"""
Generate a Markdown-formatted README text from a model meta.json. Used
within the GitHub release notes and as content for README.md file added
to model packages.
"""
md = MarkdownRenderer()
lang = meta["lang"]
name = f"{lang}_{meta['name']}"
version = meta["version"]
pipeline = ", ".join([md.code(p) for p in meta.get("pipeline", [])])
components = ", ".join([md.code(p) for p in meta.get("components", [])])
vecs = meta.get("vectors", {})
vectors = f"{vecs.get('keys', 0)} keys, {vecs.get('vectors', 0)} unique vectors ({ vecs.get('width', 0)} dimensions)"
author = meta.get("author") or "n/a"
notes = meta.get("notes", "")
license_name = meta.get("license")
sources = _format_sources(meta.get("sources"))
description = meta.get("description")
label_scheme = _format_label_scheme(meta.get("labels"))
accuracy = _format_accuracy(meta.get("performance"))
table_data = [
(md.bold("Name"), md.code(name)),
(md.bold("Version"), md.code(version)),
(md.bold("spaCy"), md.code(meta["spacy_version"])),
(md.bold("Default Pipeline"), pipeline),
(md.bold("Components"), components),
(md.bold("Vectors"), vectors),
(md.bold("Sources"), sources or "n/a"),
(md.bold("License"), md.code(license_name) if license_name else "n/a"),
(md.bold("Author"), md.link(author, meta["url"]) if "url" in meta else author),
]
# Put together Markdown body
if description:
md.add(description)
md.add(md.table(table_data, ["Feature", "Description"]))
if label_scheme:
md.add(md.title(3, "Label Scheme"))
md.add(label_scheme)
if accuracy:
md.add(md.title(3, "Accuracy"))
md.add(accuracy)
if notes:
md.add(notes)
return md.text
def _format_sources(data: Any) -> str:
if not data or not isinstance(data, list):
return "n/a"
sources = []
for source in data:
if not isinstance(source, dict):
source = {"name": source}
name = source.get("name")
if not name:
continue
url = source.get("url")
author = source.get("author")
result = name if not url else "[{}]({})".format(name, url)
if author:
result += " ({})".format(author)
sources.append(result)
return "
".join(sources)
def _format_accuracy(data: Dict[str, Any], exclude: List[str] = ["speed"]) -> str:
if not data:
return ""
md = MarkdownRenderer()
scalars = [(k, v) for k, v in data.items() if isinstance(v, (int, float))]
scores = [
(md.code(acc.upper()), f"{score*100:.2f}")
for acc, score in scalars
if acc not in exclude
]
md.add(md.table(scores, ["Type", "Score"]))
return md.text
def _format_label_scheme(data: Dict[str, Any]) -> str:
if not data:
return ""
md = MarkdownRenderer()
n_labels = 0
n_pipes = 0
label_data = []
for pipe, labels in data.items():
if not labels:
continue
col1 = md.bold(md.code(pipe))
col2 = ", ".join(
[md.code(label.replace("|", "\\|")) for label in labels]
) # noqa: W605
label_data.append((col1, col2))
n_labels += len(labels)
n_pipes += 1
if not label_data:
return ""
label_info = f"View label scheme ({n_labels} labels for {n_pipes} components)"
md.add("")
md.add(f"{label_info}
")
md.add(md.table(label_data, ["Component", "Labels"]))
md.add(" ")
return md.text
TEMPLATE_SETUP = """
#!/usr/bin/env python
import io
import json
from os import path, walk
from shutil import copy
from setuptools import setup
def load_meta(fp):
with io.open(fp, encoding='utf8') as f:
return json.load(f)
def load_readme(fp):
if path.exists(fp):
with io.open(fp, encoding='utf8') as f:
return f.read()
return ""
def list_files(data_dir):
output = []
for root, _, filenames in walk(data_dir):
for filename in filenames:
if not filename.startswith('.'):
output.append(path.join(root, filename))
output = [path.relpath(p, path.dirname(data_dir)) for p in output]
output.append('meta.json')
return output
def list_requirements(meta):
parent_package = meta.get('parent_package', 'spacy')
requirements = [parent_package + meta['spacy_version']]
if 'setup_requires' in meta:
requirements += meta['setup_requires']
if 'requirements' in meta:
requirements += meta['requirements']
return requirements
def setup_package():
root = path.abspath(path.dirname(__file__))
meta_path = path.join(root, 'meta.json')
meta = load_meta(meta_path)
readme_path = path.join(root, 'README.md')
readme = load_readme(readme_path)
model_name = str(meta['lang'] + '_' + meta['name'])
model_dir = path.join(model_name, model_name + '-' + meta['version'])
copy(meta_path, path.join(model_name))
copy(meta_path, model_dir)
setup(
name=model_name,
description=meta.get('description'),
long_description=readme,
author=meta.get('author'),
author_email=meta.get('email'),
url=meta.get('url'),
version=meta['version'],
license=meta.get('license'),
packages=[model_name],
package_data={model_name: list_files(model_dir)},
install_requires=list_requirements(meta),
zip_safe=False,
entry_points={'spacy_models': ['{m} = {m}'.format(m=model_name)]}
)
if __name__ == '__main__':
setup_package()
""".lstrip()
TEMPLATE_MANIFEST = """
include meta.json
include LICENSE
include LICENSES_SOURCES
include README.md
""".strip()
TEMPLATE_INIT = """
from pathlib import Path
from spacy.util import load_model_from_init_py, get_model_meta
{imports}
__version__ = get_model_meta(Path(__file__).parent)['version']
def load(**overrides):
return load_model_from_init_py(__file__, **overrides)
""".lstrip()
FILENAMES_DOCS = ["LICENSE", "LICENSES_SOURCES", "README.md"]