diff --git a/channels/backends/redis_py.py b/channels/backends/redis_py.py
index 02d683e..8c263a7 100644
--- a/channels/backends/redis_py.py
+++ b/channels/backends/redis_py.py
@@ -1,9 +1,14 @@
 import time
 import json
 import datetime
+import math
 import redis
+import random
+import binascii
 import uuid
 
+from django.utils import six
+
 from .base import BaseChannelBackend
 
 
@@ -13,41 +18,81 @@ class RedisChannelBackend(BaseChannelBackend):
     multiple processes fine, but it's going to be pretty bad at throughput.
     """
 
-    def __init__(self, routing, expiry=60, host="localhost", port=6379, prefix="django-channels:"):
+    def __init__(self, routing, expiry=60, hosts=None, prefix="django-channels:"):
         super(RedisChannelBackend, self).__init__(routing=routing, expiry=expiry)
-        self.host = host
-        self.port = port
+        # Make sure they provided some hosts, or provide a default
+        if not hosts:
+            hosts = [("localhost", 6379)]
+        for host, port in hosts:
+            assert isinstance(host, six.string_types)
+            assert int(port)
+        self.hosts = hosts
         self.prefix = prefix
+        # Precalculate some values for ring selection
+        self.ring_size = len(self.hosts)
+        self.ring_divisor = int(math.ceil(4096 / float(self.ring_size)))
 
-    @property
-    def connection(self):
+    def consistent_hash(self, value):
+        """
+        Maps the value to a node value between 0 and 4095
+        using CRC32, then down to one of the ring nodes.
+        """
+        bigval = binascii.crc32(value.encode("utf8")) & 0xffffffff
+        return (bigval // 0x100000) // self.ring_divisor
+
+    def random_index(self):
+        return random.randint(0, len(self.hosts) - 1)
+
+    def connection(self, index):
         """
         Returns the correct connection for the current thread.
+
+        Pass an index (e.g. from consistent_hash) to pin the connection to a
+        specific server; pass None to use a random server instead.
         """
-        return redis.Redis(host=self.host, port=self.port)
+        # If index is explicitly None, pick a random server
+        if index is None:
+            index = self.random_index()
+        # Catch bad indexes
+        if not (0 <= index < self.ring_size):
+            raise ValueError("There are only %s hosts - you asked for %s!" % (self.ring_size, index))
+        host, port = self.hosts[index]
+        return redis.Redis(host=host, port=port)
+
+    @property
+    def connections(self):
+        for i in range(len(self.hosts)):
+            yield self.connection(i)
 
     def send(self, channel, message):
         # if channel is no str (=> bytes) convert it
         if not isinstance(channel, str):
            channel = channel.decode('utf-8')
-
+        # Pick a connection to the right server - consistent for response
+        # channels, random for normal channels
+        if channel.startswith("!"):
+            index = self.consistent_hash(channel)
+            connection = self.connection(index)
+        else:
+            connection = self.connection(None)
         # Write out message into expiring key (avoids big items in list)
-        key = self.prefix + str(uuid.uuid4())
-        self.connection.set(
+        # TODO: Use extended set, drop support for older redis?
+        key = self.prefix + uuid.uuid4().hex
+        connection.set(
             key,
             json.dumps(message),
         )
-        self.connection.expire(
+        connection.expire(
             key,
             self.expiry + 10,
         )
         # Add key to list
-        self.connection.rpush(
+        connection.rpush(
             self.prefix + channel,
             key,
         )
         # Set list to expire when message does (any later messages will bump this)
-        self.connection.expire(
+        connection.expire(
             self.prefix + channel,
             self.expiry + 10,
         )
@@ -56,13 +101,27 @@ class RedisChannelBackend(BaseChannelBackend):
     def receive_many(self, channels):
         if not channels:
             raise ValueError("Cannot receive on empty channel list!")
-        # Shuffle channels to avoid the first ones starving others of workers
-        random.shuffle(channels)
+        # Work out what servers to listen on for the given channels
+        indexes = {}
+        random_index = self.random_index()
+        for channel in channels:
+            if channel.startswith("!"):
+                indexes.setdefault(self.consistent_hash(channel), []).append(channel)
+            else:
+                indexes.setdefault(random_index, []).append(channel)
         # Get a message from one of our channels
         while True:
-            result = self.connection.blpop([self.prefix + channel for channel in channels], timeout=1)
+            # Select a random connection to use
+            # TODO: Would we be better trying to do this truly async?
+            index = random.choice(list(indexes.keys()))
+            connection = self.connection(index)
+            channels = indexes[index]
+            # Shuffle channels to avoid the first ones starving others of workers
+            random.shuffle(channels)
+            # Pop off any waiting message
+            result = connection.blpop([self.prefix + channel for channel in channels], timeout=1)
             if result:
-                content = self.connection.get(result[1])
+                content = connection.get(result[1])
                 if content is None:
                     continue
                 return result[0][len(self.prefix):].decode("utf-8"), json.loads(content.decode("utf-8"))
@@ -75,7 +134,7 @@ class RedisChannelBackend(BaseChannelBackend):
         seconds (expiry defaults to message expiry if not provided).
         """
         key = "%s:group:%s" % (self.prefix, group)
-        self.connection.zadd(
+        self.connection(self.consistent_hash(group)).zadd(
             key,
             **{channel: time.time() + (expiry or self.expiry)}
         )
@@ -86,7 +145,7 @@ class RedisChannelBackend(BaseChannelBackend):
         does nothing otherwise (does not error)
         """
         key = "%s:group:%s" % (self.prefix, group)
-        self.connection.zrem(
+        self.connection(self.consistent_hash(group)).zrem(
             key,
             channel,
         )
@@ -96,10 +155,11 @@ class RedisChannelBackend(BaseChannelBackend):
         Returns an iterable of all channels in the group.
         """
         key = "%s:group:%s" % (self.prefix, group)
+        connection = self.connection(self.consistent_hash(group))
         # Discard old channels
-        self.connection.zremrangebyscore(key, 0, int(time.time()) - 10)
+        connection.zremrangebyscore(key, 0, int(time.time()) - 10)
         # Return current lot
-        return self.connection.zrange(
+        return connection.zrange(
             key,
             0,
             -1,
@@ -113,14 +173,14 @@ class RedisChannelBackend(BaseChannelBackend):
         obtained, False if lock not obtained.
         """
         key = "%s:lock:%s" % (self.prefix, channel)
-        return bool(self.connection.setnx(key, "1"))
+        return bool(self.connection(self.consistent_hash(channel)).setnx(key, "1"))
 
     def unlock_channel(self, channel):
        """
        Unlocks the named channel. Always succeeds.
""" key = "%s:lock:%s" % (self.prefix, channel) - self.connection.delete(key) + self.connection(self.consistent_hash(channel)).delete(key) def __str__(self): return "%s(host=%s, port=%s)" % (self.__class__.__name__, self.host, self.port) diff --git a/docs/backends.rst b/docs/backends.rst index b80911a..bb92930 100644 --- a/docs/backends.rst +++ b/docs/backends.rst @@ -5,6 +5,50 @@ Multiple choices of backend are available, to fill different tradeoffs of complexity, throughput and scalability. You can also write your own backend if you wish; the API is very simple and documented below. +Redis +----- + +The Redis backend is the recommended backend to run Channels with, as it +supports both high throughput on a single Redis server as well as the ability +to run against a set of Redis servers in a sharded mode. + +To use the Redis backend you have to install the redis package:: + + pip install -U redis + +By default, it will attempt to connect to a Redis server on ``localhost:6379``, +but you can override this with the ``HOSTS`` setting:: + + CHANNEL_BACKENDS = { + "default": { + "BACKEND": "channels.backends.redis.RedisChannelBackend", + "HOSTS": [("redis-channel-1", 6379), ("redis-channel-2", 6379)], + }, + } + +Sharding +~~~~~~~~ + +The sharding model is based on consistent hashing - in particular, +:ref:`response channels ` are hashed and used to pick a single +Redis server that both the interface server and the worker will use. + +For normal channels, since any worker can service any channel request, messages +are simply distributed randomly among all possible servers, and workers will +pick a single server to listen to. Note that if you run more Redis servers than +workers, it's very likely that some servers will not have workers listening to +them; we recommend you always have at least ten workers for each Redis server +to ensure good distribution. Workers will, however, change server periodically +(every five seconds or so) so queued messages should eventually get a response. + +Note that if you change the set of sharding servers you will need to restart +all interface servers and workers with the new set before anything works, +and any in-flight messages will be lost (even with persistence, some will); +the consistent hashing model relies on all running clients having the same +settings. Any misconfigured interface server or worker will drop some or all +messages. + + In-memory --------- @@ -18,23 +62,7 @@ This backend provides no network transparency or non-blocking guarantees. 
 Database
 --------
 
-Redis
------
-
-To use the Redis backend you have to install the redis package::
-
-    pip install -U redis
-
-Also you need to set the following in the ``CHANNEL_BACKENDS`` setting::
-
-    CHANNEL_BACKENDS = {
-        "default": {
-            "BACKEND": "channels.backends.redis_py.RedisChannelBackend",
-            "HOST": "redis-hostname",
-        },
-    }
-
-
 Writing Custom Backends
 -----------------------
 
diff --git a/docs/deploying.rst b/docs/deploying.rst
index 49433b7..f9fcd1a 100644
--- a/docs/deploying.rst
+++ b/docs/deploying.rst
@@ -36,8 +36,8 @@ here's an example for a remote Redis server::
 
     CHANNEL_BACKENDS = {
         "default": {
-            "BACKEND": "channels.backends.redis_py.RedisChannelBackend",
-            "HOST": "redis-channel",
+            "BACKEND": "channels.backends.redis.RedisChannelBackend",
+            "HOSTS": [("redis-channel", 6379)],
         },
     }
 
diff --git a/docs/faqs.rst b/docs/faqs.rst
old mode 100644
new mode 100755
diff --git a/docs/scaling.rst b/docs/scaling.rst
old mode 100644
new mode 100755
index 8d9e87d..3bedf23
--- a/docs/scaling.rst
+++ b/docs/scaling.rst
@@ -28,3 +28,10 @@ That's why Channels labels any *response channel* with a leading ``!``, letting
 you know that only one server is listening for it, and thus letting you scale
 and shard the two different types of channels accordingly (for more on the
 difference, see :ref:`channel-types`).
+
+This is the underlying theory behind Channels' sharding model - normal channels
+are sent to random Redis servers, while response channels are sent to a
+predictable server that both the interface server and the worker can derive.
+
+Currently, sharding is implemented as part of the Redis backend only;
+see the :doc:`backend documentation <backends>` for more information.
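
As a standalone illustration of the ring selection this patch implements (a
sketch for reference only, not part of the patch itself; the ``pick_shard``
helper and the example channel name are made up here)::

    import binascii
    import math

    def pick_shard(channel, hosts):
        # Mirror the backend's consistent_hash(): CRC32 the channel name,
        # reduce the 32-bit value to 0..4095, then divide it onto the ring.
        ring_divisor = int(math.ceil(4096 / float(len(hosts))))
        bigval = binascii.crc32(channel.encode("utf8")) & 0xffffffff
        return (bigval // 0x100000) // ring_divisor

    hosts = [("redis-channel-1", 6379), ("redis-channel-2", 6379)]
    # A response channel (leading "!") always maps to the same index, so the
    # interface server and the worker agree on which Redis server to use.
    print(pick_shard("!response.abc123", hosts))

Because the index depends only on the channel name and the number of configured
hosts, every process that shares the same ``HOSTS`` setting computes the same
server, which is why the docs above require all interface servers and workers
to be restarted together when the server set changes.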