From 2d777e75f9a9958d3b73d377d692c67e9c8917c7 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Sat, 7 May 2016 13:00:09 -0700 Subject: [PATCH] Take note of backpressure --- daphne/http_protocol.py | 39 ++++++++++++++++++------------- daphne/ws_protocol.py | 52 +++++++++++++++++++++++++---------------- 2 files changed, 55 insertions(+), 36 deletions(-) diff --git a/daphne/http_protocol.py b/daphne/http_protocol.py index 192456f..db18fb8 100755 --- a/daphne/http_protocol.py +++ b/daphne/http_protocol.py @@ -111,19 +111,23 @@ class WebRequest(http.Request): self.client_addr = None self.server_addr = None # Send message - self.factory.channel_layer.send("http.request", { - "reply_channel": self.reply_channel, - # TODO: Correctly say if it's 1.1 or 1.0 - "http_version": "1.1", - "method": self.method.decode("ascii"), - "path": self.unquote(self.path), - "scheme": "http", - "query_string": self.unquote(self.query_string), - "headers": self.clean_headers, - "body": self.content.read(), - "client": self.client_addr, - "server": self.server_addr, - }) + try: + self.factory.channel_layer.send("http.request", { + "reply_channel": self.reply_channel, + # TODO: Correctly say if it's 1.1 or 1.0 + "http_version": "1.1", + "method": self.method.decode("ascii"), + "path": self.unquote(self.path), + "scheme": "http", + "query_string": self.unquote(self.query_string), + "headers": self.clean_headers, + "body": self.content.read(), + "client": self.client_addr, + "server": self.server_addr, + }) + except self.factory.channel_layer.ChannelFull: + # Channel is too full; reject request with 503 + self.basic_error(503, b"Service Unavailable", "Request queue full.") @classmethod def unquote(cls, value): @@ -140,9 +144,12 @@ class WebRequest(http.Request): Sends a disconnect message on the http.disconnect channel. Useful only really for long-polling. """ - self.factory.channel_layer.send("http.disconnect", { - "reply_channel": self.reply_channel, - }) + try: + self.factory.channel_layer.send("http.disconnect", { + "reply_channel": self.reply_channel, + }) + except self.factory.channel_layer.ChannelFull: + pass def connectionLost(self, reason): """ diff --git a/daphne/ws_protocol.py b/daphne/ws_protocol.py index 5990bc7..a94a466 100755 --- a/daphne/ws_protocol.py +++ b/daphne/ws_protocol.py @@ -82,7 +82,11 @@ class WebSocketProtocol(WebSocketServerProtocol): def onOpen(self): # Send news that this channel is open logger.debug("WebSocket open for %s", self.reply_channel) - self.channel_layer.send("websocket.connect", self.request_info) + try: + self.channel_layer.send("websocket.connect", self.request_info) + except self.channel_layer.ChannelFull: + # We don't drop the connection here as you don't _have_ to consume websocket.connect + pass self.factory.log_action("websocket", "connected", { "path": self.request.path, "client": "%s:%s" % tuple(self.client_addr) if self.client_addr else None, @@ -92,20 +96,25 @@ class WebSocketProtocol(WebSocketServerProtocol): logger.debug("WebSocket incoming packet on %s", self.reply_channel) self.packets_received += 1 self.last_data = time.time() - if isBinary: - self.channel_layer.send("websocket.receive", { - "reply_channel": self.reply_channel, - "path": self.unquote(self.path), - "order": self.packets_received, - "bytes": payload, - }) - else: - self.channel_layer.send("websocket.receive", { - "reply_channel": self.reply_channel, - "path": self.unquote(self.path), - "order": self.packets_received, - "text": payload.decode("utf8"), - }) + try: + if isBinary: + self.channel_layer.send("websocket.receive", { + "reply_channel": self.reply_channel, + "path": self.unquote(self.path), + "order": self.packets_received, + "bytes": payload, + }) + else: + self.channel_layer.send("websocket.receive", { + "reply_channel": self.reply_channel, + "path": self.unquote(self.path), + "order": self.packets_received, + "text": payload.decode("utf8"), + }) + except self.channel_layer.ChannelFull: + # We don't drop the connection here as you don't _have_ to consume websocket.receive + # TODO: Maybe add an option to drop if this is backlogged? + pass def serverSend(self, content, binary=False): """ @@ -128,11 +137,14 @@ class WebSocketProtocol(WebSocketServerProtocol): if hasattr(self, "reply_channel"): logger.debug("WebSocket closed for %s", self.reply_channel) del self.factory.reply_protocols[self.reply_channel] - self.channel_layer.send("websocket.disconnect", { - "reply_channel": self.reply_channel, - "path": self.unquote(self.path), - "order": self.packets_received + 1, - }) + try: + self.channel_layer.send("websocket.disconnect", { + "reply_channel": self.reply_channel, + "path": self.unquote(self.path), + "order": self.packets_received + 1, + }) + except self.channel_layer.ChannelFull: + pass self.factory.log_action("websocket", "disconnected", { "path": self.request.path, "client": "%s:%s" % tuple(self.client_addr) if self.client_addr else None,