Introduce backpressure with ChannelFull

This commit is contained in:
Andrew Godwin 2016-05-05 22:48:12 -07:00
parent 7b75761644
commit feea84f323
5 changed files with 55 additions and 8 deletions

View File

@ -36,6 +36,12 @@ class DatabaseChannelLayer(object):
extensions = ["groups", "flush"]
class MessageTooLarge(Exception):
pass
class ChannelFull(Exception):
pass
def send(self, channel, message):
# Typecheck
assert isinstance(message, dict), "message is not a dict"

View File

@ -328,4 +328,15 @@ class ViewConsumer(object):
def __call__(self, message):
for reply_message in self.handler(message):
message.reply_channel.send(reply_message)
while True:
# If we get ChannelFull we just wait and keep trying until
# it goes through.
# TODO: Add optional death timeout? Don't want to lock up
# a whole worker if the client just vanishes and leaves the response
# channel full.
try:
message.reply_channel.send(reply_message)
except message.channel_layer.ChannelFull:
time.sleep(0.05)
else:
break

View File

@ -116,6 +116,11 @@ class Worker(object):
repr(content)[:100],
)
continue
self.channel_layer.send(channel, content)
# Try to re-insert it a few times then drop it
for _ in range(10):
try:
self.channel_layer.send(channel, content)
except self.channel_layer.ChannelFull:
time.sleep(0.05)
except:
logger.exception("Error processing message with consumer %s:", name_that_thing(consumer))

View File

@ -13,7 +13,7 @@ servers (particularly webservers) and Python applications, intended
to allow handling of multiple common protocol styles (including HTTP, HTTP2,
and WebSocket).
It is intended to replace and expand on WSGI, though the design
It is intended to supplement and expand on WSGI, though the design
deliberately includes provisions to allow WSGI-to-ASGI and ASGI-to-WGSI
adapters to be easily written for the HTTP protocol.
@ -34,7 +34,7 @@ and from different application threads or processes.
It also lays out new, serialization-compatible formats for things like
HTTP requests and responses and WebSocket data frames, to allow these to
be transported over a network or local socket, and allow separation
be transported over a network or local memory, and allow separation
of protocol handling and application logic into different processes.
Part of this design is ensuring there is an easy path to use both
@ -255,10 +255,30 @@ problem. If ordering of incoming packets matters for a protocol, they should
be annotated with a packet number (as WebSocket is in this specification).
Single-reader channels, such as those used for response channels back to
clients, are not subject to this problem; a single reader should always
clients, are not subject to this problem; a single reader must always
receive messages in channel order.
Capacity
--------
To provide backpressure, each channel in a channel layer may have a capacity,
defined however the layer wishes (it is recommended that it is configurable
by the user using keyword arguments to the channel layer constructor, and
furthermore configurable per channel name or name prefix).
When a channel is at or over capacity, trying to send() to that channel
may raise ChannelFull, which indicates to the sender the channel is over
capacity. How the sender wishes to deal with this will depend on context;
for example, a web application trying to send a response body will likely
wait until it empties out again, while a HTTP interface server trying to
send in a request would drop the request and return a 503 error.
Sending to a group never raises ChannelFull; instead, it must silently drop
the message if it is over capacity, as per ASGI's at-most-once delivery
policy.
Specification Details
=====================
@ -291,6 +311,9 @@ A *channel layer* must provide an object with these attributes
* ``MessageTooLarge``, the exception raised when a send operation fails
because the encoded message is over the layer's size limit.
* ``ChannelFull``, the exception raised when a send operation fails
because the destination channel is over capacity.
* ``extensions``, a list of unicode string names indicating which
extensions this layer provides, or empty if it supports none.
The names defined in this document are ``groups``, ``flush`` and
@ -307,7 +330,8 @@ A channel layer implementing the ``groups`` extension must also provide:
* ``send_group(group, message)``, a callable that takes two positional
arguments; the group to send to, as a unicode string, and the message
to send, as a serializable ``dict``.
to send, as a serializable ``dict``. It may raise MessageTooLarge but cannot
raise ChannelFull.
* ``group_expiry``, an integer number of seconds that specifies how long group
membership is valid for after the most recent ``group_add`` call (see
@ -346,7 +370,8 @@ Channels **must**:
* Never deliver a message more than once.
* Never block on message send.
* Never block on message send (though they may raise ChannelFull or
MessageTooLarge)
* Be able to handle messages of at least 1MB in size when encoded as
JSON (the implementation may use better encoding or compression, as long

View File

@ -13,7 +13,7 @@ setup(
include_package_data=True,
install_requires=[
'Django>=1.8',
'asgiref>=0.10',
'asgiref>=0.12',
'daphne>=0.11',
]
)