From 7e37440c934ee3abf5271faec61bd085fa965e26 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Sun, 20 Mar 2016 18:34:39 -0700 Subject: [PATCH] Update benchmarker to be more consistent and flexible --- testproject/benchmark.py | 57 +++++++++++++++++++++++++++++----------- 1 file changed, 41 insertions(+), 16 deletions(-) diff --git a/testproject/benchmark.py b/testproject/benchmark.py index febb08b..0885d4d 100644 --- a/testproject/benchmark.py +++ b/testproject/benchmark.py @@ -12,9 +12,6 @@ stats = {} class MyClientProtocol(WebSocketClientProtocol): - num_messages = 5 - message_gap = 1 - def onConnect(self, response): self.opened = time.time() self.sent = 0 @@ -29,26 +26,34 @@ class MyClientProtocol(WebSocketClientProtocol): def onOpen(self): def hello(): if self.last_send is None: - if self.sent >= self.num_messages: + if self.sent >= self.factory.num_messages: self.sendClose() return - self.sendMessage(("%s:%s" % (self.sent, self.fingerprint)).encode("ascii")) self.last_send = time.time() + self.sendMessage(("%s:%s" % (self.sent, self.fingerprint)).encode("ascii")) self.sent += 1 else: # Wait for receipt of ping pass - self.factory.reactor.callLater(1, hello) + self.factory.reactor.callLater(1.0 / self.factory.message_rate, hello) hello() def onMessage(self, payload, isBinary): + # Detect receive-before-send + if self.last_send is None: + self.corrupted += 1 + print("CRITICAL: Socket %s received before sending: %s" % (self.fingerprint, payload)) + return num, fingerprint = payload.decode("ascii").split(":") if fingerprint != self.fingerprint: self.corrupted += 1 - if int(num) != self.received: - self.out_of_order += 1 - self.received += 1 + try: + if int(num) != self.received: + self.out_of_order += 1 + except ValueError: + self.corrupted += 1 self.latencies.append(time.time() - self.last_send) + self.received += 1 self.last_send = None def onClose(self, wasClean, code, reason): @@ -78,27 +83,43 @@ class Benchmarker(object): Performs benchmarks against WebSockets. """ - def __init__(self, url, num, rate): + def __init__(self, url, num, concurrency, rate, messages): self.url = url self.num = num + self.concurrency = concurrency self.rate = rate + self.messages = messages self.factory = WebSocketClientFactory( args.url, ) self.factory.protocol = MyClientProtocol + self.factory.num_messages = self.messages + self.factory.message_rate = self.rate def loop(self): + self.spawn_loop() + self.progress_loop() + + def spawn_loop(self): self.spawn_connections() + reactor.callLater(0.01, self.spawn_loop) + + def progress_loop(self): self.print_progress() - reactor.callLater(1, self.loop) + reactor.callLater(1, self.progress_loop) def spawn_connections(self): - if len(stats) >= self.num: + # Stop spawning if we did the right total number + max_to_spawn = self.num - len(stats) + if max_to_spawn <= 0: return + # Decode connection args host, port = self.url.split("://")[1].split(":") port = int(port) - for i in range(self.rate): - # TODO: Look at URL + # Only spawn enough to get up to concurrency + open_protocols = len([x for x in stats.values() if not x]) + to_spawn = min(max(self.concurrency - open_protocols, 0), max_to_spawn) + for _ in range(to_spawn): reactor.connectTCP(host, port, self.factory) def print_progress(self): @@ -176,14 +197,18 @@ if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument("url") - parser.add_argument("-n", "--num", type=int, default=100) - parser.add_argument("-r", "--rate", type=int, default=10) + parser.add_argument("-n", "--num", type=int, default=100, help="Total number of sockets to open") + parser.add_argument("-c", "--concurrency", type=int, default=10, help="Number of sockets to open at once") + parser.add_argument("-r", "--rate", type=float, default=1, help="Number of messages to send per socket per second") + parser.add_argument("-m", "--messages", type=int, default=5, help="Number of messages to send per socket before close") args = parser.parse_args() benchmarker = Benchmarker( url=args.url, num=args.num, + concurrency=args.concurrency, rate=args.rate, + messages=args.messages, ) benchmarker.loop() reactor.run()