Working database backend and "runworker" command

This commit is contained in:
Andrew Godwin 2015-06-10 11:17:32 -07:00
parent 95e706f71f
commit 80627d8e37
10 changed files with 121 additions and 52 deletions

View File

@ -1,4 +1,4 @@
# Load backends # Load backends, using settings if available (else falling back to a default)
DEFAULT_CHANNEL_BACKEND = "default" DEFAULT_CHANNEL_BACKEND = "default"
from .backends import BackendManager from .backends import BackendManager
from django.conf import settings from django.conf import settings

View File

@ -47,7 +47,7 @@ def view_consumer(channel_name, alias=DEFAULT_CHANNEL_BACKEND):
response = func(request) response = func(request)
Channel(request.response_channel).send(**response.channel_encode()) Channel(request.response_channel).send(**response.channel_encode())
# Get the channel layer and register # Get the channel layer and register
channel_layer = channel_backends[DEFAULT_CHANNEL_BACKEND] channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND]
channel_layer.registry.add_consumer(consumer, [channel_name]) channel_backend.registry.add_consumer(consumer, [channel_name])
return func return func
return inner return inner

View File

@ -11,17 +11,23 @@ class BackendManager(object):
""" """
def __init__(self, backend_configs): def __init__(self, backend_configs):
self.configs = backend_configs
self.backends = {} self.backends = {}
for name, config in backend_configs.items():
# Load the backend class def make_backend(self, name):
try: # Load the backend class
backend_class = import_string(config['BACKEND']) try:
except KeyError: backend_class = import_string(self.configs[name]['BACKEND'])
raise InvalidChannelBackendError("No BACKEND specified for %s" % name) except KeyError:
except ImportError: raise InvalidChannelBackendError("No BACKEND specified for %s" % name)
raise InvalidChannelBackendError("Cannot import BACKEND %s specified for %s" % (config['BACKEND'], name)) except ImportError as e:
# Initialise and pass config raise InvalidChannelBackendError("Cannot import BACKEND %r specified for %s" % (self.configs[name]['BACKEND'], name))
self.backends[name] = backend_class(**{k.lower(): v for k, v in config.items() if k != "BACKEND"}) # Initialise and pass config
instance = backend_class(**{k.lower(): v for k, v in self.configs[name].items() if k != "BACKEND"})
instance.alias = name
return instance
def __getitem__(self, key): def __getitem__(self, key):
if key not in self.backends:
self.backends[key] = self.make_backend(key)
return self.backends[key] return self.backends[key]

View File

@ -15,6 +15,10 @@ class BaseChannelBackend(object):
registry of consumers. registry of consumers.
""" """
# Flags if this backend can only be used inside one process.
# Causes errors if you try to run workers/interfaces separately with it.
local_only = False
def __init__(self, expiry=60): def __init__(self, expiry=60):
self.registry = ConsumerRegistry() self.registry = ConsumerRegistry()
self.expiry = expiry self.expiry = expiry
@ -31,3 +35,6 @@ class BaseChannelBackend(object):
channels passed, as a (channel, message) tuple. channels passed, as a (channel, message) tuple.
""" """
raise NotImplementedError() raise NotImplementedError()
def __str__(self):
return self.__class__.__name__

View File

@ -1,30 +1,40 @@
import time import time
import json
import datetime import datetime
from django.apps.registry import Apps from django.apps.registry import Apps
from django.db import models, connections, DEFAULT_DB_ALIAS from django.db import models, connections, DEFAULT_DB_ALIAS
from django.utils.functional import cached_property
from django.utils.timezone import now
from .base import BaseChannelBackend from .base import BaseChannelBackend
queues = {} queues = {}
class ORMChannelBackend(BaseChannelBackend): class DatabaseChannelBackend(BaseChannelBackend):
""" """
ORM-backed channel environment. For development use only; it will span ORM-backed channel environment. For development use only; it will span
multiple processes fine, but it's going to be pretty bad at throughput. multiple processes fine, but it's going to be pretty bad at throughput.
""" """
def __init__(self, expiry, db_alias=DEFAULT_DB_ALIAS): def __init__(self, expiry=60, db_alias=DEFAULT_DB_ALIAS):
super(ORMChannelBackend, self).__init__(expiry) super(DatabaseChannelBackend, self).__init__(expiry)
self.connection = connections[db_alias] self.db_alias = db_alias
self.model = self.make_model()
self.ensure_schema()
def make_model(self): @property
def connection(self):
"""
Returns the correct connection for the current thread.
"""
return connections[self.db_alias]
@property
def model(self):
""" """
Initialises a new model to store messages; not done as part of a Initialises a new model to store messages; not done as part of a
models.py as we don't want to make it for most installs. models.py as we don't want to make it for most installs.
""" """
# Make the model class
class Message(models.Model): class Message(models.Model):
# We assume an autoincrementing PK for message order # We assume an autoincrementing PK for message order
channel = models.CharField(max_length=200, db_index=True) channel = models.CharField(max_length=200, db_index=True)
@ -34,35 +44,32 @@ class ORMChannelBackend(BaseChannelBackend):
apps = Apps() apps = Apps()
app_label = "channels" app_label = "channels"
db_table = "django_channels" db_table = "django_channels"
# Ensure its table exists
if Message._meta.db_table not in self.connection.introspection.table_names(self.connection.cursor()):
with self.connection.schema_editor() as editor:
editor.create_model(Message)
return Message return Message
def ensure_schema(self):
"""
Ensures the table exists and has the correct schema.
"""
# If the table's there, that's fine - we've never changed its schema
# in the codebase.
if self.model._meta.db_table in self.connection.introspection.table_names(self.connection.cursor()):
return
# Make the table
with self.connection.schema_editor() as editor:
editor.create_model(self.model)
def send(self, channel, message): def send(self, channel, message):
self.model.objects.create( self.model.objects.create(
channel = channel, channel = channel,
message = json.dumps(message), content = json.dumps(message),
expiry = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.expiry) expiry = now() + datetime.timedelta(seconds=self.expiry)
) )
def receive_many(self, channels): def receive_many(self, channels):
if not channels:
raise ValueError("Cannot receive on empty channel list!")
while True: while True:
# Delete all expired messages (add 10 second grace period for clock sync) # Delete all expired messages (add 10 second grace period for clock sync)
self.model.objects.filter(expiry__lt=datetime.datetime.utcnow() - datetime.timedelta(seconds=10)).delete() self.model.objects.filter(expiry__lt=now() - datetime.timedelta(seconds=10)).delete()
# Get a message from one of our channels # Get a message from one of our channels
message = self.model.objects.filter(channel__in=channels).order_by("id").first() message = self.model.objects.filter(channel__in=channels).order_by("id").first()
if message: if message:
self.model.objects.filter(pk=message.pk).delete()
return message.channel, json.loads(message.content) return message.channel, json.loads(message.content)
# If all empty, sleep for a little bit # If all empty, sleep for a little bit
time.sleep(0.2) time.sleep(0.1)
def __str__(self):
return "%s(alias=%s)" % (self.__class__.__name__, self.connection.alias)

View File

@ -11,6 +11,8 @@ class InMemoryChannelBackend(BaseChannelBackend):
in low-throughput development environments. in low-throughput development environments.
""" """
local_only = True
def send(self, channel, message): def send(self, channel, message):
# Try JSON encoding it to make sure it would, but store the native version # Try JSON encoding it to make sure it would, but store the native version
json.dumps(message) json.dumps(message)
@ -18,6 +20,8 @@ class InMemoryChannelBackend(BaseChannelBackend):
queues.setdefault(channel, deque()).append(message) queues.setdefault(channel, deque()).append(message)
def receive_many(self, channels): def receive_many(self, channels):
if not channels:
raise ValueError("Cannot receive on empty channel list!")
while True: while True:
# Try to pop a message from each channel # Try to pop a message from each channel
for channel in channels: for channel in channels:

View File

@ -23,13 +23,13 @@ class Channel(object):
Create an instance for the channel named "name" Create an instance for the channel named "name"
""" """
self.name = name self.name = name
self.channel_layer = channel_backends[alias] self.channel_backend = channel_backends[alias]
def send(self, **kwargs): def send(self, **kwargs):
""" """
Send a message over the channel, taken from the kwargs. Send a message over the channel, taken from the kwargs.
""" """
self.channel_layer.send(self.name, kwargs) self.channel_backend.send(self.name, kwargs)
@classmethod @classmethod
def new_name(self, prefix): def new_name(self, prefix):
@ -59,9 +59,9 @@ class Channel(object):
if isinstance(channels, six.string_types): if isinstance(channels, six.string_types):
channels = [channels] channels = [channels]
# Get the channel # Get the channel
channel_layer = channel_backends[alias] channel_backend = channel_backends[alias]
# Return a function that'll register whatever it wraps # Return a function that'll register whatever it wraps
def inner(func): def inner(func):
channel_layer.registry.add_consumer(func, channels) channel_backend.registry.add_consumer(func, channels)
return func return func
return inner return inner

View File

@ -1,6 +1,7 @@
import django import django
import threading import threading
from django.core.management.commands.runserver import Command as RunserverCommand from django.core.management.commands.runserver import Command as RunserverCommand
from django.core.management import CommandError
from django.core.handlers.wsgi import WSGIHandler from django.core.handlers.wsgi import WSGIHandler
from django.http import HttpResponse from django.http import HttpResponse
from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND
@ -22,13 +23,13 @@ class Command(RunserverCommand):
# Force disable reloader for now # Force disable reloader for now
options['use_reloader'] = False options['use_reloader'] = False
# Check a handler is registered for http reqs # Check a handler is registered for http reqs
channel_layer = channel_backends[DEFAULT_CHANNEL_BACKEND] channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND]
auto_import_consumers() auto_import_consumers()
if not channel_layer.registry.consumer_for_channel("django.wsgi.request"): if not channel_backend.registry.consumer_for_channel("django.wsgi.request"):
# Register the default one # Register the default one
channel_layer.registry.add_consumer(UrlConsumer(), ["django.wsgi.request"]) channel_backend.registry.add_consumer(UrlConsumer(), ["django.wsgi.request"])
# Launch a worker thread # Launch a worker thread
worker = WorkerThread(channel_layer) worker = WorkerThread(channel_backend)
worker.daemon = True worker.daemon = True
worker.start() worker.start()
# Run the rest # Run the rest
@ -52,9 +53,9 @@ class WorkerThread(threading.Thread):
Class that runs a worker Class that runs a worker
""" """
def __init__(self, channel_layer): def __init__(self, channel_backend):
super(WorkerThread, self).__init__() super(WorkerThread, self).__init__()
self.channel_layer = channel_layer self.channel_backend = channel_backend
def run(self): def run(self):
Worker(channel_layer=self.channel_layer).run() Worker(channel_backend=self.channel_backend).run()

View File

@ -0,0 +1,41 @@
import time
from wsgiref.simple_server import BaseHTTPRequestHandler
from django.core.management import BaseCommand, CommandError
from channels import channel_backends, DEFAULT_CHANNEL_BACKEND
from channels.worker import Worker
from channels.utils import auto_import_consumers
class Command(BaseCommand):
def handle(self, *args, **options):
# Get the backend to use
channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND]
auto_import_consumers()
if channel_backend.local_only:
raise CommandError(
"You have a process-local channel backend configured, and so cannot run separate workers.\n"
"Configure a network-based backend in CHANNEL_BACKENDS to use this command."
)
# Launch a worker
self.stdout.write("Running worker against backend %s" % channel_backend)
# Optionally provide an output callback
callback = None
if options.get("verbosity", 1) > 1:
callback = self.consumer_called
# Run the worker
try:
Worker(channel_backend=channel_backend, callback=callback).run()
except KeyboardInterrupt:
pass
def consumer_called(self, channel, message):
self.stdout.write("[%s] %s" % (self.log_date_time_string(), channel))
def log_date_time_string(self):
"""Return the current time formatted for logging."""
now = time.time()
year, month, day, hh, mm, ss, x, y, z = time.localtime(now)
s = "%02d/%3s/%04d %02d:%02d:%02d" % (
day, BaseHTTPRequestHandler.monthname[month], year, hh, mm, ss)
return s

View File

@ -4,16 +4,19 @@ class Worker(object):
and runs their consumers. and runs their consumers.
""" """
def __init__(self, channel_layer): def __init__(self, channel_backend, callback=None):
self.channel_layer = channel_layer self.channel_backend = channel_backend
self.callback = callback
def run(self): def run(self):
""" """
Tries to continually dispatch messages to consumers. Tries to continually dispatch messages to consumers.
""" """
channels = self.channel_layer.registry.all_channel_names() channels = self.channel_backend.registry.all_channel_names()
while True: while True:
channel, message = self.channel_layer.receive_many(channels) channel, message = self.channel_backend.receive_many(channels)
consumer = self.channel_layer.registry.consumer_for_channel(channel) consumer = self.channel_backend.registry.consumer_for_channel(channel)
if self.callback:
self.callback(channel, message)
consumer(channel=channel, **message) consumer(channel=channel, **message)