From 821816f656ed8be43dc8a5ebf90fa16920355f1a Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Wed, 3 Jun 2015 18:17:46 +0100 Subject: [PATCH] Initial commit --- .gitignore | 1 + README | 0 channel/__init__.py | 7 +++ channel/adapters.py | 18 ++++++ channel/channels/__init__.py | 0 channel/channels/base.py | 53 ++++++++++++++++ channel/channels/memory.py | 48 +++++++++++++++ channel/consumer_registry.py | 40 +++++++++++++ channel/management/__init__.py | 0 channel/management/commands/__init__.py | 0 .../management/commands/runinterfaceserver.py | 60 +++++++++++++++++++ channel/request.py | 34 +++++++++++ channel/response.py | 29 +++++++++ channel/utils.py | 14 +++++ channel/worker.py | 19 ++++++ setup.py | 12 ++++ 16 files changed, 335 insertions(+) create mode 100644 .gitignore create mode 100644 README create mode 100755 channel/__init__.py create mode 100755 channel/adapters.py create mode 100644 channel/channels/__init__.py create mode 100644 channel/channels/base.py create mode 100644 channel/channels/memory.py create mode 100755 channel/consumer_registry.py create mode 100644 channel/management/__init__.py create mode 100644 channel/management/commands/__init__.py create mode 100755 channel/management/commands/runinterfaceserver.py create mode 100755 channel/request.py create mode 100755 channel/response.py create mode 100755 channel/utils.py create mode 100755 channel/worker.py create mode 100644 setup.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..11041c7 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.egg-info diff --git a/README b/README new file mode 100644 index 0000000..e69de29 diff --git a/channel/__init__.py b/channel/__init__.py new file mode 100755 index 0000000..d5e0e00 --- /dev/null +++ b/channel/__init__.py @@ -0,0 +1,7 @@ +from .consumer_registry import ConsumerRegistry + +# Make a site-wide registry +coreg = ConsumerRegistry() + +# Load an implementation of Channel +from .channels.memory import Channel diff --git a/channel/adapters.py b/channel/adapters.py new file mode 100755 index 0000000..7f3474e --- /dev/null +++ b/channel/adapters.py @@ -0,0 +1,18 @@ +from django.core.handlers.base import BaseHandler +from channel import Channel +from .response import encode_response +from .request import decode_request + + +class DjangoUrlAdapter(object): + """ + Adapts the channel-style HTTP requests to the URL-router/handler style + """ + + def __init__(self): + self.handler = BaseHandler() + self.handler.load_middleware() + + def __call__(self, request, response_channel): + response = self.handler.get_response(decode_request(request)) + Channel(response_channel).send(**encode_response(response)) diff --git a/channel/channels/__init__.py b/channel/channels/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/channel/channels/base.py b/channel/channels/base.py new file mode 100644 index 0000000..7841d99 --- /dev/null +++ b/channel/channels/base.py @@ -0,0 +1,53 @@ +class Channel(object): + """ + Base class for all channel layer implementations. + """ + + class ClosedError(Exception): + """ + Raised when you try to send to a closed channel. + """ + pass + + def __init__(self, name): + """ + Create an instance for the channel named "name" + """ + self.name = name + + def send(self, **kwargs): + """ + Send a message over the channel, taken from the kwargs. + """ + raise NotImplementedError() + + def close(self): + """ + Closes the channel, allowing no more messages to be sent over it. + """ + raise NotImplementedError() + + @property + def closed(self): + """ + Says if the channel is closed. + """ + raise NotImplementedError() + + @classmethod + def receive_many(self, channel_names): + """ + Block and return the first message available on one of the + channels passed, as a (channel_name, message) tuple. + """ + raise NotImplementedError() + + @classmethod + def new_name(self, prefix): + """ + Returns a new channel name that's unique and not closed + with the given prefix. Does not need to be called before sending + on a channel name - just provides a way to avoid clashing for + response channels. + """ + raise NotImplementedError() diff --git a/channel/channels/memory.py b/channel/channels/memory.py new file mode 100644 index 0000000..6a66ceb --- /dev/null +++ b/channel/channels/memory.py @@ -0,0 +1,48 @@ +import time +import string +import random +from collections import deque +from .base import Channel as BaseChannel + +queues = {} +closed = set() + +class Channel(BaseChannel): + """ + In-memory channel implementation. Intended only for use with threading, + in low-throughput development environments. + """ + + def send(self, **kwargs): + # Don't allow if closed + if self.name in closed: + raise Channel.ClosedError("%s is closed" % self.name) + # Add to the deque, making it if needs be + queues.setdefault(self.name, deque()).append(kwargs) + + @property + def closed(self): + # Check closed set + return self.name in closed + + def close(self): + # Add to closed set + closed.add(self.name) + + @classmethod + def receive_many(self, channel_names): + while True: + # Try to pop a message from each channel + for channel_name in channel_names: + try: + # This doesn't clean up empty channels - OK for testing. + # For later versions, have cleanup w/lock. + return channel_name, queues[channel_name].popleft() + except (IndexError, KeyError): + pass + # If all empty, sleep for a little bit + time.sleep(0.01) + + @classmethod + def new_name(self, prefix): + return "%s.%s" % (prefix, "".join(random.choice(string.ascii_letters) for i in range(16))) diff --git a/channel/consumer_registry.py b/channel/consumer_registry.py new file mode 100755 index 0000000..0e6baaa --- /dev/null +++ b/channel/consumer_registry.py @@ -0,0 +1,40 @@ +import functools + +class ConsumerRegistry(object): + """ + Manages the available consumers in the project and which channels they + listen to. + + Generally a single project-wide instance of this is used. + """ + + def __init__(self): + self.consumers = {} + + def add_consumer(self, consumer, channels): + for channel in channels: + if channel in self.consumers: + raise ValueError("Cannot register consumer %s - channel %s already consumed by %s" % ( + consumer, + channel, + self.consumers[channel], + )) + self.consumers[channel] = consumer + + def consumer(self, channels): + """ + Decorator that registers a function as a consumer. + """ + def inner(func): + self.add_consumer(func, channels) + return func + return inner + + def all_channel_names(self): + return self.consumers.keys() + + def consumer_for_channel(self, channel): + try: + return self.consumers[channel] + except KeyError: + return None diff --git a/channel/management/__init__.py b/channel/management/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/channel/management/commands/__init__.py b/channel/management/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/channel/management/commands/runinterfaceserver.py b/channel/management/commands/runinterfaceserver.py new file mode 100755 index 0000000..796606d --- /dev/null +++ b/channel/management/commands/runinterfaceserver.py @@ -0,0 +1,60 @@ +import django +import threading +from django.core.management.commands.runserver import Command as RunserverCommand +from django.core.handlers.wsgi import WSGIHandler +from channel import Channel, coreg +from channel.request import encode_request +from channel.response import decode_response +from channel.worker import Worker +from channel.utils import auto_import_consumers + + +class Command(RunserverCommand): + + def get_handler(self, *args, **options): + """ + Returns the default WSGI handler for the runner. + """ + django.setup() + return WSGIInterfaceHandler() + + def run(self, *args, **options): + # Force disable reloader for now + options['use_reloader'] = False + # Check a handler is registered for http reqs + auto_import_consumers() + if not coreg.consumer_for_channel("django.wsgi.request"): + raise RuntimeError("No consumer registered for WSGI requests") + # Launch a worker thread + worker = WorkerThread() + worker.daemon = True + worker.start() + # Run the rest + return super(Command, self).run(*args, **options) + + +class WSGIInterfaceHandler(WSGIHandler): + """ + New WSGI handler that pushes requests to channels. + """ + + def get_response(self, request): + response_channel = Channel.new_name("django.wsgi.response") + Channel("django.wsgi.request").send( + request = encode_request(request), + response_channel = response_channel, + ) + channel, message = Channel.receive_many([response_channel]) + return decode_response(message) + + +class WorkerThread(threading.Thread): + """ + Class that runs a worker + """ + + def run(self): + Worker( + consumer_registry = coreg, + channel_class = Channel, + ).run() diff --git a/channel/request.py b/channel/request.py new file mode 100755 index 0000000..c319ac5 --- /dev/null +++ b/channel/request.py @@ -0,0 +1,34 @@ +from django.http import HttpRequest +from django.utils.datastructures import MultiValueDict + + +def encode_request(request): + """ + Encodes a request to JSON-compatible datastructures + """ + # TODO: More stuff + value = { + "GET": request.GET.items(), + "POST": request.POST.items(), + "COOKIES": request.COOKIES, + "META": {k: v for k, v in request.META.items() if not k.startswith("wsgi")}, + "path": request.path, + "path_info": request.path_info, + "method": request.method, + } + return value + + +def decode_request(value): + """ + Decodes a request JSONish value to a HttpRequest object. + """ + request = HttpRequest() + request.GET = MultiValueDict(value['GET']) + request.POST = MultiValueDict(value['POST']) + request.COOKIES = value['COOKIES'] + request.META = value['META'] + request.path = value['path'] + request.method = value['method'] + request.path_info = value['path_info'] + return request diff --git a/channel/response.py b/channel/response.py new file mode 100755 index 0000000..542482a --- /dev/null +++ b/channel/response.py @@ -0,0 +1,29 @@ +from django.http import HttpResponse + + +def encode_response(response): + """ + Encodes a response to JSON-compatible datastructures + """ + # TODO: Entirely useful things like cookies + value = { + "content_type": getattr(response, "content_type", None), + "content": response.content, + "status_code": response.status_code, + "headers": response._headers.values(), + } + response.close() + return value + + +def decode_response(value): + """ + Decodes a response JSONish value to a HttpResponse object. + """ + response = HttpResponse( + content = value['content'], + content_type = value['content_type'], + status = value['status_code'], + ) + response._headers = {k.lower: (k, v) for k, v in value['headers']} + return response diff --git a/channel/utils.py b/channel/utils.py new file mode 100755 index 0000000..2615e72 --- /dev/null +++ b/channel/utils.py @@ -0,0 +1,14 @@ +from django.apps import apps + + +def auto_import_consumers(): + """ + Auto-import consumers modules in apps + """ + for app_config in apps.get_app_configs(): + consumer_module_name = "%s.consumers" % (app_config.name,) + try: + __import__(consumer_module_name) + except ImportError as e: + if "no module named" not in str(e).lower(): + raise diff --git a/channel/worker.py b/channel/worker.py new file mode 100755 index 0000000..73a2c1f --- /dev/null +++ b/channel/worker.py @@ -0,0 +1,19 @@ +class Worker(object): + """ + A "worker" process that continually looks for available messages to run + and runs their consumers. + """ + + def __init__(self, consumer_registry, channel_class): + self.consumer_registry = consumer_registry + self.channel_class = channel_class + + def run(self): + """ + Tries to continually dispatch messages to consumers. + """ + channels = self.consumer_registry.all_channel_names() + while True: + channel, message = self.channel_class.receive_many(channels) + consumer = self.consumer_registry.consumer_for_channel(channel) + consumer(**message) diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..e8274ab --- /dev/null +++ b/setup.py @@ -0,0 +1,12 @@ +from setuptools import find_packages, setup + +setup( + name='django-channel', + version="0.1", + url='http://github.com/andrewgodwin/django-channel', + author='Andrew Godwin', + author_email='andrew@aeracode.org', + license='BSD', + packages=find_packages(), + include_package_data=True, +)