From 88d8494b5a2dd46fed754de948b53b6a0307ae23 Mon Sep 17 00:00:00 2001 From: Miroslav Stampar Date: Tue, 18 Dec 2012 16:03:35 +0100 Subject: [PATCH] Implementation for an Issue #307 --- lib/core/bigarray.py | 4 ++++ lib/techniques/error/use.py | 13 ++++++++++++- lib/techniques/union/use.py | 29 ++++++++++++++++++++++++----- 3 files changed, 40 insertions(+), 6 deletions(-) diff --git a/lib/core/bigarray.py b/lib/core/bigarray.py index 754fae89d..fe7a9225a 100644 --- a/lib/core/bigarray.py +++ b/lib/core/bigarray.py @@ -40,6 +40,10 @@ class BigArray(list): self.chunks[-1] = filename self.chunks.append([]) + def extend(self, value): + for _ in value: + self.append(_) + def pop(self): if len(self.chunks[-1]) < 1: self.chunks.pop() diff --git a/lib/techniques/error/use.py b/lib/techniques/error/use.py index a964c47ec..081ec8b25 100644 --- a/lib/techniques/error/use.py +++ b/lib/techniques/error/use.py @@ -357,6 +357,8 @@ def errorUse(expression, dump=False): threadData.shared.limits = iter(xrange(startLimit, stopLimit)) numThreads = min(conf.threads, (stopLimit - startLimit)) threadData.shared.outputs = BigArray() + threadData.shared.buffered = [] + threadData.shared.lastFlushed = startLimit - 1 if kb.dumpTable and (len(expressionFieldsList) < (stopLimit - startLimit) > CHECK_ZERO_COLUMNS_THRESHOLD): for field in expressionFieldsList: @@ -392,7 +394,15 @@ def errorUse(expression, dump=False): output = output[0] with kb.locks.outputs: - threadData.shared.outputs.append(output) + index = None + for index in xrange(len(threadData.shared.buffered)): + if threadData.shared.buffered[index][0] >= num: + break + threadData.shared.buffered.insert(index or 0, (num, output)) + while threadData.shared.buffered and threadData.shared.lastFlushed + 1 == threadData.shared.buffered[0][0]: + threadData.shared.lastFlushed += 1 + threadData.shared.outputs.append(threadData.shared.buffered[0][1]) + del threadData.shared.buffered[0] runThreads(numThreads, errorThread) @@ -403,6 +413,7 @@ def errorUse(expression, dump=False): logger.warn(warnMsg) finally: + threadData.shared.outputs.extend(_[1] for _ in sorted(threadData.shared.buffered)) outputs = threadData.shared.outputs kb.suppressResumeInfo = False diff --git a/lib/techniques/union/use.py b/lib/techniques/union/use.py index 4726a87d1..fcd25623e 100644 --- a/lib/techniques/union/use.py +++ b/lib/techniques/union/use.py @@ -274,6 +274,8 @@ def unionUse(expression, unpack=True, dump=False): threadData.shared.limits = iter(xrange(startLimit, stopLimit)) numThreads = min(conf.threads, (stopLimit - startLimit)) threadData.shared.value = BigArray() + threadData.shared.buffered = [] + threadData.shared.lastFlushed = startLimit - 1 if stopLimit > TURN_OFF_RESUME_INFO_LIMIT: kb.suppressResumeInfo = True @@ -306,14 +308,28 @@ def unionUse(expression, unpack=True, dump=False): break if output: - if all(map(lambda x: x in output, [kb.chars.start, kb.chars.stop])): + if all(map(lambda _: _ in output, (kb.chars.start, kb.chars.stop))): items = parseUnionPage(output) - if isNoneValue(items): - continue + with kb.locks.value: - for item in arrayizeValue(items): - threadData.shared.value.append(item) + index = None + for index in xrange(len(threadData.shared.buffered)): + if threadData.shared.buffered[index][0] >= num: + break + threadData.shared.buffered.insert(index or 0, (num, items)) + while threadData.shared.buffered and threadData.shared.lastFlushed + 1 == threadData.shared.buffered[0][0]: + threadData.shared.lastFlushed += 1 + _ = threadData.shared.buffered[0][1] + if not isNoneValue(_): + threadData.shared.value.extend(arrayizeValue(_)) + del threadData.shared.buffered[0] else: + with kb.locks.value: + index = None + for index in xrange(len(threadData.shared.buffered)): + if threadData.shared.buffered[index][0] >= num: + break + threadData.shared.buffered.insert(index or 0, (num, None)) items = output.replace(kb.chars.start, "").replace(kb.chars.stop, "").split(kb.chars.delimiter) if conf.verbose == 1 and not (threadData.resumed and kb.suppressResumeInfo): @@ -337,6 +353,9 @@ def unionUse(expression, unpack=True, dump=False): logger.warn(warnMsg) finally: + for _ in sorted(threadData.shared.buffered): + if not isNoneValue(_[1]): + threadData.shared.value.extend(arrayizeValue(_[1])) value = threadData.shared.value kb.suppressResumeInfo = False