Remove DatabaseLayer and improve deployment docs mentioning it

This commit is contained in:
Andrew Godwin 2016-05-07 10:35:12 -07:00
parent f346585f7c
commit dcbab8b2b4
8 changed files with 137 additions and 369 deletions

View File

@ -1,253 +0,0 @@
import base64
import datetime
import json
import random
import string
import time
from django.apps.registry import Apps
from django.db import DEFAULT_DB_ALIAS, connections, models, transaction
from django.db.utils import OperationalError
from django.utils import six
from django.utils.functional import cached_property
from django.utils.timezone import now
class DatabaseChannelLayer(object):
    """
    ORM-backed ASGI channel layer.

    For development use only; it will span multiple processes fine,
    but it's going to be pretty bad at throughput. If you're reading this and
    running it in production, PLEASE STOP.

    Also uses JSON for serialization, as we don't want to make Django depend
    on msgpack for the built-in backend. The JSON format uses \uffff as first
    character to signify a b64 byte string rather than a text string. Ugly, but
    it's a Unicode noncharacter, so it should be safe enough.

    :param db_alias: Django database alias that stores messages and groups.
    :param expiry: Seconds a message may sit unread before it is dropped.
    :param group_expiry: Seconds a group membership lasts without a re-add.
    """

    def __init__(self, db_alias=DEFAULT_DB_ALIAS, expiry=60, group_expiry=86400):
        self.expiry = expiry
        self.group_expiry = group_expiry
        self.db_alias = db_alias

    # ASGI API

    extensions = ["groups", "flush"]

    class MessageTooLarge(Exception):
        pass

    class ChannelFull(Exception):
        pass

    def send(self, channel, message):
        """
        Sends ``message`` (a dict) onto the named channel.
        """
        # Typecheck
        assert isinstance(message, dict), "message is not a dict"
        assert isinstance(channel, str), "%s is not unicode" % channel
        # Write message to messages table; expiry is stamped at send time.
        self.channel_model.objects.create(
            channel=channel,
            content=self.serialize(message),
            expiry=now() + datetime.timedelta(seconds=self.expiry),
        )

    def receive_many(self, channels, block=False):
        """
        Receives a single message from any of the given channels.

        Returns ``(channel, message)`` for the first message found, or
        ``(None, None)`` if none is waiting and ``block`` is False. With
        ``block=True``, polls once a second until a message arrives.
        """
        if not channels:
            return None, None
        assert all(isinstance(channel, str) for channel in channels)
        # Shuffle channels so a busy channel can't starve the others.
        channels = list(channels)
        random.shuffle(channels)
        # Clean out expired messages
        self._clean_expired()
        # Get a message from one of our channels
        while True:
            try:
                # SELECT ... FOR UPDATE so concurrent workers can't claim the
                # same row; the delete happens in the same transaction, which
                # gives at-most-once delivery.
                with transaction.atomic():
                    message = self.channel_model.objects.select_for_update().filter(
                        channel__in=channels
                    ).order_by("id").first()
                    if message:
                        self.channel_model.objects.filter(pk=message.pk).delete()
                        return message.channel, self.deserialize(message.content)
            except OperationalError:
                # The database is probably trying to prevent a deadlock
                time.sleep(0.1)
                continue
            if block:
                time.sleep(1)
            else:
                return None, None

    def new_channel(self, pattern):
        """
        Returns a fresh single-reader channel name built on ``pattern``,
        which must end with ``!``.
        """
        assert isinstance(pattern, str)
        assert pattern.endswith("!")
        # Keep making channel names till one isn't present.
        # NOTE(review): check-then-use is racy across processes — two callers
        # could draw the same free name; 52**10 candidates makes it unlikely.
        while True:
            random_string = "".join(random.choice(string.ascii_letters) for i in range(10))
            new_name = pattern + random_string
            if not self.channel_model.objects.filter(channel=new_name).exists():
                return new_name

    # ASGI Group extension

    def group_add(self, group, channel):
        """
        Adds the channel to the named group for at least ``group_expiry``
        seconds, refreshing the membership timestamp if it already exists.
        """
        # Pass `created` explicitly so a re-add refreshes the timestamp.
        # (The field previously used auto_now_add, which is only set on
        # INSERT, so re-adding a member never extended its expiry.)
        self.group_model.objects.update_or_create(
            group=group,
            channel=channel,
            defaults={"created": now()},
        )

    def group_discard(self, group, channel):
        """
        Removes the channel from the named group if it is in the group;
        does nothing otherwise (does not error)
        """
        self.group_model.objects.filter(group=group, channel=channel).delete()

    def send_group(self, group, message):
        """
        Sends a message to the entire group.
        """
        # Drop expired memberships first so we don't deliver to dead channels.
        self._clean_expired()
        for channel in self.group_model.objects.filter(group=group).values_list("channel", flat=True):
            self.send(channel, message)

    # ASGI Flush extension

    def flush(self):
        """Deletes every pending message and every group membership."""
        self.channel_model.objects.all().delete()
        self.group_model.objects.all().delete()

    # Serialization

    def serialize(self, message):
        """Encodes a message dict to the \uffff-prefixed JSON wire format."""
        return AsgiJsonEncoder().encode(message)

    def deserialize(self, message):
        """Decodes the \uffff-prefixed JSON wire format back into a dict."""
        return AsgiJsonDecoder().decode(message)

    # Database state mgmt

    @property
    def connection(self):
        """
        Returns the correct connection for the current thread.
        """
        return connections[self.db_alias]

    @cached_property
    def channel_model(self):
        """
        Initialises a new model to store messages; not done as part of a
        models.py as we don't want to make it for most installs.
        """
        # Make the model class
        class Message(models.Model):
            # We assume an autoincrementing PK for message order
            channel = models.CharField(max_length=200, db_index=True)
            content = models.TextField()
            expiry = models.DateTimeField(db_index=True)

            class Meta:
                apps = Apps()
                app_label = "channels"
                db_table = "django_channels_channel"
        # Ensure its table exists; close the introspection cursor promptly.
        with self.connection.cursor() as cursor:
            existing_tables = self.connection.introspection.table_names(cursor)
        if Message._meta.db_table not in existing_tables:
            with self.connection.schema_editor() as editor:
                editor.create_model(Message)
        return Message

    @cached_property
    def group_model(self):
        """
        Initialises a new model to store groups; not done as part of a
        models.py as we don't want to make it for most installs.
        """
        # Make the model class
        class Group(models.Model):
            group = models.CharField(max_length=200)
            channel = models.CharField(max_length=200)
            # default (not auto_now_add) so group_add can refresh it on re-add
            created = models.DateTimeField(db_index=True, default=now)

            class Meta:
                apps = Apps()
                app_label = "channels"
                db_table = "django_channels_group"
                unique_together = [["group", "channel"]]
        # Ensure its table exists with the right schema; close the cursor.
        with self.connection.cursor() as cursor:
            existing_tables = self.connection.introspection.table_names(cursor)
        if Group._meta.db_table not in existing_tables:
            with self.connection.schema_editor() as editor:
                editor.create_model(Group)
        return Group

    def _clean_expired(self):
        """
        Cleans out expired groups and messages.
        """
        # Include a 1-second grace period for clock sync drift
        target = now() - datetime.timedelta(seconds=1)
        # First, go through old messages and pick out channels that got expired
        old_messages = self.channel_model.objects.filter(expiry__lt=target)
        channels_to_ungroup = old_messages.values_list("channel", flat=True).distinct()
        old_messages.delete()
        # Now, remove channel membership from channels that expired and ones that just expired
        self.group_model.objects.filter(
            models.Q(channel__in=channels_to_ungroup) |
            models.Q(created__lte=target - datetime.timedelta(seconds=self.group_expiry))
        ).delete()

    def __str__(self):
        return "%s(alias=%s)" % (self.__class__.__name__, self.connection.alias)
class AsgiJsonEncoder(json.JSONEncoder):
    """
    JSON encoder that makes bytestrings round-trippable.

    Bytestrings are base64-encoded and emitted as text strings prefixed
    with U+FFFF (a Unicode noncharacter), so AsgiJsonDecoder can tell them
    apart from genuine text. Uses the builtin ``bytes`` check directly
    rather than ``six.binary_type`` (``django.utils.six`` was removed in
    Django 3.0; on Python 3 the two are identical).
    """

    def transform(self, o):
        """Recursively replace bytes values (and keys) with prefixed base64 text."""
        if isinstance(o, (list, tuple)):
            return [self.transform(x) for x in o]
        elif isinstance(o, dict):
            return {
                self.transform(k): self.transform(v)
                for k, v in o.items()
            }
        elif isinstance(o, bytes):
            return u"\uffff" + base64.b64encode(o).decode("ascii")
        else:
            return o

    def encode(self, o):
        # Transform the whole structure first, then run the normal encode.
        return super(AsgiJsonEncoder, self).encode(self.transform(o))
class AsgiJsonDecoder(json.JSONDecoder):
    """
    JSON decoder that reverses AsgiJsonEncoder's byte handling.

    Text strings prefixed with U+FFFF are base64-decoded back into
    bytestrings; everything else is passed through. (The original
    docstring was a copy-paste of the encoder's.) Uses the builtin
    ``str`` check directly rather than ``six.text_type``
    (``django.utils.six`` was removed in Django 3.0).
    """

    def transform(self, o):
        """Recursively replace prefixed base64 text with the original bytes."""
        if isinstance(o, (list, tuple)):
            # json.loads only produces lists, but tuples are handled for safety.
            return [self.transform(x) for x in o]
        elif isinstance(o, dict):
            return {
                self.transform(k): self.transform(v)
                for k, v in o.items()
            }
        elif isinstance(o, str) and o and o[0] == u"\uffff":
            return base64.b64decode(o[1:].encode("ascii"))
        else:
            return o

    def decode(self, o):
        # Run the normal decode first, then restore bytestrings.
        return self.transform(super(AsgiJsonDecoder, self).decode(o))

View File

@ -1,8 +0,0 @@
from __future__ import unicode_literals
from asgiref.conformance import ConformanceTestCase
from channels.database_layer import DatabaseChannelLayer
class DatabaseLayerTests(ConformanceTestCase):
    # Runs the asgiref ASGI conformance suite against the database-backed
    # channel layer, with short expiries so the expiry tests finish quickly.
    channel_layer = DatabaseChannelLayer(expiry=1, group_expiry=3)
    # How long the suite waits before asserting a message has expired;
    # slightly more than `expiry` plus the layer's 1-second grace period.
    expiry_delay = 2.1

View File

@ -55,24 +55,23 @@ settings. Any misconfigured interface server or worker will drop some or all
messages.
Database
--------
IPC
---
The database layer is intended as a short-term solution for people who can't
use a more production-ready layer (for example, Redis), but still want something
that will work cross-process. It has poor performance, and is only
recommended for development or extremely small deployments.
The IPC backend uses POSIX shared memory segments and semaphores in order to
allow different processes on the same machine to communicate with each other.
This layer is included with Channels; just set your ``BACKEND`` to
``channels.database_layer.DatabaseChannelLayer``, and it will use the
default Django database alias to store messages. You can change the alias
by setting ``CONFIG`` to ``{'alias': 'aliasname'}``.
As it uses shared memory, it does not require any additional servers running
to get working, and is quicker than any network-based channel layer. However,
it can only run between processes on the same machine.
.. warning::
The database channel layer is NOT fast, and performs especially poorly at
latency and throughput. We recommend its use only as a last resort, and only
on a database with good transaction support (e.g. Postgres), or you may
get errors with multiple message delivery.
The IPC layer only communicates between processes on the same machine,
and while you might initially be tempted to run a cluster of machines all
with their own IPC-based set of processes, this will result in groups not
working properly; events sent to a group will only go to those channels
that joined the group on the same machine. This backend is for
single-machine deployments only.
In-memory

View File

@ -91,7 +91,8 @@ single process tied to a WSGI server, Django runs in three separate layers:
cover this later.
* The channel backend, which is a combination of pluggable Python code and
a datastore (a database, or Redis) responsible for transporting messages.
a datastore (e.g. Redis, or a shared memory segment) responsible for
transporting messages.
* The workers, that listen on all relevant channels and run consumer code
when a message is ready.

View File

@ -1,8 +1,9 @@
Deploying
=========
Deploying applications using Channels requires a few more steps than a normal
Django WSGI application, but it's not very many.
Deploying applications using Channels requires a few more steps than a normal
Django WSGI application, but you have a couple of options as to how to deploy
it and how much of your traffic you wish to route through the channel layers.
Firstly, remember that it's an entirely optional part of Django.
If you leave a project with the default settings (no ``CHANNEL_LAYERS``),
@ -14,15 +15,27 @@ When you want to enable channels in production, you need to do three things:
* Run worker servers
* Run interface servers
You can set things up in one of two ways; either route all traffic through
a :ref:`HTTP/WebSocket interface server <asgi-alone>`, removing the need
to run a WSGI server at all; or, just route WebSockets and long-poll
HTTP connections to the interface server, and :ref:`leave other pages served
by a standard WSGI server <wsgi-with-asgi>`.
Routing all traffic through the interface server lets you have WebSockets and
long-polling coexist in the same URL tree with no configuration; if you split
the traffic up, you'll need to configure a webserver or layer 7 loadbalancer
in front of the two servers to route requests to the correct place based on
path or domain. Both methods are covered below.
Setting up a channel backend
----------------------------
The first step is to set up a channel backend. If you followed the
:doc:`getting-started` guide, you will have ended up using the database
backend, which is great for getting started quickly in development but totally
unsuitable for production use; it will hammer your database with lots of
queries as it polls for new messages.
:doc:`getting-started` guide, you will have ended up using the in-memory
backend, which is useful for ``runserver``, but as it only works inside the
same process, useless for actually running separate worker and interface
servers.
Instead, take a look at the list of :doc:`backends`, and choose one that
fits your requirements (additionally, you could use a third-party pluggable
@ -48,9 +61,22 @@ To use the Redis backend you have to install it::
pip install -U asgi_redis
Some backends, though, don't require an extra server, like the IPC backend,
which works between processes on the same machine but not over the network
(it's available in the ``asgi_ipc`` package)::
Make sure the same settings file is used across all your workers, interfaces
and WSGI apps; without it, they won't be able to talk to each other and things
CHANNEL_LAYERS = {
"default": {
"BACKEND": "asgi_ipc.IPCChannelLayer",
"ROUTING": "my_project.routing.channel_routing",
"CONFIG": {
"prefix": "mysite",
},
},
}
Make sure the same settings file is used across all your workers and interface
servers; without it, they won't be able to talk to each other and things
will just fail to work.
@ -61,7 +87,7 @@ Because the work of running consumers is decoupled from the work of talking
to HTTP, WebSocket and other client connections, you need to run a cluster
of "worker servers" to do all the processing.
Each server is single-threaded, so it's recommended you run around one per
Each server is single-threaded, so it's recommended you run around one or two per
core on each machine; it's safe to run as many concurrent workers on the same
machine as you like, as they don't open any ports (all they do is talk to
the channel backend).
@ -77,7 +103,7 @@ and forward them to stderr.
Make sure you keep an eye on how busy your workers are; if they get overloaded,
requests will take longer and longer to return as the messages queue up
(until the expiry limit is reached, at which point HTTP connections will
(until the expiry or capacity limit is reached, at which point HTTP connections will
start dropping).
In a more complex project, you won't want all your channels being served by the
@ -104,23 +130,23 @@ The final piece of the puzzle is the "interface servers", the processes that
do the work of taking incoming requests and loading them into the channels
system.
You can just keep running your Django code as a WSGI app if you like, behind
something like uwsgi or gunicorn; this won't let you support WebSockets, though.
Still, if you want to use a WSGI server and have it talk to a worker server
cluster on the backend, see :ref:`wsgi-to-asgi`.
If you want to support WebSockets, long-poll HTTP requests and other Channels
features, you'll need to run a native ASGI interface server, as the WSGI
specification has no support for running these kinds of requests concurrently.
We ship with an interface server that we recommend you use called
`Daphne <http://github.com/andrewgodwin/daphne/>`_; it supports WebSockets,
long-poll HTTP requests, HTTP/2 *(soon)* and performs quite well.
Of course, any ASGI-compliant server will work!
Notably, Daphne has a nice feature where it supports all of these protocols on
the same port and on all paths; it auto-negotiates between HTTP and WebSocket,
You can just keep running your Django code as a WSGI app if you like, behind
something like uwsgi or gunicorn; this won't let you support WebSockets, though,
so you'll need to run a separate interface server to terminate those connections
and configure routing in front of your interface and WSGI servers to route
requests appropriately.
If you use Daphne for all traffic, it auto-negotiates between HTTP and WebSocket,
so there's no need to have your WebSockets on a separate port or path (and
they'll be able to share cookies with your normal view code).
they'll be able to share cookies with your normal view code, which isn't
possible if you separate by domain rather than path).
To run Daphne, it just needs to be supplied with a channel backend, in much
the same way a WSGI server needs to be given an application.
@ -144,7 +170,7 @@ like supervisord to ensure it is re-run if it exits unexpectedly.
If you only run Daphne and no workers, all of your page requests will seem to
hang forever; that's because Daphne doesn't have any worker servers to handle
the request and it's waiting for one to appear (while ``runserver`` also uses
Daphne, it launches a worker thread along with it in the same process). In this
Daphne, it launches worker threads along with it in the same process). In this
scenario, it will eventually time out and give you a 503 error after 2 minutes;
you can configure how long it waits with the ``--http-timeout`` command line
argument.
@ -164,42 +190,92 @@ workers. As long as the new code is session-compatible, you can even do staged
rollouts to make sure workers on new code aren't experiencing high error rates.
There's no need to restart the WSGI or WebSocket interface servers unless
you've upgraded the interface server itself or changed any Django settings;
none of your code is used by them, and all middleware and code that can
you've upgraded the interface server itself or changed the ``CHANNEL_LAYER``
setting; none of your code is used by them, and all middleware and code that can
customize requests is run on the consumers.
You can even use different Python versions for the interface servers and the
workers; the ASGI protocol that channel layers communicate over
is designed to be very portable and network-transparent.
is designed to be portable across all Python versions.
.. _wsgi-to-asgi:
.. _asgi-alone:
Running ASGI under WSGI
-----------------------
Running just ASGI
-----------------
ASGI is a relatively new specification, and so it's backwards compatible with
WSGI servers with an adapter layer. You won't get WebSocket support this way -
WSGI doesn't support WebSockets - but you can run a separate ASGI server to
handle WebSockets if you want.
If you are just running Daphne to serve all traffic, then the configuration
above is enough where you can just expose it to the Internet and it'll serve
whatever kind of request comes in; for a small site, just the one Daphne
instance and four or five workers is likely enough.
The ``asgiref`` package contains the adapter; all you need to do is put this
in your Django project's ``wsgi.py`` to declare a new WSGI application object
that backs onto ASGI underneath::
However, larger sites will need to deploy things at a slightly larger scale,
and how you scale things up is different from WSGI; see :ref:`scaling-up`.
import os
from asgiref.wsgi import WsgiToAsgiAdapter
from channels.asgi import get_channel_layer
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_test.settings")
channel_layer = get_channel_layer()
application = WsgiToAsgiAdapter(channel_layer)
.. _wsgi-with-asgi:
While this removes WebSocket support through the same port that HTTP is served
on, it still lets you use other channels features such as background tasks or
alternative interface servers (that would let you write consumers against
incoming emails or IRC messages).
Running ASGI alongside WSGI
---------------------------
You can also use this method to serve HTTP through your existing stack
and run Daphne on a separate port or domain to receive only WebSocket
connections.
ASGI and its canonical interface server Daphne are both relatively new,
and so you may not wish to run all your traffic through it yet (or you may
be using specialized features of your existing WSGI server).
If that's the case, that's fine; you can run Daphne and a WSGI server alongside
each other, and only have Daphne serve the requests you need it to (usually
WebSocket and long-poll HTTP requests, as these do not fit into the WSGI model).
To do this, just set up your Daphne to serve as we discussed above, and then
configure your load-balancer or front HTTP server process to dispatch requests
to the correct server - based on either path, domain, or if
you can, the Upgrade header.
Dispatching based on path or domain means you'll need to design your WebSocket
URLs carefully so you can always tell how to route them at the load-balancer
level; the ideal thing is to be able to look for the ``Upgrade: WebSocket``
header and distinguish connections by this, but not all software supports this
and it doesn't help route long-poll HTTP connections at all.
You could also invert this model, and have all connections go to Daphne by
default and selectively route some back to the WSGI server, if you have
particular URLs or domains you want to use that server on.
.. _scaling-up:
Scaling Up
----------
Scaling up a deployment containing channels (and thus running ASGI) is a little
different to scaling a WSGI deployment.
The fundamental difference is that the group mechanic requires all servers serving
the same site to be able to see each other; if you separate the site up and run
it in a few large clusters, messages to groups will only deliver to WebSockets
connected to the same cluster. For some site designs this will be fine, and if
you think you can live with this and design around it (which means never
designing anything around global notifications or events), this may be a good
way to go.
For most projects, you'll need to run a single channel layer at scale in order
to achieve proper group delivery. Different backends will scale up differently,
but the Redis backend can use multiple Redis servers and spread the load
across them using sharding based on consistent hashing.
The key to a channel layer knowing how to scale a channel's delivery is if it
contains the ``!`` character or not, which signifies a single-reader channel.
Single-reader channels are only ever connected to by a single process, and so
in the Redis case are stored on a single, predictable shard. Other channels
are assumed to have many workers trying to read them, and so messages for
these can be evenly divided across all shards.
Django channels are still relatively new, and so it's likely that we don't yet
know the full story about how to scale things up; we run large load tests to
try and refine and improve large-project scaling, but it's no substitute for
actual traffic. If you're running channels at scale, you're encouraged to
send feedback to the Django team and work with us to hone the design and
performance of the channel layer backends, or you're free to make your own;
the ASGI specification is comprehensive and comes with a conformance test
suite, which should aid in any modification of existing backends or development
of new ones.

View File

@ -29,7 +29,6 @@ Contents:
installation
getting-started
deploying
scaling
backends
testing
cross-compat

View File

@ -1,37 +0,0 @@
Scaling
=======
Of course, one of the downsides of introducing a channel layer to Django is
that it's something else that must scale. Scaling traditional Django as a
WSGI application is easy - you just add more servers and a loadbalancer. Your
database is likely to be the thing that stopped scaling before, and there's
a relatively large amount of knowledge about tackling that problem.
By comparison, there's not as much knowledge about scaling something like this
(though as it is very similar to a task queue, we have some work to build from).
In particular, the fact that messages are at-most-once - we do not guarantee
delivery, in the same way a webserver doesn't guarantee a response - means
we can loosen a lot of restrictions that slow down more traditional task queues.
In addition, because channels can only have single consumers and they're handled
by a fleet of workers all running the same code, we could easily split out
incoming work by sharding into separate clusters of channel backends
and worker servers - any cluster can handle any request, so we can just
loadbalance over them.
Of course, that doesn't work for interface servers, where only a single
particular server is listening to each response channel - if we broke things
into clusters, it might end up that a response is sent on a different cluster
to the one that the interface server is listening on.
That's why Channels labels any *response channel* with a leading ``!``, letting
you know that only one server is listening for it, and thus letting you scale
and shard the two different types of channels accordingly (for more on
the difference, see :ref:`channel-types`).
This is the underlying theory behind Channels' sharding model - normal channels
are sent to random Redis servers, while response channels are sent to a
predictable server that both the interface server and worker can derive.
Currently, sharding is implemented as part of the Redis backend only;
see the :doc:`backend documentation <backends>` for more information.

View File

@ -170,9 +170,6 @@ class Patchinator(object):
FileMap(
"channels/channel.py", "django/channels/channel.py", python_transforms,
),
FileMap(
"channels/database_layer.py", "django/channels/database_layer.py", python_transforms,
),
FileMap(
"channels/exceptions.py", "django/channels/exceptions.py", python_transforms,
),
@ -209,9 +206,6 @@ class Patchinator(object):
NewFile(
"tests/channels_tests/__init__.py",
),
FileMap(
"channels/tests/test_database_layer.py", "tests/channels_tests/test_database_layer.py", python_transforms,
),
FileMap(
"channels/tests/test_handler.py", "tests/channels_tests/test_handler.py", python_transforms,
),
@ -240,9 +234,6 @@ class Patchinator(object):
FileMap(
"docs/reference.rst", "docs/ref/channels/api.txt", docs_transforms,
),
FileMap(
"docs/scaling.rst", "docs/topics/channels/scaling.txt", docs_transforms,
),
FileMap(
"docs/testing.rst", "docs/topics/channels/testing.txt", docs_transforms,
),