From 84b247ef8361ec5315bbc02b5b95b18b87dbeaa2 Mon Sep 17 00:00:00 2001
From: Matthew Honnibal
Date: Wed, 3 Feb 2016 02:04:55 +0100
Subject: [PATCH] * Add a .pipe method, that takes a stream of input, operates
 on it, and streams the output. Internally, the stream may be buffered, to
 allow multi-threading.

---
 spacy/language.py       | 26 +++++++++++++++++---------
 spacy/matcher.pyx       |  9 +++++++++
 spacy/syntax/parser.pyx | 37 ++++++++++++++++++++++---------------
 spacy/tagger.pyx        |  5 +++++
 4 files changed, 53 insertions(+), 24 deletions(-)

diff --git a/spacy/language.py b/spacy/language.py
index bf055535d..58d137e2e 100644
--- a/spacy/language.py
+++ b/spacy/language.py
@@ -269,16 +269,24 @@ class Language(object):
             self.entity(tokens)
         return tokens
 
-    def batch(self, texts, tag=True, parse=True, entity=True):
-        if tag is False:
-            return [self(text, tag=tag, parse=parse, entity=entity)
-                    for text in texts]
-        docs = []
-        for text in texts:
-            docs.append(self(text, tag=True, parse=False, entity=entity))
+    def pipe(self, texts, tag=True, parse=True, entity=True, n_threads=2,
+            batch_size=1000):
+        stream = self.tokenizer.stream(texts,
+            n_threads=n_threads, batch_size=batch_size)
+        if self.tagger and tag:
+            stream = self.tagger.pipe(stream,
+                n_threads=n_threads, batch_size=batch_size)
+        if self.matcher and entity:
+            stream = self.matcher.pipe(stream,
+                n_threads=n_threads, batch_size=batch_size)
         if self.parser and parse:
-            self.parser.parse_batch(docs)
-        return docs
+            stream = self.parser.pipe(stream,
+                n_threads=n_threads, batch_size=batch_size)
+        if self.entity and entity:
+            stream = self.entity.pipe(stream,
+                n_threads=n_threads, batch_size=batch_size)
+        for doc in stream:
+            yield doc
 
     def end_training(self, data_dir=None):
         if data_dir is None:
diff --git a/spacy/matcher.pyx b/spacy/matcher.pyx
index cef98c068..8543be1f1 100644
--- a/spacy/matcher.pyx
+++ b/spacy/matcher.pyx
@@ -250,6 +250,10 @@ cdef class Matcher:
             doc.ents = [(e.label, e.start, e.end) for e in 
doc.ents] + filtered
         return matches
 
+    def pipe(self, texts, batch_size=1000, n_threads=2):
+        for doc in texts:
+            self(doc); yield doc
+
 
 cdef class PhraseMatcher:
     cdef Pool mem
@@ -303,6 +307,11 @@ cdef class PhraseMatcher:
             doc.merge(*match)
         return matches
 
+    def pipe(self, stream, batch_size=1000, n_threads=2):
+        for doc in stream:
+            self(doc)
+            yield doc
+
     def accept_match(self, Doc doc, int label, int start, int end):
         assert (end - start) < self.max_length
         cdef int i, j
diff --git a/spacy/syntax/parser.pyx b/spacy/syntax/parser.pyx
index 98b81d316..d84a26330 100644
--- a/spacy/syntax/parser.pyx
+++ b/spacy/syntax/parser.pyx
@@ -114,26 +114,33 @@ cdef class Parser:
         # Check for KeyboardInterrupt etc. Untested
         PyErr_CheckSignals()
 
-    def parse_batch(self, batch):
-        cdef TokenC** doc_ptr = calloc(len(batch), sizeof(TokenC*))
-        cdef int* lengths = calloc(len(batch), sizeof(int))
+    def pipe(self, stream, int batch_size=1000, int n_threads=2):
+        cdef Pool mem = Pool()
+        cdef TokenC** doc_ptr = mem.alloc(batch_size, sizeof(TokenC*))
+        cdef int* lengths = mem.alloc(batch_size, sizeof(int))
         cdef Doc doc
         cdef int i
-        for i, doc in enumerate(batch):
-            doc_ptr[i] = doc.c
-            lengths[i] = doc.length
         cdef int nr_class = self.moves.n_moves
         cdef int nr_feat = self.model.nr_feat
-        cdef int nr_doc = len(batch)
-        with nogil:
-            for i in range(nr_doc):
-                self.parseC(doc_ptr[i], lengths[i], nr_feat, nr_class)
-        for doc in batch:
-            doc.is_parsed = True
-        # Check for KeyboardInterrupt etc. 
Untested
+        queue = []
+        for doc in stream:
+            doc_ptr[len(queue)] = doc.c
+            lengths[len(queue)] = doc.length
+            queue.append(doc)
+            if len(queue) == batch_size:
+                for i in cython.parallel.prange(batch_size, nogil=True,
+                        num_threads=n_threads):
+                    self.parseC(doc_ptr[i], lengths[i], nr_feat, nr_class)
+                PyErr_CheckSignals()
+                for doc in queue:
+                    doc.is_parsed = True; yield doc
+                queue = []
+        batch_size = len(queue)
+        for i in range(batch_size):
+            self.parseC(doc_ptr[i], lengths[i], nr_feat, nr_class)
+        for doc in queue:
+            doc.is_parsed = True; yield doc
         PyErr_CheckSignals()
-        free(doc_ptr)
-        free(lengths)
 
     cdef void parseC(self, TokenC* tokens, int length, int nr_feat, int nr_class) nogil:
         cdef ExampleC eg
diff --git a/spacy/tagger.pyx b/spacy/tagger.pyx
index 956b8fff1..c60140296 100644
--- a/spacy/tagger.pyx
+++ b/spacy/tagger.pyx
@@ -212,6 +212,11 @@ cdef class Tagger:
         eg.reset_classes(eg.c.nr_class)
         tokens.is_tagged = True
         tokens._py_tokens = [None] * tokens.length
+
+    def pipe(self, stream, batch_size=1000, n_threads=2):
+        for doc in stream:
+            self(doc)
+            yield doc
 
     def train(self, Doc tokens, object gold_tag_strs):
         assert len(tokens) == len(gold_tag_strs)