From dcbab8b2b426ee85ba420b1a71d44b135706b875 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Sat, 7 May 2016 10:35:12 -0700 Subject: [PATCH] Remove DatabaseLayer and improve deployment docs mentioning it --- channels/database_layer.py | 253 -------------------------- channels/tests/test_database_layer.py | 8 - docs/backends.rst | 27 ++- docs/concepts.rst | 3 +- docs/deploying.rst | 168 ++++++++++++----- docs/index.rst | 1 - docs/scaling.rst | 37 ---- patchinator.py | 9 - 8 files changed, 137 insertions(+), 369 deletions(-) delete mode 100644 channels/database_layer.py delete mode 100644 channels/tests/test_database_layer.py delete mode 100755 docs/scaling.rst diff --git a/channels/database_layer.py b/channels/database_layer.py deleted file mode 100644 index 88be4a3..0000000 --- a/channels/database_layer.py +++ /dev/null @@ -1,253 +0,0 @@ -import base64 -import datetime -import json -import random -import string -import time - -from django.apps.registry import Apps -from django.db import DEFAULT_DB_ALIAS, connections, models, transaction -from django.db.utils import OperationalError -from django.utils import six -from django.utils.functional import cached_property -from django.utils.timezone import now - - -class DatabaseChannelLayer(object): - """ - ORM-backed ASGI channel layer. - - For development use only; it will span multiple processes fine, - but it's going to be pretty bad at throughput. If you're reading this and - running it in production, PLEASE STOP. - - Also uses JSON for serialization, as we don't want to make Django depend - on msgpack for the built-in backend. The JSON format uses \uffff as first - character to signify a b64 byte string rather than a text string. Ugly, but - it's not a valid Unicode character, so it should be safe enough. - """ - - def __init__(self, db_alias=DEFAULT_DB_ALIAS, expiry=60, group_expiry=86400): - self.expiry = expiry - self.group_expiry = group_expiry - self.db_alias = db_alias - - # ASGI API - - extensions = ["groups", "flush"] - - class MessageTooLarge(Exception): - pass - - class ChannelFull(Exception): - pass - - def send(self, channel, message): - # Typecheck - assert isinstance(message, dict), "message is not a dict" - assert isinstance(channel, six.text_type), "%s is not unicode" % channel - # Write message to messages table - self.channel_model.objects.create( - channel=channel, - content=self.serialize(message), - expiry=now() + datetime.timedelta(seconds=self.expiry) - ) - - def receive_many(self, channels, block=False): - if not channels: - return None, None - assert all(isinstance(channel, six.text_type) for channel in channels) - # Shuffle channels - channels = list(channels) - random.shuffle(channels) - # Clean out expired messages - self._clean_expired() - # Get a message from one of our channels - while True: - try: - with transaction.atomic(): - message = self.channel_model.objects.select_for_update().filter( - channel__in=channels - ).order_by("id").first() - if message: - self.channel_model.objects.filter(pk=message.pk).delete() - return message.channel, self.deserialize(message.content) - except OperationalError: - # The database is probably trying to prevent a deadlock - time.sleep(0.1) - continue - if block: - time.sleep(1) - else: - return None, None - - def new_channel(self, pattern): - assert isinstance(pattern, six.text_type) - assert pattern.endswith("!") - # Keep making channel names till one isn't present. - while True: - random_string = "".join(random.choice(string.ascii_letters) for i in range(10)) - new_name = pattern + random_string - if not self.channel_model.objects.filter(channel=new_name).exists(): - return new_name - - # ASGI Group extension - - def group_add(self, group, channel): - """ - Adds the channel to the named group for at least 'expiry' - seconds (expiry defaults to message expiry if not provided). - """ - self.group_model.objects.update_or_create( - group=group, - channel=channel, - ) - - def group_discard(self, group, channel): - """ - Removes the channel from the named group if it is in the group; - does nothing otherwise (does not error) - """ - self.group_model.objects.filter(group=group, channel=channel).delete() - - def send_group(self, group, message): - """ - Sends a message to the entire group. - """ - self._clean_expired() - for channel in self.group_model.objects.filter(group=group).values_list("channel", flat=True): - self.send(channel, message) - - # ASGI Flush extension - - def flush(self): - self.channel_model.objects.all().delete() - self.group_model.objects.all().delete() - - # Serialization - - def serialize(self, message): - return AsgiJsonEncoder().encode(message) - - def deserialize(self, message): - return AsgiJsonDecoder().decode(message) - - # Database state mgmt - - @property - def connection(self): - """ - Returns the correct connection for the current thread. - """ - return connections[self.db_alias] - - @cached_property - def channel_model(self): - """ - Initialises a new model to store messages; not done as part of a - models.py as we don't want to make it for most installs. - """ - # Make the model class - class Message(models.Model): - # We assume an autoincrementing PK for message order - channel = models.CharField(max_length=200, db_index=True) - content = models.TextField() - expiry = models.DateTimeField(db_index=True) - - class Meta: - apps = Apps() - app_label = "channels" - db_table = "django_channels_channel" - # Ensure its table exists - if Message._meta.db_table not in self.connection.introspection.table_names(self.connection.cursor()): - with self.connection.schema_editor() as editor: - editor.create_model(Message) - return Message - - @cached_property - def group_model(self): - """ - Initialises a new model to store groups; not done as part of a - models.py as we don't want to make it for most installs. - """ - # Make the model class - class Group(models.Model): - group = models.CharField(max_length=200) - channel = models.CharField(max_length=200) - created = models.DateTimeField(db_index=True, auto_now_add=True) - - class Meta: - apps = Apps() - app_label = "channels" - db_table = "django_channels_group" - unique_together = [["group", "channel"]] - # Ensure its table exists with the right schema - if Group._meta.db_table not in self.connection.introspection.table_names(self.connection.cursor()): - with self.connection.schema_editor() as editor: - editor.create_model(Group) - return Group - - def _clean_expired(self): - """ - Cleans out expired groups and messages. - """ - # Include a 1-second grace period for clock sync drift - target = now() - datetime.timedelta(seconds=1) - # First, go through old messages and pick out channels that got expired - old_messages = self.channel_model.objects.filter(expiry__lt=target) - channels_to_ungroup = old_messages.values_list("channel", flat=True).distinct() - old_messages.delete() - # Now, remove channel membership from channels that expired and ones that just expired - self.group_model.objects.filter( - models.Q(channel__in=channels_to_ungroup) | - models.Q(created__lte=target - datetime.timedelta(seconds=self.group_expiry)) - ).delete() - - def __str__(self): - return "%s(alias=%s)" % (self.__class__.__name__, self.connection.alias) - - -class AsgiJsonEncoder(json.JSONEncoder): - """ - Special encoder that transforms bytestrings into unicode strings - prefixed with u+ffff - """ - - def transform(self, o): - if isinstance(o, (list, tuple)): - return [self.transform(x) for x in o] - elif isinstance(o, dict): - return { - self.transform(k): self.transform(v) - for k, v in o.items() - } - elif isinstance(o, six.binary_type): - return u"\uffff" + base64.b64encode(o).decode("ascii") - else: - return o - - def encode(self, o): - return super(AsgiJsonEncoder, self).encode(self.transform(o)) - - -class AsgiJsonDecoder(json.JSONDecoder): - """ - Special encoder that transforms bytestrings into unicode strings - prefixed with u+ffff - """ - - def transform(self, o): - if isinstance(o, (list, tuple)): - return [self.transform(x) for x in o] - elif isinstance(o, dict): - return { - self.transform(k): self.transform(v) - for k, v in o.items() - } - elif isinstance(o, six.text_type) and o and o[0] == u"\uffff": - return base64.b64decode(o[1:].encode("ascii")) - else: - return o - - def decode(self, o): - return self.transform(super(AsgiJsonDecoder, self).decode(o)) diff --git a/channels/tests/test_database_layer.py b/channels/tests/test_database_layer.py deleted file mode 100644 index b9a11c0..0000000 --- a/channels/tests/test_database_layer.py +++ /dev/null @@ -1,8 +0,0 @@ -from __future__ import unicode_literals -from asgiref.conformance import ConformanceTestCase -from channels.database_layer import DatabaseChannelLayer - - -class DatabaseLayerTests(ConformanceTestCase): - channel_layer = DatabaseChannelLayer(expiry=1, group_expiry=3) - expiry_delay = 2.1 diff --git a/docs/backends.rst b/docs/backends.rst index 3935058..724d2b2 100644 --- a/docs/backends.rst +++ b/docs/backends.rst @@ -55,24 +55,23 @@ settings. Any misconfigured interface server or worker will drop some or all messages. -Database --------- +IPC +--- -The database layer is intended as a short-term solution for people who can't -use a more production-ready layer (for example, Redis), but still want something -that will work cross-process. It has poor performance, and is only -recommended for development or extremely small deployments. +The IPC backend uses POSIX shared memory segments and semaphores in order to +allow different processes on the same machine to communicate with each other. -This layer is included with Channels; just set your ``BACKEND`` to -``channels.database_layer.DatabaseChannelLayer``, and it will use the -default Django database alias to store messages. You can change the alias -by setting ``CONFIG`` to ``{'alias': 'aliasname'}``. +As it uses shared memory, it does not require any additional servers running +to get working, and is quicker than any network-based channel layer. However, +it can only run between processes on the same machine. .. warning:: - The database channel layer is NOT fast, and performs especially poorly at - latency and throughput. We recommend its use only as a last resort, and only - on a database with good transaction support (e.g. Postgres), or you may - get errors with multiple message delivery. + The IPC layer only communicates between processes on the same machine, + and while you might initially be tempted to run a cluster of machines all + with their own IPC-based set of processes, this will result in groups not + working properly; events sent to a group will only go to those channels + that joined the group on the same machine. This backend is for + single-machine deployments only. In-memory diff --git a/docs/concepts.rst b/docs/concepts.rst index 64d9cad..9830b35 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -91,7 +91,8 @@ single process tied to a WSGI server, Django runs in three separate layers: cover this later. * The channel backend, which is a combination of pluggable Python code and - a datastore (a database, or Redis) responsible for transporting messages. + a datastore (e.g. Redis, or a shared memory segment) responsible for + transporting messages. * The workers, that listen on all relevant channels and run consumer code when a message is ready. diff --git a/docs/deploying.rst b/docs/deploying.rst index 13d31d7..c3947dd 100644 --- a/docs/deploying.rst +++ b/docs/deploying.rst @@ -1,8 +1,9 @@ Deploying ========= -Deploying applications using Channels requires a few more steps than a normal -Django WSGI application, but it's not very many. +Deploying applications using channels requires a few more steps than a normal +Django WSGI application, but you have a couple of options as to how to deploy +it and how much of your traffic you wish to route through the channel layers. Firstly, remember that it's an entirely optional part of Django. If you leave a project with the default settings (no ``CHANNEL_LAYERS``), @@ -14,15 +15,27 @@ When you want to enable channels in production, you need to do three things: * Run worker servers * Run interface servers +You can set things up in one of two ways; either route all traffic through +a :ref:`HTTP/WebSocket interface server `, removing the need +to run a WSGI server at all; or, just route WebSockets and long-poll +HTTP connections to the interface server, and :ref:`leave other pages served +by a standard WSGI server `. + +Routing all traffic through the interface server lets you have WebSockets and +long-polling coexist in the same URL tree with no configuration; if you split +the traffic up, you'll need to configure a webserver or layer 7 loadbalancer +in front of the two servers to route requests to the correct place based on +path or domain. Both methods are covered below. + Setting up a channel backend ---------------------------- The first step is to set up a channel backend. If you followed the -:doc:`getting-started` guide, you will have ended up using the database -backend, which is great for getting started quickly in development but totally -unsuitable for production use; it will hammer your database with lots of -queries as it polls for new messages. +:doc:`getting-started` guide, you will have ended up using the in-memory +backend, which is useful for ``runserver``, but as it only works inside the +same process, useless for actually running separate worker and interface +servers. Instead, take a look at the list of :doc:`backends`, and choose one that fits your requirements (additionally, you could use a third-party pluggable @@ -48,9 +61,22 @@ To use the Redis backend you have to install it:: pip install -U asgi_redis +Some backends, though, don't require an extra server, like the IPC backend, +which works between processes on the same machine but not over the network +(it's available in the ``asgi_ipc`` package):: -Make sure the same settings file is used across all your workers, interfaces -and WSGI apps; without it, they won't be able to talk to each other and things + CHANNEL_LAYERS = { + "default": { + "BACKEND": "asgi_ipc.IPCChannelLayer", + "ROUTING": "my_project.routing.channel_routing", + "CONFIG": { + "prefix": "mysite", + }, + }, + } + +Make sure the same settings file is used across all your workers and interface +servers; without it, they won't be able to talk to each other and things will just fail to work. @@ -61,7 +87,7 @@ Because the work of running consumers is decoupled from the work of talking to HTTP, WebSocket and other client connections, you need to run a cluster of "worker servers" to do all the processing. -Each server is single-threaded, so it's recommended you run around one per +Each server is single-threaded, so it's recommended you run around one or two per core on each machine; it's safe to run as many concurrent workers on the same machine as you like, as they don't open any ports (all they do is talk to the channel backend). @@ -77,7 +103,7 @@ and forward them to stderr. Make sure you keep an eye on how busy your workers are; if they get overloaded, requests will take longer and longer to return as the messages queue up -(until the expiry limit is reached, at which point HTTP connections will +(until the expiry or capacity limit is reached, at which point HTTP connections will start dropping). In a more complex project, you won't want all your channels being served by the @@ -104,23 +130,23 @@ The final piece of the puzzle is the "interface servers", the processes that do the work of taking incoming requests and loading them into the channels system. -You can just keep running your Django code as a WSGI app if you like, behind -something like uwsgi or gunicorn; this won't let you support WebSockets, though. -Still, if you want to use a WSGI server and have it talk to a worker server -cluster on the backend, see :ref:`wsgi-to-asgi`. - If you want to support WebSockets, long-poll HTTP requests and other Channels features, you'll need to run a native ASGI interface server, as the WSGI specification has no support for running these kinds of requests concurrently. We ship with an interface server that we recommend you use called `Daphne `_; it supports WebSockets, long-poll HTTP requests, HTTP/2 *(soon)* and performs quite well. -Of course, any ASGI-compliant server will work! -Notably, Daphne has a nice feature where it supports all of these protocols on -the same port and on all paths; it auto-negotiates between HTTP and WebSocket, +You can just keep running your Django code as a WSGI app if you like, behind +something like uwsgi or gunicorn; this won't let you support WebSockets, though, +so you'll need to run a separate interface server to terminate those connections +and configure routing in front of your interface and WSGI servers to route +requests appropriately. + +If you use Daphne for all traffic, it auto-negotiates between HTTP and WebSocket, so there's no need to have your WebSockets on a separate port or path (and -they'll be able to share cookies with your normal view code). +they'll be able to share cookies with your normal view code, which isn't +possible if you separate by domain rather than path). To run Daphne, it just needs to be supplied with a channel backend, in much the same way a WSGI server needs to be given an application. @@ -144,7 +170,7 @@ like supervisord to ensure it is re-run if it exits unexpectedly. If you only run Daphne and no workers, all of your page requests will seem to hang forever; that's because Daphne doesn't have any worker servers to handle the request and it's waiting for one to appear (while ``runserver`` also uses -Daphne, it launches a worker thread along with it in the same process). In this +Daphne, it launches worker threads along with it in the same process). In this scenario, it will eventually time out and give you a 503 error after 2 minutes; you can configure how long it waits with the ``--http-timeout`` command line argument. @@ -164,42 +190,92 @@ workers. As long as the new code is session-compatible, you can even do staged rollouts to make sure workers on new code aren't experiencing high error rates. There's no need to restart the WSGI or WebSocket interface servers unless -you've upgraded the interface server itself or changed any Django settings; -none of your code is used by them, and all middleware and code that can +you've upgraded the interface server itself or changed the ``CHANNEL_LAYER`` +setting; none of your code is used by them, and all middleware and code that can customize requests is run on the consumers. You can even use different Python versions for the interface servers and the workers; the ASGI protocol that channel layers communicate over -is designed to be very portable and network-transparent. +is designed to be portable across all Python versions. -.. _wsgi-to-asgi: +.. _asgi-alone: -Running ASGI under WSGI ------------------------ +Running just ASGI +----------------- -ASGI is a relatively new specification, and so it's backwards compatible with -WSGI servers with an adapter layer. You won't get WebSocket support this way - -WSGI doesn't support WebSockets - but you can run a separate ASGI server to -handle WebSockets if you want. +If you are just running Daphne to serve all traffic, then the configuration +above is enough where you can just expose it to the Internet and it'll serve +whatever kind of request comes in; for a small site, just the one Daphne +instance and four or five workers is likely enough. -The ``asgiref`` package contains the adapter; all you need to do is put this -in your Django project's ``wsgi.py`` to declare a new WSGI application object -that backs onto ASGI underneath:: +However, larger sites will need to deploy things at a slightly larger scale, +and how you scale things up is different from WSGI; see :ref:`scaling-up`. - import os - from asgiref.wsgi import WsgiToAsgiAdapter - from channels.asgi import get_channel_layer - os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_test.settings") - channel_layer = get_channel_layer() - application = WsgiToAsgiAdapter(channel_layer) +.. _wsgi-with-asgi: -While this removes WebSocket support through the same port that HTTP is served -on, it still lets you use other channels features such as background tasks or -alternative interface servers (that would let you write consumers against -incoming emails or IRC messages). +Running ASGI alongside WSGI +--------------------------- -You can also use this method to serve HTTP through your existing stack -and run Daphne on a separate port or domain to receive only WebSocket -connections. +ASGI and its canonical interface server Daphne are both relatively new, +and so you may not wish to run all your traffic through it yet (or you may +be using specialized features of your existing WSGI server). + +If that's the case, that's fine; you can run Daphne and a WSGI server alongside +each other, and only have Daphne serve the requests you need it to (usually +WebSocket and long-poll HTTP requests, as these do not fit into the WSGI model). + +To do this, just set up your Daphne to serve as we discussed above, and then +configure your load-balancer or front HTTP server process to dispatch requests +to the correct server - based on either path, domain, or if +you can, the Upgrade header. + +Dispatching based on path or domain means you'll need to design your WebSocket +URLs carefully so you can always tell how to route them at the load-balancer +level; the ideal thing is to be able to look for the ``Upgrade: WebSocket`` +header and distinguish connections by this, but not all software supports this +and it doesn't help route long-poll HTTP connections at all. + +You could also invert this model, and have all connections go to Daphne by +default and selectively route some back to the WSGI server, if you have +particular URLs or domains you want to use that server on. + + +.. _scaling-up: + +Scaling Up +---------- + +Scaling up a deployment containing channels (and thus running ASGI) is a little +different to scaling a WSGI deployment. + +The fundamental difference is that the group mechanic requires all servers serving +the same site to be able to see each other; if you separate the site up and run +it in a few, large clusters, messages to groups will only deliver to WebSockets +connected to the same cluster. For some site designs this will be fine, and if +you think you can live with this and design around it (which means never +designing anything around global notifications or events), this may be a good +way to go. + +For most projects, you'll need to run a single channel layer at scale in order +to achieve proper group delivery. Different backends will scale up differently, +but the Redis backend can use multiple Redis servers and spread the load +across them using sharding based on consistent hashing. + +The key to a channel layer knowing how to scale a channel's delivery is if it +contains the ``!`` character or not, which signifies a single-reader channel. +Single-reader channels are only ever connected to by a single process, and so +in the Redis case are stored on a single, predictable shard. Other channels +are assumed to have many workers trying to read them, and so messages for +these can be evenly divided across all shards. + +Django channels are still relatively new, and so it's likely that we don't yet +know the full story about how to scale things up; we run large load tests to +try and refine and improve large-project scaling, but it's no substitute for +actual traffic. If you're running channels at scale, you're encouraged to +send feedback to the Django team and work with us to hone the design and +performance of the channel layer backends, or you're free to make your own; +the ASGI specification is comprehensive and comes with a conformance test +suite, which should aid in any modification of existing backends or development +of new ones. diff --git a/docs/index.rst b/docs/index.rst index 4ad112e..92e06e4 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -29,7 +29,6 @@ Contents: installation getting-started deploying - scaling backends testing cross-compat diff --git a/docs/scaling.rst b/docs/scaling.rst deleted file mode 100755 index 71cfd98..0000000 --- a/docs/scaling.rst +++ /dev/null @@ -1,37 +0,0 @@ -Scaling -======= - -Of course, one of the downsides of introducing a channel layer to Django is -that it's something else that must scale. Scaling traditional Django as a -WSGI application is easy - you just add more servers and a loadbalancer. Your -database is likely to be the thing that stopped scaling before, and there's -a relatively large amount of knowledge about tackling that problem. - -By comparison, there's not as much knowledge about scaling something like this -(though as it is very similar to a task queue, we have some work to build from). -In particular, the fact that messages are at-most-once - we do not guarantee -delivery, in the same way a webserver doesn't guarantee a response - means -we can loosen a lot of restrictions that slow down more traditional task queues. - -In addition, because channels can only have single consumers and they're handled -by a fleet of workers all running the same code, we could easily split out -incoming work by sharding into separate clusters of channel backends -and worker servers - any cluster can handle any request, so we can just -loadbalance over them. - -Of course, that doesn't work for interface servers, where only a single -particular server is listening to each response channel - if we broke things -into clusters, it might end up that a response is sent on a different cluster -to the one that the interface server is listening on. - -That's why Channels labels any *response channel* with a leading ``!``, letting -you know that only one server is listening for it, and thus letting you scale -and shard the two different types of channels accordingly (for more on -the difference, see :ref:`channel-types`). - -This is the underlying theory behind Channels' sharding model - normal channels -are sent to random Redis servers, while response channels are sent to a -predictable server that both the interface server and worker can derive. - -Currently, sharding is implemented as part of the Redis backend only; -see the :doc:`backend documentation ` for more information. diff --git a/patchinator.py b/patchinator.py index 4cbf44b..f672924 100644 --- a/patchinator.py +++ b/patchinator.py @@ -170,9 +170,6 @@ class Patchinator(object): FileMap( "channels/channel.py", "django/channels/channel.py", python_transforms, ), - FileMap( - "channels/database_layer.py", "django/channels/database_layer.py", python_transforms, - ), FileMap( "channels/exceptions.py", "django/channels/exceptions.py", python_transforms, ), @@ -209,9 +206,6 @@ class Patchinator(object): NewFile( "tests/channels_tests/__init__.py", ), - FileMap( - "channels/tests/test_database_layer.py", "tests/channels_tests/test_database_layer.py", python_transforms, - ), FileMap( "channels/tests/test_handler.py", "tests/channels_tests/test_handler.py", python_transforms, ), @@ -240,9 +234,6 @@ class Patchinator(object): FileMap( "docs/reference.rst", "docs/ref/channels/api.txt", docs_transforms, ), - FileMap( - "docs/scaling.rst", "docs/topics/channels/scaling.txt", docs_transforms, - ), FileMap( "docs/testing.rst", "docs/topics/channels/testing.txt", docs_transforms, ),