mirror of
				https://github.com/explosion/spaCy.git
				synced 2025-11-04 09:57:26 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			361 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			361 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from typing import Optional, List, Dict, Any, Union, IO
 | 
						|
import math
 | 
						|
from tqdm import tqdm
 | 
						|
import numpy
 | 
						|
from ast import literal_eval
 | 
						|
from pathlib import Path
 | 
						|
from preshed.counter import PreshCounter
 | 
						|
import tarfile
 | 
						|
import gzip
 | 
						|
import zipfile
 | 
						|
import srsly
 | 
						|
import warnings
 | 
						|
from wasabi import msg, Printer
 | 
						|
import typer
 | 
						|
 | 
						|
from ._util import app, init_cli, Arg, Opt
 | 
						|
from ..vectors import Vectors
 | 
						|
from ..errors import Errors, Warnings
 | 
						|
from ..language import Language
 | 
						|
from ..util import ensure_path, get_lang_class, load_model, OOV_RANK
 | 
						|
 | 
						|
try:
 | 
						|
    import ftfy
 | 
						|
except ImportError:
 | 
						|
    ftfy = None
 | 
						|
 | 
						|
 | 
						|
DEFAULT_OOV_PROB = -20
 | 
						|
 | 
						|
 | 
						|
@init_cli.command("vocab")
 | 
						|
@app.command(
 | 
						|
    "init-model",
 | 
						|
    context_settings={"allow_extra_args": True, "ignore_unknown_options": True},
 | 
						|
    hidden=True,  # hide this from main CLI help but still allow it to work with warning
 | 
						|
)
 | 
						|
def init_model_cli(
 | 
						|
    # fmt: off
 | 
						|
    ctx: typer.Context,  # This is only used to read additional arguments
 | 
						|
    lang: str = Arg(..., help="Pipeline language"),
 | 
						|
    output_dir: Path = Arg(..., help="Pipeline output directory"),
 | 
						|
    freqs_loc: Optional[Path] = Arg(None, help="Location of words frequencies file", exists=True),
 | 
						|
    clusters_loc: Optional[Path] = Opt(None, "--clusters-loc", "-c", help="Optional location of brown clusters data", exists=True),
 | 
						|
    jsonl_loc: Optional[Path] = Opt(None, "--jsonl-loc", "-j", help="Location of JSONL-formatted attributes file", exists=True),
 | 
						|
    vectors_loc: Optional[Path] = Opt(None, "--vectors-loc", "-v", help="Optional vectors file in Word2Vec format", exists=True),
 | 
						|
    prune_vectors: int = Opt(-1, "--prune-vectors", "-V", help="Optional number of vectors to prune to"),
 | 
						|
    truncate_vectors: int = Opt(0, "--truncate-vectors", "-t", help="Optional number of vectors to truncate to when reading in vectors file"),
 | 
						|
    vectors_name: Optional[str] = Opt(None, "--vectors-name", "-vn", help="Optional name for the word vectors, e.g. en_core_web_lg.vectors"),
 | 
						|
    model_name: Optional[str] = Opt(None, "--meta-name", "-mn", help="Optional name of the package for the pipeline meta"),
 | 
						|
    base_model: Optional[str] = Opt(None, "--base", "-b", help="Name of or path to base pipeline to start with (mostly relevant for pipelines with custom tokenizers)")
 | 
						|
    # fmt: on
 | 
						|
):
 | 
						|
    """
 | 
						|
    Create a new blank pipeline directory with vocab and vectors from raw data.
 | 
						|
    If vectors are provided in Word2Vec format, they can be either a .txt or
 | 
						|
    zipped as a .zip or .tar.gz.
 | 
						|
 | 
						|
    DOCS: https://nightly.spacy.io/api/cli#init-vocab
 | 
						|
    """
 | 
						|
    if ctx.command.name == "init-model":
 | 
						|
        msg.warn(
 | 
						|
            "The init-model command is now called 'init vocab'. You can run "
 | 
						|
            "'python -m spacy init --help' for an overview of the other "
 | 
						|
            "available initialization commands."
 | 
						|
        )
 | 
						|
    init_model(
 | 
						|
        lang,
 | 
						|
        output_dir,
 | 
						|
        freqs_loc=freqs_loc,
 | 
						|
        clusters_loc=clusters_loc,
 | 
						|
        jsonl_loc=jsonl_loc,
 | 
						|
        vectors_loc=vectors_loc,
 | 
						|
        prune_vectors=prune_vectors,
 | 
						|
        truncate_vectors=truncate_vectors,
 | 
						|
        vectors_name=vectors_name,
 | 
						|
        model_name=model_name,
 | 
						|
        base_model=base_model,
 | 
						|
        silent=False,
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
def init_model(
 | 
						|
    lang: str,
 | 
						|
    output_dir: Path,
 | 
						|
    freqs_loc: Optional[Path] = None,
 | 
						|
    clusters_loc: Optional[Path] = None,
 | 
						|
    jsonl_loc: Optional[Path] = None,
 | 
						|
    vectors_loc: Optional[Path] = None,
 | 
						|
    prune_vectors: int = -1,
 | 
						|
    truncate_vectors: int = 0,
 | 
						|
    vectors_name: Optional[str] = None,
 | 
						|
    model_name: Optional[str] = None,
 | 
						|
    base_model: Optional[str] = None,
 | 
						|
    silent: bool = True,
 | 
						|
) -> Language:
 | 
						|
    msg = Printer(no_print=silent, pretty=not silent)
 | 
						|
    if jsonl_loc is not None:
 | 
						|
        if freqs_loc is not None or clusters_loc is not None:
 | 
						|
            settings = ["-j"]
 | 
						|
            if freqs_loc:
 | 
						|
                settings.append("-f")
 | 
						|
            if clusters_loc:
 | 
						|
                settings.append("-c")
 | 
						|
            msg.warn(
 | 
						|
                "Incompatible arguments",
 | 
						|
                "The -f and -c arguments are deprecated, and not compatible "
 | 
						|
                "with the -j argument, which should specify the same "
 | 
						|
                "information. Either merge the frequencies and clusters data "
 | 
						|
                "into the JSONL-formatted file (recommended), or use only the "
 | 
						|
                "-f and -c files, without the other lexical attributes.",
 | 
						|
            )
 | 
						|
        jsonl_loc = ensure_path(jsonl_loc)
 | 
						|
        lex_attrs = srsly.read_jsonl(jsonl_loc)
 | 
						|
    else:
 | 
						|
        clusters_loc = ensure_path(clusters_loc)
 | 
						|
        freqs_loc = ensure_path(freqs_loc)
 | 
						|
        if freqs_loc is not None and not freqs_loc.exists():
 | 
						|
            msg.fail("Can't find words frequencies file", freqs_loc, exits=1)
 | 
						|
        lex_attrs = read_attrs_from_deprecated(msg, freqs_loc, clusters_loc)
 | 
						|
 | 
						|
    with msg.loading("Creating blank pipeline..."):
 | 
						|
        nlp = create_model(lang, lex_attrs, name=model_name, base_model=base_model)
 | 
						|
 | 
						|
    msg.good("Successfully created blank pipeline")
 | 
						|
    if vectors_loc is not None:
 | 
						|
        add_vectors(
 | 
						|
            msg, nlp, vectors_loc, truncate_vectors, prune_vectors, vectors_name
 | 
						|
        )
 | 
						|
    vec_added = len(nlp.vocab.vectors)
 | 
						|
    lex_added = len(nlp.vocab)
 | 
						|
    msg.good(
 | 
						|
        "Sucessfully compiled vocab", f"{lex_added} entries, {vec_added} vectors",
 | 
						|
    )
 | 
						|
    if not output_dir.exists():
 | 
						|
        output_dir.mkdir()
 | 
						|
    nlp.to_disk(output_dir)
 | 
						|
    return nlp
 | 
						|
 | 
						|
 | 
						|
def open_file(loc: Union[str, Path]) -> IO:
 | 
						|
    """Handle .gz, .tar.gz or unzipped files"""
 | 
						|
    loc = ensure_path(loc)
 | 
						|
    if tarfile.is_tarfile(str(loc)):
 | 
						|
        return tarfile.open(str(loc), "r:gz")
 | 
						|
    elif loc.parts[-1].endswith("gz"):
 | 
						|
        return (line.decode("utf8") for line in gzip.open(str(loc), "r"))
 | 
						|
    elif loc.parts[-1].endswith("zip"):
 | 
						|
        zip_file = zipfile.ZipFile(str(loc))
 | 
						|
        names = zip_file.namelist()
 | 
						|
        file_ = zip_file.open(names[0])
 | 
						|
        return (line.decode("utf8") for line in file_)
 | 
						|
    else:
 | 
						|
        return loc.open("r", encoding="utf8")
 | 
						|
 | 
						|
 | 
						|
def read_attrs_from_deprecated(
 | 
						|
    msg: Printer, freqs_loc: Optional[Path], clusters_loc: Optional[Path]
 | 
						|
) -> List[Dict[str, Any]]:
 | 
						|
    if freqs_loc is not None:
 | 
						|
        with msg.loading("Counting frequencies..."):
 | 
						|
            probs, _ = read_freqs(freqs_loc)
 | 
						|
        msg.good("Counted frequencies")
 | 
						|
    else:
 | 
						|
        probs, _ = ({}, DEFAULT_OOV_PROB)  # noqa: F841
 | 
						|
    if clusters_loc:
 | 
						|
        with msg.loading("Reading clusters..."):
 | 
						|
            clusters = read_clusters(clusters_loc)
 | 
						|
        msg.good("Read clusters")
 | 
						|
    else:
 | 
						|
        clusters = {}
 | 
						|
    lex_attrs = []
 | 
						|
    sorted_probs = sorted(probs.items(), key=lambda item: item[1], reverse=True)
 | 
						|
    if len(sorted_probs):
 | 
						|
        for i, (word, prob) in tqdm(enumerate(sorted_probs)):
 | 
						|
            attrs = {"orth": word, "id": i, "prob": prob}
 | 
						|
            # Decode as a little-endian string, so that we can do & 15 to get
 | 
						|
            # the first 4 bits. See _parse_features.pyx
 | 
						|
            if word in clusters:
 | 
						|
                attrs["cluster"] = int(clusters[word][::-1], 2)
 | 
						|
            else:
 | 
						|
                attrs["cluster"] = 0
 | 
						|
            lex_attrs.append(attrs)
 | 
						|
    return lex_attrs
 | 
						|
 | 
						|
 | 
						|
def create_model(
 | 
						|
    lang: str,
 | 
						|
    lex_attrs: List[Dict[str, Any]],
 | 
						|
    name: Optional[str] = None,
 | 
						|
    base_model: Optional[Union[str, Path]] = None,
 | 
						|
) -> Language:
 | 
						|
    if base_model:
 | 
						|
        nlp = load_model(base_model)
 | 
						|
        # keep the tokenizer but remove any existing pipeline components due to
 | 
						|
        # potentially conflicting vectors
 | 
						|
        for pipe in nlp.pipe_names:
 | 
						|
            nlp.remove_pipe(pipe)
 | 
						|
    else:
 | 
						|
        lang_class = get_lang_class(lang)
 | 
						|
        nlp = lang_class()
 | 
						|
    for lexeme in nlp.vocab:
 | 
						|
        lexeme.rank = OOV_RANK
 | 
						|
    for attrs in lex_attrs:
 | 
						|
        if "settings" in attrs:
 | 
						|
            continue
 | 
						|
        lexeme = nlp.vocab[attrs["orth"]]
 | 
						|
        lexeme.set_attrs(**attrs)
 | 
						|
    if len(nlp.vocab):
 | 
						|
        oov_prob = min(lex.prob for lex in nlp.vocab) - 1
 | 
						|
    else:
 | 
						|
        oov_prob = DEFAULT_OOV_PROB
 | 
						|
    nlp.vocab.cfg.update({"oov_prob": oov_prob})
 | 
						|
    if name:
 | 
						|
        nlp.meta["name"] = name
 | 
						|
    return nlp
 | 
						|
 | 
						|
 | 
						|
def add_vectors(
 | 
						|
    msg: Printer,
 | 
						|
    nlp: Language,
 | 
						|
    vectors_loc: Optional[Path],
 | 
						|
    truncate_vectors: int,
 | 
						|
    prune_vectors: int,
 | 
						|
    name: Optional[str] = None,
 | 
						|
) -> None:
 | 
						|
    vectors_loc = ensure_path(vectors_loc)
 | 
						|
    if vectors_loc and vectors_loc.parts[-1].endswith(".npz"):
 | 
						|
        nlp.vocab.vectors = Vectors(data=numpy.load(vectors_loc.open("rb")))
 | 
						|
        for lex in nlp.vocab:
 | 
						|
            if lex.rank and lex.rank != OOV_RANK:
 | 
						|
                nlp.vocab.vectors.add(lex.orth, row=lex.rank)
 | 
						|
    else:
 | 
						|
        if vectors_loc:
 | 
						|
            with msg.loading(f"Reading vectors from {vectors_loc}"):
 | 
						|
                vectors_data, vector_keys = read_vectors(
 | 
						|
                    msg, vectors_loc, truncate_vectors
 | 
						|
                )
 | 
						|
            msg.good(f"Loaded vectors from {vectors_loc}")
 | 
						|
        else:
 | 
						|
            vectors_data, vector_keys = (None, None)
 | 
						|
        if vector_keys is not None:
 | 
						|
            for word in vector_keys:
 | 
						|
                if word not in nlp.vocab:
 | 
						|
                    nlp.vocab[word]
 | 
						|
        if vectors_data is not None:
 | 
						|
            nlp.vocab.vectors = Vectors(data=vectors_data, keys=vector_keys)
 | 
						|
    if name is None:
 | 
						|
        # TODO: Is this correct? Does this matter?
 | 
						|
        nlp.vocab.vectors.name = f"{nlp.meta['lang']}_{nlp.meta['name']}.vectors"
 | 
						|
    else:
 | 
						|
        nlp.vocab.vectors.name = name
 | 
						|
    nlp.meta["vectors"]["name"] = nlp.vocab.vectors.name
 | 
						|
    if prune_vectors >= 1:
 | 
						|
        nlp.vocab.prune_vectors(prune_vectors)
 | 
						|
 | 
						|
 | 
						|
def read_vectors(msg: Printer, vectors_loc: Path, truncate_vectors: int):
 | 
						|
    f = open_file(vectors_loc)
 | 
						|
    f = ensure_shape(f)
 | 
						|
    shape = tuple(int(size) for size in next(f).split())
 | 
						|
    if truncate_vectors >= 1:
 | 
						|
        shape = (truncate_vectors, shape[1])
 | 
						|
    vectors_data = numpy.zeros(shape=shape, dtype="f")
 | 
						|
    vectors_keys = []
 | 
						|
    for i, line in enumerate(tqdm(f)):
 | 
						|
        line = line.rstrip()
 | 
						|
        pieces = line.rsplit(" ", vectors_data.shape[1])
 | 
						|
        word = pieces.pop(0)
 | 
						|
        if len(pieces) != vectors_data.shape[1]:
 | 
						|
            msg.fail(Errors.E094.format(line_num=i, loc=vectors_loc), exits=1)
 | 
						|
        vectors_data[i] = numpy.asarray(pieces, dtype="f")
 | 
						|
        vectors_keys.append(word)
 | 
						|
        if i == truncate_vectors - 1:
 | 
						|
            break
 | 
						|
    return vectors_data, vectors_keys
 | 
						|
 | 
						|
 | 
						|
def ensure_shape(lines):
 | 
						|
    """Ensure that the first line of the data is the vectors shape.
 | 
						|
 | 
						|
    If it's not, we read in the data and output the shape as the first result,
 | 
						|
    so that the reader doesn't have to deal with the problem.
 | 
						|
    """
 | 
						|
    first_line = next(lines)
 | 
						|
    try:
 | 
						|
        shape = tuple(int(size) for size in first_line.split())
 | 
						|
    except ValueError:
 | 
						|
        shape = None
 | 
						|
    if shape is not None:
 | 
						|
        # All good, give the data
 | 
						|
        yield first_line
 | 
						|
        yield from lines
 | 
						|
    else:
 | 
						|
        # Figure out the shape, make it the first value, and then give the
 | 
						|
        # rest of the data.
 | 
						|
        width = len(first_line.split()) - 1
 | 
						|
        captured = [first_line] + list(lines)
 | 
						|
        length = len(captured)
 | 
						|
        yield f"{length} {width}"
 | 
						|
        yield from captured
 | 
						|
 | 
						|
 | 
						|
def read_freqs(
 | 
						|
    freqs_loc: Path, max_length: int = 100, min_doc_freq: int = 5, min_freq: int = 50
 | 
						|
):
 | 
						|
    counts = PreshCounter()
 | 
						|
    total = 0
 | 
						|
    with freqs_loc.open() as f:
 | 
						|
        for i, line in enumerate(f):
 | 
						|
            freq, doc_freq, key = line.rstrip().split("\t", 2)
 | 
						|
            freq = int(freq)
 | 
						|
            counts.inc(i + 1, freq)
 | 
						|
            total += freq
 | 
						|
    counts.smooth()
 | 
						|
    log_total = math.log(total)
 | 
						|
    probs = {}
 | 
						|
    with freqs_loc.open() as f:
 | 
						|
        for line in tqdm(f):
 | 
						|
            freq, doc_freq, key = line.rstrip().split("\t", 2)
 | 
						|
            doc_freq = int(doc_freq)
 | 
						|
            freq = int(freq)
 | 
						|
            if doc_freq >= min_doc_freq and freq >= min_freq and len(key) < max_length:
 | 
						|
                try:
 | 
						|
                    word = literal_eval(key)
 | 
						|
                except SyntaxError:
 | 
						|
                    # Take odd strings literally.
 | 
						|
                    word = literal_eval(f"'{key}'")
 | 
						|
                smooth_count = counts.smoother(int(freq))
 | 
						|
                probs[word] = math.log(smooth_count) - log_total
 | 
						|
    oov_prob = math.log(counts.smoother(0)) - log_total
 | 
						|
    return probs, oov_prob
 | 
						|
 | 
						|
 | 
						|
def read_clusters(clusters_loc: Path) -> dict:
 | 
						|
    clusters = {}
 | 
						|
    if ftfy is None:
 | 
						|
        warnings.warn(Warnings.W004)
 | 
						|
    with clusters_loc.open() as f:
 | 
						|
        for line in tqdm(f):
 | 
						|
            try:
 | 
						|
                cluster, word, freq = line.split()
 | 
						|
                if ftfy is not None:
 | 
						|
                    word = ftfy.fix_text(word)
 | 
						|
            except ValueError:
 | 
						|
                continue
 | 
						|
            # If the clusterer has only seen the word a few times, its
 | 
						|
            # cluster is unreliable.
 | 
						|
            if int(freq) >= 3:
 | 
						|
                clusters[word] = cluster
 | 
						|
            else:
 | 
						|
                clusters[word] = "0"
 | 
						|
    # Expand clusters with re-casing
 | 
						|
    for word, cluster in list(clusters.items()):
 | 
						|
        if word.lower() not in clusters:
 | 
						|
            clusters[word.lower()] = cluster
 | 
						|
        if word.title() not in clusters:
 | 
						|
            clusters[word.title()] = cluster
 | 
						|
        if word.upper() not in clusters:
 | 
						|
            clusters[word.upper()] = cluster
 | 
						|
    return clusters
 |