mirror of
https://github.com/explosion/spaCy.git
synced 2025-01-04 06:16:33 +03:00
b5143b1b84
* Provide default value for `msg` * Compare paths correctly for file conversion
272 lines
9.9 KiB
Python
272 lines
9.9 KiB
Python
from typing import Callable, Iterable, Mapping, Optional, Any, List, Union
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
from wasabi import Printer
|
|
import srsly
|
|
import re
|
|
import sys
|
|
import itertools
|
|
|
|
from ._util import app, Arg, Opt
|
|
from ..training import docs_to_json
|
|
from ..tokens import Doc, DocBin
|
|
from ..training.converters import iob_to_docs, conll_ner_to_docs, json_to_docs
|
|
from ..training.converters import conllu_to_docs
|
|
|
|
|
|
# Registry of converter functions, keyed by the input file extension they
# handle. The "ner" and "iob" entries are the exception: they are selected by
# file extension *and* file content. Supporting a new format means importing
# its converter function from the converters package and registering it here
# under the matching file extension.
CONVERTERS: Mapping[str, Callable[..., Iterable[Doc]]] = dict(
    conllubio=conllu_to_docs,
    conllu=conllu_to_docs,
    conll=conll_ner_to_docs,
    ner=conll_ner_to_docs,
    iob=iob_to_docs,
    json=json_to_docs,
)


# Output file types that may be written to stdout instead of a file
FILE_TYPES_STDOUT = ("json",)
|
|
|
|
|
|
class FileTypes(str, Enum):
    """Output formats that can be selected with the --file-type option."""

    # Each member's value is the plain string that is passed on to convert().
    json = "json"
    spacy = "spacy"
|
|
|
|
|
|
@app.command("convert")
def convert_cli(
    # fmt: off
    input_path: str = Arg(..., help="Input file or directory", exists=True),
    output_dir: Path = Arg("-", help="Output directory. '-' for stdout.", allow_dash=True, exists=True),
    file_type: FileTypes = Opt("spacy", "--file-type", "-t", help="Type of data to produce"),
    n_sents: int = Opt(1, "--n-sents", "-n", help="Number of sentences per doc (0 to disable)"),
    seg_sents: bool = Opt(False, "--seg-sents", "-s", help="Segment sentences (for -c ner)"),
    model: Optional[str] = Opt(None, "--model", "--base", "-b", help="Trained spaCy pipeline for sentence segmentation to use as base (for --seg-sents)"),
    morphology: bool = Opt(False, "--morphology", "-m", help="Enable appending morphology to tags"),
    merge_subtokens: bool = Opt(False, "--merge-subtokens", "-T", help="Merge CoNLL-U subtokens"),
    converter: str = Opt("auto", "--converter", "-c", help=f"Converter: {tuple(CONVERTERS.keys())}"),
    ner_map: Optional[Path] = Opt(None, "--ner-map", "-nm", help="NER tag mapping (as JSON-encoded dict of entity types)", exists=True),
    lang: Optional[str] = Opt(None, "--lang", "-l", help="Language (if tokenizer required)"),
    # The flag defaults to False (not None) so that the value always matches
    # the `bool` annotation and the default of convert().
    concatenate: bool = Opt(False, "--concatenate", "-C", help="Concatenate output to a single file"),
    # fmt: on
):
    """
    Convert files into json or DocBin format for training. The resulting .spacy
    file can be used with the train command and other experiment management
    functions.

    If no output_dir is specified and the output format is JSON, the data
    is written to stdout, so you can pipe them forward to a JSON file:
    $ spacy convert some_file.conllu --file-type json > some_file.json

    DOCS: https://spacy.io/api/cli#convert
    """
    input_path = Path(input_path)
    # The argument arrives as a Path; turn the stdout marker back into the
    # plain string "-" that the rest of this module compares against.
    output_dir: Union[str, Path] = "-" if output_dir == Path("-") else output_dir
    # When the data itself goes to stdout, the printer is created with
    # no_print=True so status messages are suppressed.
    silent = output_dir == "-"
    msg = Printer(no_print=silent)
    # Validate the arguments before any conversion work starts.
    verify_cli_args(msg, input_path, output_dir, file_type.value, converter, ner_map)
    # Resolve "auto" (and the ambiguous ner/iob extensions) to a concrete
    # converter name.
    converter = _get_converter(msg, converter, input_path)
    convert(
        input_path,
        output_dir,
        file_type=file_type.value,
        n_sents=n_sents,
        seg_sents=seg_sents,
        model=model,
        morphology=morphology,
        merge_subtokens=merge_subtokens,
        converter=converter,
        ner_map=ner_map,
        lang=lang,
        concatenate=concatenate,
        silent=silent,
        msg=msg,
    )
|
|
|
|
|
|
def convert(
    input_path: Path,
    output_dir: Union[str, Path],
    *,
    file_type: str = "json",
    n_sents: int = 1,
    seg_sents: bool = False,
    model: Optional[str] = None,
    morphology: bool = False,
    merge_subtokens: bool = False,
    converter: str = "auto",
    ner_map: Optional[Path] = None,
    lang: Optional[str] = None,
    concatenate: bool = False,
    silent: bool = True,
    msg: Optional[Printer] = None,
) -> None:
    """Convert the input file(s) and write the result to stdout or to files.

    input_path (Path): A single file, or a directory that is searched with
        walk_directory.
    output_dir (Union[str, Path]): Target directory, or "-" for stdout.
    file_type (str): "json" produces JSON via docs_to_json; any other value
        produces the serialized bytes of a DocBin.
    n_sents, seg_sents, model, morphology, merge_subtokens, lang: Passed
        through to the converter function.
    converter (str): Key into CONVERTERS, or "auto" to choose one based on
        the first input file.
    ner_map (Optional[Path]): JSON file that is loaded and handed to the
        converter function.
    concatenate (bool): Chain the docs of all input files into one output
        that is named after input_path.
    silent (bool): Forwarded as the converter's no_print flag and used for
        the fallback Printer.
    msg (Optional[Printer]): Printer for status messages; one is created if
        none is given.
    """
    input_path = Path(input_path)
    if not msg:
        msg = Printer(no_print=silent)
    ner_map = srsly.read_json(ner_map) if ner_map is not None else None
    input_locs = walk_directory(input_path, converter)
    if converter == "auto" and input_locs:
        # "auto" is the default value but not a key of CONVERTERS, so it has
        # to be resolved before the lookup below. The first input file is
        # passed (rather than the directory) so that an empty directory still
        # results in a no-op.
        converter = _get_converter(msg, converter, input_locs[0])
    doc_files = []
    for input_loc in input_locs:
        with input_loc.open("r", encoding="utf-8") as infile:
            input_data = infile.read()
        # Use converter function to convert data
        func = CONVERTERS[converter]
        docs = func(
            input_data,
            n_sents=n_sents,
            seg_sents=seg_sents,
            append_morphology=morphology,
            merge_subtokens=merge_subtokens,
            lang=lang,
            model=model,
            no_print=silent,
            ner_map=ner_map,
        )
        doc_files.append((input_loc, docs))
    if concatenate:
        # Collapse everything into a single entry keyed by the original input
        # path, so exactly one output is produced.
        all_docs = itertools.chain.from_iterable([docs for _, docs in doc_files])
        doc_files = [(input_path, all_docs)]
    for input_loc, docs in doc_files:
        if file_type == "json":
            data = [docs_to_json(docs)]
            # NOTE(review): this is the length of the wrapping list, i.e. 1.
            len_docs = len(data)
        else:
            db = DocBin(docs=docs, store_user_data=True)
            len_docs = len(db)
            data = db.to_bytes()  # type: ignore[assignment]
        if output_dir == "-":
            _print_docs_to_stdout(data, file_type)
        else:
            if input_loc != input_path:
                # File found below an input directory: keep its relative
                # location underneath the output directory.
                subpath = input_loc.relative_to(input_path)
                output_file = Path(output_dir) / subpath.with_suffix(f".{file_type}")
            else:
                # Single input file or concatenated output: name the result
                # after the input path itself.
                output_file = Path(output_dir) / input_loc.parts[-1]
                output_file = output_file.with_suffix(f".{file_type}")
            _write_docs_to_file(data, output_file, file_type)
            msg.good(f"Generated output file ({len_docs} documents): {output_file}")
|
|
|
|
|
|
def _print_docs_to_stdout(data: Any, output_type: str) -> None:
    """Write converted data to stdout.

    data (Any): The converted data; for non-JSON output it is written to the
        binary stdout buffer as-is (presumably bytes).
    output_type (str): "json" is written via srsly with "-" as the target,
        everything else goes to sys.stdout.buffer.
    """
    if output_type != "json":
        sys.stdout.buffer.write(data)
        return
    srsly.write_json("-", data)
|
|
|
|
|
|
def _write_docs_to_file(data: Any, output_file: Path, output_type: str) -> None:
|
|
if not output_file.parent.exists():
|
|
output_file.parent.mkdir(parents=True)
|
|
if output_type == "json":
|
|
srsly.write_json(output_file, data)
|
|
else:
|
|
with output_file.open("wb") as file_:
|
|
file_.write(data)
|
|
|
|
|
|
def autodetect_ner_format(input_data: str) -> Optional[str]:
    """Guess the NER file layout from the first 20 lines of the input.

    input_data (str): The raw file content.
    RETURNS (Optional[str]): "ner" if only `token TAG` lines were found,
        "iob" if only `token|TAG` items were found, and None if neither or
        both patterns matched.
    """
    # Only the first 20 lines are inspected, so cap the split instead of
    # cutting the whole (potentially very large) input into lines.
    lines = input_data.split("\n", 20)[:20]
    format_guesses = {"ner": 0, "iob": 0}
    # token|TAG anywhere in the line
    iob_re = re.compile(r"\S+\|(O|[IB]-\S+)")
    # token<whitespace>TAG at the end of the line
    ner_re = re.compile(r"\S+\s+(O|[IB]-\S+)$")
    for line in lines:
        line = line.strip()
        if iob_re.search(line):
            format_guesses["iob"] += 1
        if ner_re.search(line):
            format_guesses["ner"] += 1
    # A format is only reported when the evidence is unambiguous.
    if format_guesses["iob"] == 0 and format_guesses["ner"] > 0:
        return "ner"
    if format_guesses["ner"] == 0 and format_guesses["iob"] > 0:
        return "iob"
    return None
|
|
|
|
|
|
def walk_directory(path: Path, converter: str) -> List[Path]:
    """Collect the input files for a conversion run.

    path (Path): A file or a directory. Anything that is not a directory is
        returned as the only result without further checks.
    converter (str): Converter name. For "json", "conll" and "iob" only files
        whose name ends with that string are kept; other values do not
        filter by name.
    RETURNS (List[Path]): The sorted file paths found below the directory.
        Hidden entries (name starting with ".") are skipped, together with
        everything below them.
    """
    if not path.is_dir():
        return [path]
    # Converters whose input files are selected by the end of the file name.
    required_ending = converter if converter in ("json", "conll", "iob") else None
    pending = [path]
    locs = []
    seen = set()
    while pending:
        current = pending.pop()
        key = str(current)
        if key in seen:
            continue
        seen.add(key)
        # The hidden-name filter only applies to entries found during the
        # walk. The directory the caller asked for is always searched, even
        # if it is ".", ".." or a dot-directory. `.name` is used because
        # `parts` is empty for Path(".").
        if current != path and current.name.startswith("."):
            continue
        if current.is_dir():
            pending.extend(current.iterdir())
        elif required_ending is None or current.name.endswith(required_ending):
            locs.append(current)
    # It's good to sort these, in case the ordering messes up cache.
    locs.sort()
    return locs
|
|
|
|
|
|
def verify_cli_args(
    msg: Printer,
    input_path: Path,
    output_dir: Union[str, Path],
    file_type: str,
    converter: str,
    ner_map: Optional[Path],
):
    """Check the arguments of the convert command before any work is done.

    Every failed check is reported through msg.fail with exits=1. The checks
    cover: writing a non-stdout file type to stdout, missing input path,
    missing output directory, missing NER map, an input directory without
    usable files or (for converter "auto") with mixed file types, and an
    unknown converter name.
    """
    to_stdout = output_dir == "-"
    if to_stdout and file_type not in FILE_TYPES_STDOUT:
        msg.fail(
            f"Can't write .{file_type} data to stdout. Please specify an output directory.",
            exits=1,
        )
    if not input_path.exists():
        msg.fail("Input file not found", input_path, exits=1)
    if not to_stdout and not Path(output_dir).exists():
        msg.fail("Output directory not found", output_dir, exits=1)
    if ner_map is not None and not Path(ner_map).exists():
        msg.fail("NER map not found", ner_map, exits=1)
    if input_path.is_dir():
        candidates = walk_directory(input_path, converter)
        if not candidates:
            msg.fail("No input files in directory", input_path, exits=1)
        # Distinct file extensions (without the leading dot) in the directory
        extensions = list(set(loc.suffix[1:] for loc in candidates))
        if converter == "auto" and len(extensions) >= 2:
            msg.fail(
                "All input files must be same type", ",".join(extensions), exits=1
            )
    if converter != "auto" and converter not in CONVERTERS:
        msg.fail(f"Can't find converter for {converter}", exits=1)
|
|
|
|
|
|
def _get_converter(msg, converter, input_path: Path):
    """Resolve the converter name to use for the given input.

    msg: Printer used for info, warning and failure messages.
    converter: Requested converter name, or "auto" to derive it from the
        file extension of the (first) input file.
    input_path (Path): Input file, or a directory whose first file (as
        returned by walk_directory) is used as the reference.
    RETURNS: The resolved converter name. For "ner" and "iob" the file
        content decides between the two where it can be detected.
    """
    if input_path.is_dir():
        input_locs = walk_directory(input_path, converter)
        if not input_locs:
            # Fail with a readable message instead of an IndexError below.
            msg.fail("No input files in directory", input_path, exits=1)
        input_path = input_locs[0]
    if converter == "auto":
        converter = input_path.suffix[1:]
    if converter == "ner" or converter == "iob":
        with input_path.open(encoding="utf8") as file_:
            # The detection only looks at the first 20 lines, so there is no
            # need to read the whole file here.
            input_data = "".join(itertools.islice(file_, 20))
        converter_autodetect = autodetect_ner_format(input_data)
        if converter_autodetect == "ner":
            msg.info("Auto-detected token-per-line NER format")
            converter = converter_autodetect
        elif converter_autodetect == "iob":
            msg.info("Auto-detected sentence-per-line NER format")
            converter = converter_autodetect
        else:
            msg.warn(
                "Can't automatically detect NER format. "
                "Conversion may not succeed. "
                "See https://spacy.io/api/cli#convert"
            )
    # An extension picked up via "auto" may not have a registered converter
    # (e.g. ".txt"); report that here rather than failing later with a
    # KeyError on the CONVERTERS lookup.
    if converter not in CONVERTERS:
        msg.fail(f"Can't find converter for {converter}", exits=1)
    return converter
|