Update benchmarker to be more consistent and flexible

This commit is contained in:
Andrew Godwin 2016-03-20 18:34:39 -07:00
parent 5c1a0fc096
commit 7e37440c93

View File

@ -12,9 +12,6 @@ stats = {}
class MyClientProtocol(WebSocketClientProtocol): class MyClientProtocol(WebSocketClientProtocol):
num_messages = 5
message_gap = 1
def onConnect(self, response): def onConnect(self, response):
self.opened = time.time() self.opened = time.time()
self.sent = 0 self.sent = 0
@ -29,26 +26,34 @@ class MyClientProtocol(WebSocketClientProtocol):
def onOpen(self): def onOpen(self):
def hello(): def hello():
if self.last_send is None: if self.last_send is None:
if self.sent >= self.num_messages: if self.sent >= self.factory.num_messages:
self.sendClose() self.sendClose()
return return
self.sendMessage(("%s:%s" % (self.sent, self.fingerprint)).encode("ascii"))
self.last_send = time.time() self.last_send = time.time()
self.sendMessage(("%s:%s" % (self.sent, self.fingerprint)).encode("ascii"))
self.sent += 1 self.sent += 1
else: else:
# Wait for receipt of ping # Wait for receipt of ping
pass pass
self.factory.reactor.callLater(1, hello) self.factory.reactor.callLater(1.0 / self.factory.message_rate, hello)
hello() hello()
def onMessage(self, payload, isBinary): def onMessage(self, payload, isBinary):
# Detect receive-before-send
if self.last_send is None:
self.corrupted += 1
print("CRITICAL: Socket %s received before sending: %s" % (self.fingerprint, payload))
return
num, fingerprint = payload.decode("ascii").split(":") num, fingerprint = payload.decode("ascii").split(":")
if fingerprint != self.fingerprint: if fingerprint != self.fingerprint:
self.corrupted += 1 self.corrupted += 1
if int(num) != self.received: try:
self.out_of_order += 1 if int(num) != self.received:
self.received += 1 self.out_of_order += 1
except ValueError:
self.corrupted += 1
self.latencies.append(time.time() - self.last_send) self.latencies.append(time.time() - self.last_send)
self.received += 1
self.last_send = None self.last_send = None
def onClose(self, wasClean, code, reason): def onClose(self, wasClean, code, reason):
@ -78,27 +83,43 @@ class Benchmarker(object):
Performs benchmarks against WebSockets. Performs benchmarks against WebSockets.
""" """
def __init__(self, url, num, rate): def __init__(self, url, num, concurrency, rate, messages):
self.url = url self.url = url
self.num = num self.num = num
self.concurrency = concurrency
self.rate = rate self.rate = rate
self.messages = messages
self.factory = WebSocketClientFactory( self.factory = WebSocketClientFactory(
args.url, args.url,
) )
self.factory.protocol = MyClientProtocol self.factory.protocol = MyClientProtocol
self.factory.num_messages = self.messages
self.factory.message_rate = self.rate
def loop(self): def loop(self):
self.spawn_loop()
self.progress_loop()
def spawn_loop(self):
self.spawn_connections() self.spawn_connections()
reactor.callLater(0.01, self.spawn_loop)
def progress_loop(self):
self.print_progress() self.print_progress()
reactor.callLater(1, self.loop) reactor.callLater(1, self.progress_loop)
def spawn_connections(self): def spawn_connections(self):
if len(stats) >= self.num: # Stop spawning if we did the right total number
max_to_spawn = self.num - len(stats)
if max_to_spawn <= 0:
return return
# Decode connection args
host, port = self.url.split("://")[1].split(":") host, port = self.url.split("://")[1].split(":")
port = int(port) port = int(port)
for i in range(self.rate): # Only spawn enough to get up to concurrency
# TODO: Look at URL open_protocols = len([x for x in stats.values() if not x])
to_spawn = min(max(self.concurrency - open_protocols, 0), max_to_spawn)
for _ in range(to_spawn):
reactor.connectTCP(host, port, self.factory) reactor.connectTCP(host, port, self.factory)
def print_progress(self): def print_progress(self):
@ -176,14 +197,18 @@ if __name__ == '__main__':
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("url") parser.add_argument("url")
parser.add_argument("-n", "--num", type=int, default=100) parser.add_argument("-n", "--num", type=int, default=100, help="Total number of sockets to open")
parser.add_argument("-r", "--rate", type=int, default=10) parser.add_argument("-c", "--concurrency", type=int, default=10, help="Number of sockets to open at once")
parser.add_argument("-r", "--rate", type=float, default=1, help="Number of messages to send per socket per second")
parser.add_argument("-m", "--messages", type=int, default=5, help="Number of messages to send per socket before close")
args = parser.parse_args() args = parser.parse_args()
benchmarker = Benchmarker( benchmarker = Benchmarker(
url=args.url, url=args.url,
num=args.num, num=args.num,
concurrency=args.concurrency,
rate=args.rate, rate=args.rate,
messages=args.messages,
) )
benchmarker.loop() benchmarker.loop()
reactor.run() reactor.run()