From 80627d8e372d90d466455094eec876fe90e63ebf Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Wed, 10 Jun 2015 11:17:32 -0700 Subject: [PATCH] Working database backend and "runworker" command --- channels/__init__.py | 2 +- channels/adapters.py | 4 +- channels/backends/__init__.py | 26 ++++++----- channels/backends/base.py | 7 +++ channels/backends/{orm.py => database.py} | 53 +++++++++++++---------- channels/backends/memory.py | 4 ++ channels/channel.py | 8 ++-- channels/management/commands/runserver.py | 15 ++++--- channels/management/commands/runworker.py | 41 ++++++++++++++++++ channels/worker.py | 13 +++--- 10 files changed, 121 insertions(+), 52 deletions(-) rename channels/backends/{orm.py => database.py} (54%) create mode 100644 channels/management/commands/runworker.py diff --git a/channels/__init__.py b/channels/__init__.py index 1010c41..c0b7065 100644 --- a/channels/__init__.py +++ b/channels/__init__.py @@ -1,4 +1,4 @@ -# Load backends +# Load backends, using settings if available (else falling back to a default) DEFAULT_CHANNEL_BACKEND = "default" from .backends import BackendManager from django.conf import settings diff --git a/channels/adapters.py b/channels/adapters.py index 0879241..cf365cd 100644 --- a/channels/adapters.py +++ b/channels/adapters.py @@ -47,7 +47,7 @@ def view_consumer(channel_name, alias=DEFAULT_CHANNEL_BACKEND): response = func(request) Channel(request.response_channel).send(**response.channel_encode()) # Get the channel layer and register - channel_layer = channel_backends[DEFAULT_CHANNEL_BACKEND] - channel_layer.registry.add_consumer(consumer, [channel_name]) + channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] + channel_backend.registry.add_consumer(consumer, [channel_name]) return func return inner diff --git a/channels/backends/__init__.py b/channels/backends/__init__.py index 83c74f4..5582b73 100644 --- a/channels/backends/__init__.py +++ b/channels/backends/__init__.py @@ -11,17 +11,23 @@ class BackendManager(object): """ def __init__(self, backend_configs): + self.configs = backend_configs self.backends = {} - for name, config in backend_configs.items(): - # Load the backend class - try: - backend_class = import_string(config['BACKEND']) - except KeyError: - raise InvalidChannelBackendError("No BACKEND specified for %s" % name) - except ImportError: - raise InvalidChannelBackendError("Cannot import BACKEND %s specified for %s" % (config['BACKEND'], name)) - # Initialise and pass config - self.backends[name] = backend_class(**{k.lower(): v for k, v in config.items() if k != "BACKEND"}) + + def make_backend(self, name): + # Load the backend class + try: + backend_class = import_string(self.configs[name]['BACKEND']) + except KeyError: + raise InvalidChannelBackendError("No BACKEND specified for %s" % name) + except ImportError as e: + raise InvalidChannelBackendError("Cannot import BACKEND %r specified for %s" % (self.configs[name]['BACKEND'], name)) + # Initialise and pass config + instance = backend_class(**{k.lower(): v for k, v in self.configs[name].items() if k != "BACKEND"}) + instance.alias = name + return instance def __getitem__(self, key): + if key not in self.backends: + self.backends[key] = self.make_backend(key) return self.backends[key] diff --git a/channels/backends/base.py b/channels/backends/base.py index a3481cb..c2d8f7c 100644 --- a/channels/backends/base.py +++ b/channels/backends/base.py @@ -15,6 +15,10 @@ class BaseChannelBackend(object): registry of consumers. """ + # Flags if this backend can only be used inside one process. + # Causes errors if you try to run workers/interfaces separately with it. + local_only = False + def __init__(self, expiry=60): self.registry = ConsumerRegistry() self.expiry = expiry @@ -31,3 +35,6 @@ class BaseChannelBackend(object): channels passed, as a (channel, message) tuple. """ raise NotImplementedError() + + def __str__(self): + return self.__class__.__name__ diff --git a/channels/backends/orm.py b/channels/backends/database.py similarity index 54% rename from channels/backends/orm.py rename to channels/backends/database.py index fdf6645..eb48a19 100644 --- a/channels/backends/orm.py +++ b/channels/backends/database.py @@ -1,30 +1,40 @@ import time +import json import datetime from django.apps.registry import Apps from django.db import models, connections, DEFAULT_DB_ALIAS +from django.utils.functional import cached_property +from django.utils.timezone import now from .base import BaseChannelBackend queues = {} -class ORMChannelBackend(BaseChannelBackend): +class DatabaseChannelBackend(BaseChannelBackend): """ ORM-backed channel environment. For development use only; it will span multiple processes fine, but it's going to be pretty bad at throughput. """ - def __init__(self, expiry, db_alias=DEFAULT_DB_ALIAS): - super(ORMChannelBackend, self).__init__(expiry) - self.connection = connections[db_alias] - self.model = self.make_model() - self.ensure_schema() + def __init__(self, expiry=60, db_alias=DEFAULT_DB_ALIAS): + super(DatabaseChannelBackend, self).__init__(expiry) + self.db_alias = db_alias - def make_model(self): + @property + def connection(self): + """ + Returns the correct connection for the current thread. + """ + return connections[self.db_alias] + + @property + def model(self): """ Initialises a new model to store messages; not done as part of a models.py as we don't want to make it for most installs. """ + # Make the model class class Message(models.Model): # We assume an autoincrementing PK for message order channel = models.CharField(max_length=200, db_index=True) @@ -34,35 +44,32 @@ class ORMChannelBackend(BaseChannelBackend): apps = Apps() app_label = "channels" db_table = "django_channels" + # Ensure its table exists + if Message._meta.db_table not in self.connection.introspection.table_names(self.connection.cursor()): + with self.connection.schema_editor() as editor: + editor.create_model(Message) return Message - def ensure_schema(self): - """ - Ensures the table exists and has the correct schema. - """ - # If the table's there, that's fine - we've never changed its schema - # in the codebase. - if self.model._meta.db_table in self.connection.introspection.table_names(self.connection.cursor()): - return - # Make the table - with self.connection.schema_editor() as editor: - editor.create_model(self.model) - def send(self, channel, message): self.model.objects.create( channel = channel, - message = json.dumps(message), - expiry = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.expiry) + content = json.dumps(message), + expiry = now() + datetime.timedelta(seconds=self.expiry) ) def receive_many(self, channels): + if not channels: + raise ValueError("Cannot receive on empty channel list!") while True: # Delete all expired messages (add 10 second grace period for clock sync) - self.model.objects.filter(expiry__lt=datetime.datetime.utcnow() - datetime.timedelta(seconds=10)).delete() + self.model.objects.filter(expiry__lt=now() - datetime.timedelta(seconds=10)).delete() # Get a message from one of our channels message = self.model.objects.filter(channel__in=channels).order_by("id").first() if message: + self.model.objects.filter(pk=message.pk).delete() return message.channel, json.loads(message.content) # If all empty, sleep for a little bit - time.sleep(0.2) + time.sleep(0.1) + def __str__(self): + return "%s(alias=%s)" % (self.__class__.__name__, self.connection.alias) diff --git a/channels/backends/memory.py b/channels/backends/memory.py index 123ec40..3b7baca 100644 --- a/channels/backends/memory.py +++ b/channels/backends/memory.py @@ -11,6 +11,8 @@ class InMemoryChannelBackend(BaseChannelBackend): in low-throughput development environments. """ + local_only = True + def send(self, channel, message): # Try JSON encoding it to make sure it would, but store the native version json.dumps(message) @@ -18,6 +20,8 @@ class InMemoryChannelBackend(BaseChannelBackend): queues.setdefault(channel, deque()).append(message) def receive_many(self, channels): + if not channels: + raise ValueError("Cannot receive on empty channel list!") while True: # Try to pop a message from each channel for channel in channels: diff --git a/channels/channel.py b/channels/channel.py index b4e887d..c1b8330 100644 --- a/channels/channel.py +++ b/channels/channel.py @@ -23,13 +23,13 @@ class Channel(object): Create an instance for the channel named "name" """ self.name = name - self.channel_layer = channel_backends[alias] + self.channel_backend = channel_backends[alias] def send(self, **kwargs): """ Send a message over the channel, taken from the kwargs. """ - self.channel_layer.send(self.name, kwargs) + self.channel_backend.send(self.name, kwargs) @classmethod def new_name(self, prefix): @@ -59,9 +59,9 @@ class Channel(object): if isinstance(channels, six.string_types): channels = [channels] # Get the channel - channel_layer = channel_backends[alias] + channel_backend = channel_backends[alias] # Return a function that'll register whatever it wraps def inner(func): - channel_layer.registry.add_consumer(func, channels) + channel_backend.registry.add_consumer(func, channels) return func return inner diff --git a/channels/management/commands/runserver.py b/channels/management/commands/runserver.py index ade88a2..0179b65 100644 --- a/channels/management/commands/runserver.py +++ b/channels/management/commands/runserver.py @@ -1,6 +1,7 @@ import django import threading from django.core.management.commands.runserver import Command as RunserverCommand +from django.core.management import CommandError from django.core.handlers.wsgi import WSGIHandler from django.http import HttpResponse from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND @@ -22,13 +23,13 @@ class Command(RunserverCommand): # Force disable reloader for now options['use_reloader'] = False # Check a handler is registered for http reqs - channel_layer = channel_backends[DEFAULT_CHANNEL_BACKEND] + channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] auto_import_consumers() - if not channel_layer.registry.consumer_for_channel("django.wsgi.request"): + if not channel_backend.registry.consumer_for_channel("django.wsgi.request"): # Register the default one - channel_layer.registry.add_consumer(UrlConsumer(), ["django.wsgi.request"]) + channel_backend.registry.add_consumer(UrlConsumer(), ["django.wsgi.request"]) # Launch a worker thread - worker = WorkerThread(channel_layer) + worker = WorkerThread(channel_backend) worker.daemon = True worker.start() # Run the rest @@ -52,9 +53,9 @@ class WorkerThread(threading.Thread): Class that runs a worker """ - def __init__(self, channel_layer): + def __init__(self, channel_backend): super(WorkerThread, self).__init__() - self.channel_layer = channel_layer + self.channel_backend = channel_backend def run(self): - Worker(channel_layer=self.channel_layer).run() + Worker(channel_backend=self.channel_backend).run() diff --git a/channels/management/commands/runworker.py b/channels/management/commands/runworker.py new file mode 100644 index 0000000..8d8b2ae --- /dev/null +++ b/channels/management/commands/runworker.py @@ -0,0 +1,41 @@ +import time +from wsgiref.simple_server import BaseHTTPRequestHandler +from django.core.management import BaseCommand, CommandError +from channels import channel_backends, DEFAULT_CHANNEL_BACKEND +from channels.worker import Worker +from channels.utils import auto_import_consumers + + +class Command(BaseCommand): + + def handle(self, *args, **options): + # Get the backend to use + channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] + auto_import_consumers() + if channel_backend.local_only: + raise CommandError( + "You have a process-local channel backend configured, and so cannot run separate workers.\n" + "Configure a network-based backend in CHANNEL_BACKENDS to use this command." + ) + # Launch a worker + self.stdout.write("Running worker against backend %s" % channel_backend) + # Optionally provide an output callback + callback = None + if options.get("verbosity", 1) > 1: + callback = self.consumer_called + # Run the worker + try: + Worker(channel_backend=channel_backend, callback=callback).run() + except KeyboardInterrupt: + pass + + def consumer_called(self, channel, message): + self.stdout.write("[%s] %s" % (self.log_date_time_string(), channel)) + + def log_date_time_string(self): + """Return the current time formatted for logging.""" + now = time.time() + year, month, day, hh, mm, ss, x, y, z = time.localtime(now) + s = "%02d/%3s/%04d %02d:%02d:%02d" % ( + day, BaseHTTPRequestHandler.monthname[month], year, hh, mm, ss) + return s diff --git a/channels/worker.py b/channels/worker.py index bb0c669..44522cd 100644 --- a/channels/worker.py +++ b/channels/worker.py @@ -4,16 +4,19 @@ class Worker(object): and runs their consumers. """ - def __init__(self, channel_layer): - self.channel_layer = channel_layer + def __init__(self, channel_backend, callback=None): + self.channel_backend = channel_backend + self.callback = callback def run(self): """ Tries to continually dispatch messages to consumers. """ - channels = self.channel_layer.registry.all_channel_names() + channels = self.channel_backend.registry.all_channel_names() while True: - channel, message = self.channel_layer.receive_many(channels) - consumer = self.channel_layer.registry.consumer_for_channel(channel) + channel, message = self.channel_backend.receive_many(channels) + consumer = self.channel_backend.registry.consumer_for_channel(channel) + if self.callback: + self.callback(channel, message) consumer(channel=channel, **message)