* Add a .pipe method that takes a stream of input, operates on it, and streams the output. Internally, the stream may be buffered to allow multi-threading.

Author: Matthew Honnibal
Date:   2016-02-03 02:04:55 +01:00
Commit: 84b247ef83 (parent fcfc17a164)

4 changed files with 53 additions and 24 deletions
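For orientation, a minimal usage sketch of the new API. This is a hedged illustration, not code from the commit: it assumes the spacy.en.English entry point of this era of the library, and the corpus file and loop body are invented.

    # Hypothetical usage of the Language.pipe method added by this commit.
    from spacy.en import English

    nlp = English()
    texts = (line.strip() for line in open('corpus.txt'))  # lazy stream of strings

    # .pipe consumes the iterable lazily and yields annotated Doc objects,
    # buffering internally so the parser can run multi-threaded batches.
    for doc in nlp.pipe(texts, n_threads=2, batch_size=1000):
        print(len(doc))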


@@ -269,16 +269,24 @@ class Language(object):
             self.entity(tokens)
         return tokens
 
-    def batch(self, texts, tag=True, parse=True, entity=True):
-        if tag is False:
-            return [self(text, tag=tag, parse=parse, entity=entity)
-                    for text in texts]
-        docs = []
-        for text in texts:
-            docs.append(self(text, tag=True, parse=False, entity=entity))
+    def pipe(self, texts, tag=True, parse=True, entity=True, n_threads=2,
+             batch_size=1000):
+        stream = self.tokenizer.stream(texts,
+            n_threads=n_threads, batch_size=batch_size)
+        if self.tagger and tag:
+            stream = self.tagger.stream(stream,
+                n_threads=n_threads, batch_size=batch_size)
+        if self.matcher and entity:
+            stream = self.matcher.stream(stream,
+                n_threads=n_threads, batch_size=batch_size)
         if self.parser and parse:
-            self.parser.parse_batch(docs)
-            return docs
+            stream = self.parser.stream(stream,
+                n_threads=n_threads, batch_size=batch_size)
+        if self.entity and entity:
+            stream = self.entity.stream(stream,
+                n_threads=n_threads, batch_size=batch_size)
+        for doc in stream:
+            yield doc
 
     def end_training(self, data_dir=None):
         if data_dir is None:

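Language.pipe builds the pipeline by re-binding `stream`: each enabled stage wraps the generator produced by the previous one, so nothing is computed until the caller iterates. A self-contained sketch of that chaining pattern; the tokenize and tag stages here are stand-ins, not spaCy APIs:

    # Generator chaining as in Language.pipe above; stages are stand-ins.
    def tokenize(texts):
        for text in texts:
            yield text.split()

    def tag(stream):
        for tokens in stream:
            yield [(tok, 'X') for tok in tokens]  # pretend annotation

    def pipe(texts, do_tag=True):
        stream = tokenize(texts)
        if do_tag:
            stream = tag(stream)   # re-bind: tag() now wraps tokenize()
        for doc in stream:
            yield doc              # runs only as the caller iterates

    for doc in pipe(['a b', 'c d']):
        print(doc)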

@@ -250,6 +250,10 @@ cdef class Matcher:
         doc.ents = [(e.label, e.start, e.end) for e in doc.ents] + filtered
         return matches
 
+    def pipe(self, texts, batch_size=1000, n_threads=2):
+        for text in texts:
+            yield self(text)
+
 
 cdef class PhraseMatcher:
     cdef Pool mem
@@ -303,6 +307,11 @@ cdef class PhraseMatcher:
             doc.merge(*match)
         return matches
 
+    def pipe(self, stream, batch_size=1000, n_threads=2):
+        for doc in stream:
+            self(doc)
+            yield doc
+
     def accept_match(self, Doc doc, int label, int start, int end):
         assert (end - start) < self.max_length
         cdef int i, j

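PhraseMatcher.pipe annotates each document in place and yields it, while Matcher.pipe yields the match results per text; in both, batch_size and n_threads appear to be accepted only so the signature matches the other pipeline stages. A generic sketch of the pass-through shape, where `component` is a stand-in for any in-place annotator:

    # Pass-through pipe stage, as in PhraseMatcher above (and Tagger below).
    def component_pipe(component, stream, batch_size=1000, n_threads=2):
        # batch_size/n_threads unused: the work is serial, one doc at a time
        for doc in stream:
            component(doc)   # annotate in place
            yield doc        # hand the same object downstream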

@@ -114,26 +114,33 @@ cdef class Parser:
         # Check for KeyboardInterrupt etc. Untested
         PyErr_CheckSignals()
 
-    def parse_batch(self, batch):
-        cdef TokenC** doc_ptr = <TokenC**>calloc(len(batch), sizeof(TokenC*))
-        cdef int* lengths = <int*>calloc(len(batch), sizeof(int))
+    def pipe(self, stream, int batch_size=1000, int n_threads=2):
+        cdef Pool mem = Pool()
+        cdef TokenC** doc_ptr = <TokenC**>mem.alloc(batch_size, sizeof(TokenC*))
+        cdef int* lengths = <int*>mem.alloc(batch_size, sizeof(int))
         cdef Doc doc
         cdef int i
-        for i, doc in enumerate(batch):
-            doc_ptr[i] = doc.c
-            lengths[i] = doc.length
         cdef int nr_class = self.moves.n_moves
         cdef int nr_feat = self.model.nr_feat
-        cdef int nr_doc = len(batch)
-        with nogil:
-            for i in range(nr_doc):
-                self.parseC(doc_ptr[i], lengths[i], nr_feat, nr_class)
-        for doc in batch:
-            doc.is_parsed = True
-        # Check for KeyboardInterrupt etc. Untested
+        queue = []
+        for doc in stream:
+            queue.append(doc)
+            doc_ptr[len(queue)] = doc.c
+            lengths[len(queue)] = doc.length
+            if len(queue) == batch_size:
+                for i in cython.parallel.prange(batch_size, nogil=True,
+                                                num_threads=n_threads):
+                    self.parseC(doc_ptr[i], lengths[i], nr_feat, nr_class)
+                PyErr_CheckSignals()
+                for doc in queue:
+                    yield doc
+                queue = []
+        batch_size = len(queue)
+        for i in range(batch_size):
+            self.parseC(doc_ptr[i], lengths[i], nr_feat, nr_class)
+        for doc in queue:
+            yield doc
         PyErr_CheckSignals()
-        free(doc_ptr)
-        free(lengths)
 
     cdef void parseC(self, TokenC* tokens, int length, int nr_feat, int nr_class) nogil:
         cdef ExampleC eg

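Parser.pipe is where the buffering promised in the commit message happens: documents accumulate in a queue; each full batch is parsed in parallel via cython.parallel.prange with the GIL released, then yielded and the queue flushed; any trailing partial batch is parsed serially. A pure-Python sketch of that buffer/flush/yield shape, where `parse` is a stand-in for the prange-parallel parseC loop:

    # Buffered batching as in Parser.pipe above, minus the OpenMP parallelism.
    def buffered_pipe(parse, stream, batch_size=1000):
        queue = []
        for doc in stream:
            queue.append(doc)
            if len(queue) == batch_size:
                for queued in queue:
                    parse(queued)    # real code: prange over the C pointers
                for queued in queue:
                    yield queued     # buffered docs leave only per batch
                queue = []
        for queued in queue:         # trailing partial batch, parsed serially
            parse(queued)
            yield queued

One consequence of this design: a consumer may wait for up to batch_size documents before the first one is yielded, which is the buffering trade-off the commit message alludes to.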

@@ -212,6 +212,11 @@ cdef class Tagger:
                 eg.reset_classes(eg.c.nr_class)
         tokens.is_tagged = True
         tokens._py_tokens = [None] * tokens.length
 
+    def pipe(self, stream, batch_size=1000, n_threads=2):
+        for doc in stream:
+            self(doc)
+            yield doc
+
     def train(self, Doc tokens, object gold_tag_strs):
         assert len(tokens) == len(gold_tag_strs)