Take note of backpressure

This commit is contained in:
Andrew Godwin 2016-05-07 13:00:09 -07:00
parent 905b71a745
commit 2d777e75f9
2 changed files with 55 additions and 36 deletions

View File

@ -111,19 +111,23 @@ class WebRequest(http.Request):
self.client_addr = None self.client_addr = None
self.server_addr = None self.server_addr = None
# Send message # Send message
self.factory.channel_layer.send("http.request", { try:
"reply_channel": self.reply_channel, self.factory.channel_layer.send("http.request", {
# TODO: Correctly say if it's 1.1 or 1.0 "reply_channel": self.reply_channel,
"http_version": "1.1", # TODO: Correctly say if it's 1.1 or 1.0
"method": self.method.decode("ascii"), "http_version": "1.1",
"path": self.unquote(self.path), "method": self.method.decode("ascii"),
"scheme": "http", "path": self.unquote(self.path),
"query_string": self.unquote(self.query_string), "scheme": "http",
"headers": self.clean_headers, "query_string": self.unquote(self.query_string),
"body": self.content.read(), "headers": self.clean_headers,
"client": self.client_addr, "body": self.content.read(),
"server": self.server_addr, "client": self.client_addr,
}) "server": self.server_addr,
})
except self.factory.channel_layer.ChannelFull:
# Channel is too full; reject request with 503
self.basic_error(503, b"Service Unavailable", "Request queue full.")
@classmethod @classmethod
def unquote(cls, value): def unquote(cls, value):
@ -140,9 +144,12 @@ class WebRequest(http.Request):
Sends a disconnect message on the http.disconnect channel. Sends a disconnect message on the http.disconnect channel.
Useful only really for long-polling. Useful only really for long-polling.
""" """
self.factory.channel_layer.send("http.disconnect", { try:
"reply_channel": self.reply_channel, self.factory.channel_layer.send("http.disconnect", {
}) "reply_channel": self.reply_channel,
})
except self.factory.channel_layer.ChannelFull:
pass
def connectionLost(self, reason): def connectionLost(self, reason):
""" """

View File

@ -82,7 +82,11 @@ class WebSocketProtocol(WebSocketServerProtocol):
def onOpen(self): def onOpen(self):
# Send news that this channel is open # Send news that this channel is open
logger.debug("WebSocket open for %s", self.reply_channel) logger.debug("WebSocket open for %s", self.reply_channel)
self.channel_layer.send("websocket.connect", self.request_info) try:
self.channel_layer.send("websocket.connect", self.request_info)
except self.channel_layer.ChannelFull:
# We don't drop the connection here as you don't _have_ to consume websocket.connect
pass
self.factory.log_action("websocket", "connected", { self.factory.log_action("websocket", "connected", {
"path": self.request.path, "path": self.request.path,
"client": "%s:%s" % tuple(self.client_addr) if self.client_addr else None, "client": "%s:%s" % tuple(self.client_addr) if self.client_addr else None,
@ -92,20 +96,25 @@ class WebSocketProtocol(WebSocketServerProtocol):
logger.debug("WebSocket incoming packet on %s", self.reply_channel) logger.debug("WebSocket incoming packet on %s", self.reply_channel)
self.packets_received += 1 self.packets_received += 1
self.last_data = time.time() self.last_data = time.time()
if isBinary: try:
self.channel_layer.send("websocket.receive", { if isBinary:
"reply_channel": self.reply_channel, self.channel_layer.send("websocket.receive", {
"path": self.unquote(self.path), "reply_channel": self.reply_channel,
"order": self.packets_received, "path": self.unquote(self.path),
"bytes": payload, "order": self.packets_received,
}) "bytes": payload,
else: })
self.channel_layer.send("websocket.receive", { else:
"reply_channel": self.reply_channel, self.channel_layer.send("websocket.receive", {
"path": self.unquote(self.path), "reply_channel": self.reply_channel,
"order": self.packets_received, "path": self.unquote(self.path),
"text": payload.decode("utf8"), "order": self.packets_received,
}) "text": payload.decode("utf8"),
})
except self.channel_layer.ChannelFull:
# We don't drop the connection here as you don't _have_ to consume websocket.receive
# TODO: Maybe add an option to drop if this is backlogged?
pass
def serverSend(self, content, binary=False): def serverSend(self, content, binary=False):
""" """
@ -128,11 +137,14 @@ class WebSocketProtocol(WebSocketServerProtocol):
if hasattr(self, "reply_channel"): if hasattr(self, "reply_channel"):
logger.debug("WebSocket closed for %s", self.reply_channel) logger.debug("WebSocket closed for %s", self.reply_channel)
del self.factory.reply_protocols[self.reply_channel] del self.factory.reply_protocols[self.reply_channel]
self.channel_layer.send("websocket.disconnect", { try:
"reply_channel": self.reply_channel, self.channel_layer.send("websocket.disconnect", {
"path": self.unquote(self.path), "reply_channel": self.reply_channel,
"order": self.packets_received + 1, "path": self.unquote(self.path),
}) "order": self.packets_received + 1,
})
except self.channel_layer.ChannelFull:
pass
self.factory.log_action("websocket", "disconnected", { self.factory.log_action("websocket", "disconnected", {
"path": self.request.path, "path": self.request.path,
"client": "%s:%s" % tuple(self.client_addr) if self.client_addr else None, "client": "%s:%s" % tuple(self.client_addr) if self.client_addr else None,