spaCy/examples/pipeline/multi_processing.py

#!/usr/bin/env python
# coding: utf8
"""Example of multi-processing with Joblib. Here, we're exporting
2017-10-27 02:50:44 +03:00
part-of-speech-tagged, true-cased, (very roughly) sentence-separated text, with
each "sentence" on a newline, and spaces between tokens. Data is loaded from
the IMDB movie reviews dataset and will be loaded automatically via Thinc's
built-in dataset loader.
2017-10-27 02:48:52 +03:00
2017-11-07 03:22:30 +03:00
Compatible with: spaCy v2.0.0+
2019-03-16 16:15:49 +03:00
Last tested with: v2.1.0
Prerequisites: pip install joblib
2017-10-27 02:48:52 +03:00
"""
from __future__ import print_function, unicode_literals

from pathlib import Path
from functools import partial

from joblib import Parallel, delayed
import thinc.extra.datasets
import plac
import spacy
from spacy.util import minibatch


@plac.annotations(
    output_dir=("Output directory", "positional", None, Path),
    model=("Model name (needs tagger)", "positional", None, str),
    n_jobs=("Number of workers", "option", "n", int),
    batch_size=("Batch-size for each process", "option", "b", int),
    limit=("Limit of entries from the dataset", "option", "l", int),
)
def main(output_dir, model="en_core_web_sm", n_jobs=4, batch_size=1000, limit=10000):
    nlp = spacy.load(model)  # load spaCy model
    print("Loaded model '%s'" % model)
    if not output_dir.exists():
        output_dir.mkdir()
    # load and pre-process the IMDB dataset
    print("Loading IMDB data...")
    data, _ = thinc.extra.datasets.imdb()
    texts, _ = zip(*data[-limit:])
    print("Processing texts...")
    # split the texts into batches; each batch becomes one worker task
    partitions = minibatch(texts, size=batch_size)
    executor = Parallel(n_jobs=n_jobs, backend="multiprocessing", prefer="processes")
    do = delayed(partial(transform_texts, nlp))
    tasks = (do(i, batch, output_dir) for i, batch in enumerate(partitions))
    executor(tasks)


def transform_texts(nlp, batch_id, texts, output_dir):
    print(nlp.pipe_names)
    out_path = Path(output_dir) / ("%d.txt" % batch_id)
    if out_path.exists():  # return None in case same batch is called again
        return None
    print("Processing batch", batch_id)
    with out_path.open("w", encoding="utf8") as f:
        for doc in nlp.pipe(texts):
            # one "sentence" per line, tokens separated by single spaces
            f.write(" ".join(represent_word(w) for w in doc if not w.is_space))
            f.write("\n")
    print("Saved {} texts to {}.txt".format(len(texts), batch_id))


def represent_word(word):
    text = word.text
    # True-case, i.e. try to normalize sentence-initial capitals.
    # Only do this if the lower-cased form is more probable.
    if (
        text.istitle()
        and is_sent_begin(word)
        and word.prob < word.doc.vocab[text.lower()].prob
    ):
        text = text.lower()
    return text + "|" + word.tag_
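
# A hypothetical illustration of represent_word's output: a sentence-initial
# "The" becomes "the|DT" when the lower-cased form is more probable in the
# vocab, while a proper noun like "London" keeps its casing as "London|NNP"
# (the tags shown here are examples, not guaranteed model output).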


def is_sent_begin(word):
    if word.i == 0:
        return True
    elif word.i >= 2 and word.nbor(-1).text in (".", "!", "?", "..."):
        return True
    else:
        return False


if __name__ == "__main__":
    plac.call(main)