From feea84f323c1d55ed5f41a986fed74fa236aa346 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Thu, 5 May 2016 22:48:12 -0700 Subject: [PATCH] Introduce backpressure with ChannelFull --- channels/database_layer.py | 6 ++++++ channels/handler.py | 13 ++++++++++++- channels/worker.py | 7 ++++++- docs/asgi.rst | 35 ++++++++++++++++++++++++++++++----- setup.py | 2 +- 5 files changed, 55 insertions(+), 8 deletions(-) diff --git a/channels/database_layer.py b/channels/database_layer.py index 37894a8..88be4a3 100644 --- a/channels/database_layer.py +++ b/channels/database_layer.py @@ -36,6 +36,12 @@ class DatabaseChannelLayer(object): extensions = ["groups", "flush"] + class MessageTooLarge(Exception): + pass + + class ChannelFull(Exception): + pass + def send(self, channel, message): # Typecheck assert isinstance(message, dict), "message is not a dict" diff --git a/channels/handler.py b/channels/handler.py index 43fd653..ec03f4c 100644 --- a/channels/handler.py +++ b/channels/handler.py @@ -328,4 +328,15 @@ class ViewConsumer(object): def __call__(self, message): for reply_message in self.handler(message): - message.reply_channel.send(reply_message) + while True: + # If we get ChannelFull we just wait and keep trying until + # it goes through. + # TODO: Add optional death timeout? Don't want to lock up + # a whole worker if the client just vanishes and leaves the response + # channel full. + try: + message.reply_channel.send(reply_message) + except message.channel_layer.ChannelFull: + time.sleep(0.05) + else: + break diff --git a/channels/worker.py b/channels/worker.py index 77c1dcb..6e65492 100644 --- a/channels/worker.py +++ b/channels/worker.py @@ -116,6 +116,11 @@ class Worker(object): repr(content)[:100], ) continue - self.channel_layer.send(channel, content) + # Try to re-insert it a few times then drop it + for _ in range(10): + try: + self.channel_layer.send(channel, content) + except self.channel_layer.ChannelFull: + time.sleep(0.05) except: logger.exception("Error processing message with consumer %s:", name_that_thing(consumer)) diff --git a/docs/asgi.rst b/docs/asgi.rst index b31136a..fb3323c 100644 --- a/docs/asgi.rst +++ b/docs/asgi.rst @@ -13,7 +13,7 @@ servers (particularly webservers) and Python applications, intended to allow handling of multiple common protocol styles (including HTTP, HTTP2, and WebSocket). -It is intended to replace and expand on WSGI, though the design +It is intended to supplement and expand on WSGI, though the design deliberately includes provisions to allow WSGI-to-ASGI and ASGI-to-WGSI adapters to be easily written for the HTTP protocol. @@ -34,7 +34,7 @@ and from different application threads or processes. It also lays out new, serialization-compatible formats for things like HTTP requests and responses and WebSocket data frames, to allow these to -be transported over a network or local socket, and allow separation +be transported over a network or local memory, and allow separation of protocol handling and application logic into different processes. Part of this design is ensuring there is an easy path to use both @@ -255,10 +255,30 @@ problem. If ordering of incoming packets matters for a protocol, they should be annotated with a packet number (as WebSocket is in this specification). Single-reader channels, such as those used for response channels back to -clients, are not subject to this problem; a single reader should always +clients, are not subject to this problem; a single reader must always receive messages in channel order. +Capacity +-------- + +To provide backpressure, each channel in a channel layer may have a capacity, +defined however the layer wishes (it is recommended that it is configurable +by the user using keyword arguments to the channel layer constructor, and +furthermore configurable per channel name or name prefix). + +When a channel is at or over capacity, trying to send() to that channel +may raise ChannelFull, which indicates to the sender the channel is over +capacity. How the sender wishes to deal with this will depend on context; +for example, a web application trying to send a response body will likely +wait until it empties out again, while a HTTP interface server trying to +send in a request would drop the request and return a 503 error. + +Sending to a group never raises ChannelFull; instead, it must silently drop +the message if it is over capacity, as per ASGI's at-most-once delivery +policy. + + Specification Details ===================== @@ -291,6 +311,9 @@ A *channel layer* must provide an object with these attributes * ``MessageTooLarge``, the exception raised when a send operation fails because the encoded message is over the layer's size limit. +* ``ChannelFull``, the exception raised when a send operation fails + because the destination channel is over capacity. + * ``extensions``, a list of unicode string names indicating which extensions this layer provides, or empty if it supports none. The names defined in this document are ``groups``, ``flush`` and @@ -307,7 +330,8 @@ A channel layer implementing the ``groups`` extension must also provide: * ``send_group(group, message)``, a callable that takes two positional arguments; the group to send to, as a unicode string, and the message - to send, as a serializable ``dict``. + to send, as a serializable ``dict``. It may raise MessageTooLarge but cannot + raise ChannelFull. * ``group_expiry``, an integer number of seconds that specifies how long group membership is valid for after the most recent ``group_add`` call (see @@ -346,7 +370,8 @@ Channels **must**: * Never deliver a message more than once. -* Never block on message send. +* Never block on message send (though they may raise ChannelFull or + MessageTooLarge) * Be able to handle messages of at least 1MB in size when encoded as JSON (the implementation may use better encoding or compression, as long diff --git a/setup.py b/setup.py index 59bf911..5664ffd 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( include_package_data=True, install_requires=[ 'Django>=1.8', - 'asgiref>=0.10', + 'asgiref>=0.12', 'daphne>=0.11', ] )