diff --git a/README.rst b/README.rst index 9a174fc..c7d40ae 100644 --- a/README.rst +++ b/README.rst @@ -1,6 +1,10 @@ Django Channels =============== +**NOTE: The current master branch is in flux as it changes to match the final +structure and the new ASGI spec. If you wish to use this in the meantime, +please use a tagged release.** + This is a work-in-progress code branch of Django implemented as a third-party app, which aims to bring some asynchrony to Django and expand the options for code beyond the request-response model, in particular enabling WebSocket, diff --git a/channels/backends/__init__.py b/channels/backends/__init__.py index e52d24f..e69de29 100644 --- a/channels/backends/__init__.py +++ b/channels/backends/__init__.py @@ -1,36 +0,0 @@ -from django.utils.module_loading import import_string - - -class InvalidChannelBackendError(ValueError): - pass - - -class BackendManager(object): - """ - Takes a settings dictionary of backends and initialises them. - """ - - def __init__(self, backend_configs): - self.configs = backend_configs - self.backends = {} - - def make_backend(self, name): - # Load the backend class - try: - backend_class = import_string(self.configs[name]['BACKEND']) - except KeyError: - raise InvalidChannelBackendError("No BACKEND specified for %s" % name) - except ImportError: - raise InvalidChannelBackendError( - "Cannot import BACKEND %r specified for %s" % (self.configs[name]['BACKEND'], name) - ) - - # Initialise and pass config - instance = backend_class(**{k.lower(): v for k, v in self.configs[name].items() if k != "BACKEND"}) - instance.alias = name - return instance - - def __getitem__(self, key): - if key not in self.backends: - self.backends[key] = self.make_backend(key) - return self.backends[key] diff --git a/channels/backends/base.py b/channels/backends/base.py deleted file mode 100644 index fbf9c5f..0000000 --- a/channels/backends/base.py +++ /dev/null @@ -1,109 +0,0 @@ -import time - -from channels.consumer_registry import ConsumerRegistry - - -class ChannelClosed(Exception): - """ - Raised when you try to send to a closed channel. - """ - pass - - -class BaseChannelBackend(object): - """ - Base class for all channel layer implementations. Manages both sending - and receving messages from the backend, and each comes with its own - registry of consumers. - """ - - # Flags if this backend can only be used inside one process. - # Causes errors if you try to run workers/interfaces separately with it. - local_only = False - - def __init__(self, routing, expiry=60): - self.registry = ConsumerRegistry(routing) - self.expiry = expiry - - def send(self, channel, message): - """ - Send a message over the channel, taken from the kwargs. - """ - raise NotImplementedError() - - def receive_many(self, channels): - """ - Return the first message available on one of the - channels passed, as a (channel, message) tuple, or return (None, None) - if no channels are available. - - Should not block, but is allowed to be moderately slow/have a short - timeout - it needs to return so we can refresh the list of channels, - not because the rest of the process is waiting on it. - - Better performance can be achieved for interface servers by directly - integrating the server and the backend code; this is merely for a - generic support-everything pattern. - """ - raise NotImplementedError() - - def receive_many_blocking(self, channels): - """ - Blocking version of receive_many, if the calling context knows it - doesn't ever want to change the channels list until something happens. - - This base class provides a default implementation; can be overridden - to be more efficient by subclasses. - """ - while True: - channel, message = self.receive_many(channels) - if channel is None: - time.sleep(0.05) - continue - return channel, message - - def group_add(self, group, channel, expiry=None): - """ - Adds the channel to the named group for at least 'expiry' - seconds (expiry defaults to message expiry if not provided). - """ - raise NotImplementedError() - - def group_discard(self, group, channel): - """ - Removes the channel from the named group if it is in the group; - does nothing otherwise (does not error) - """ - raise NotImplementedError() - - def group_channels(self, group): - """ - Returns an iterable of all channels in the group. - """ - raise NotImplementedError() - - def send_group(self, group, message): - """ - Sends a message to the entire group. - - This base class provides a default implementation; can be overridden - to be more efficient by subclasses. - """ - for channel in self.group_channels(group): - self.send(channel, message) - - def __str__(self): - return self.__class__.__name__ - - def lock_channel(self, channel): - """ - Attempts to get a lock on the named channel. Returns True if lock - obtained, False if lock not obtained. - """ - raise NotImplementedError() - - def unlock_channel(self, channel): - """ - Unlocks the named channel. Always succeeds. - """ - raise NotImplementedError() diff --git a/channels/backends/memory.py b/channels/backends/memory.py deleted file mode 100644 index 0cb28ba..0000000 --- a/channels/backends/memory.py +++ /dev/null @@ -1,102 +0,0 @@ -import json -import time -from collections import deque - -from .base import BaseChannelBackend - -queues = {} -groups = {} -locks = set() - - -class InMemoryChannelBackend(BaseChannelBackend): - """ - In-memory channel implementation. Intended only for use with threading, - in low-throughput development environments. - """ - - local_only = True - - def send(self, channel, message): - # Try JSON encoding it to make sure it would, but store the native version - json.dumps(message) - # Add to the deque, making it if needs be - queues.setdefault(channel, deque()).append((message, time.time() + self.expiry)) - - def receive_many(self, channels): - if not channels: - raise ValueError("Cannot receive on empty channel list!") - # Try to pop a message from each channel - self._clean_expired() - for channel in channels: - try: - # This doesn't clean up empty channels - OK for testing. - # For later versions, have cleanup w/lock. - return channel, queues[channel].popleft()[0] - except (IndexError, KeyError): - pass - return None, None - - def _clean_expired(self): - # Handle expired messages - for channel, messages in queues.items(): - while len(messages) and messages[0][1] < time.time(): - messages.popleft() - # Handle expired groups - for group, channels in list(groups.items()): - for channel, expiry in list(channels.items()): - if expiry < (time.time() - 10): - try: - del groups[group][channel] - except KeyError: - # Another thread might have got there first - pass - - def group_add(self, group, channel, expiry=None): - """ - Adds the channel to the named group for at least 'expiry' - seconds (expiry defaults to message expiry if not provided). - """ - groups.setdefault(group, {})[channel] = time.time() + (expiry or self.expiry) - - def group_discard(self, group, channel): - """ - Removes the channel from the named group if it is in the group; - does nothing otherwise (does not error) - """ - try: - del groups[group][channel] - except KeyError: - pass - - def group_channels(self, group): - """ - Returns an iterable of all channels in the group. - """ - self._clean_expired() - return groups.get(group, {}).keys() - - def lock_channel(self, channel): - """ - Attempts to get a lock on the named channel. Returns True if lock - obtained, False if lock not obtained. - """ - # Probably not perfect for race conditions, but close enough considering - # it shouldn't be used. - if channel not in locks: - locks.add(channel) - return True - else: - return False - - def unlock_channel(self, channel): - """ - Unlocks the named channel. Always succeeds. - """ - locks.discard(channel) - - def flush(self): - global queues, groups, locks - queues = {} - groups = {} - locks = set() diff --git a/channels/backends/redis_py.py b/channels/backends/redis_py.py deleted file mode 100644 index 3ef5c07..0000000 --- a/channels/backends/redis_py.py +++ /dev/null @@ -1,192 +0,0 @@ -import binascii -import json -import math -import random -import time -import uuid - -import redis -from django.utils import six - -from .base import BaseChannelBackend - - -class RedisChannelBackend(BaseChannelBackend): - """ - ORM-backed channel environment. For development use only; it will span - multiple processes fine, but it's going to be pretty bad at throughput. - """ - - def __init__(self, routing, expiry=60, hosts=None, prefix="django-channels:"): - super(RedisChannelBackend, self).__init__(routing=routing, expiry=expiry) - # Make sure they provided some hosts, or provide a default - if not hosts: - hosts = [("localhost", 6379)] - self.hosts = [] - for entry in hosts: - if isinstance(entry, six.string_types): - self.hosts.append(entry) - else: - self.hosts.append("redis://%s:%d/0" % (entry[0],entry[1])) - self.prefix = prefix - # Precalculate some values for ring selection - self.ring_size = len(self.hosts) - self.ring_divisor = int(math.ceil(4096 / float(self.ring_size))) - - def consistent_hash(self, value): - """ - Maps the value to a node value between 0 and 4095 - using MD5, then down to one of the ring nodes. - """ - if isinstance(value, six.text_type): - value = value.encode("utf8") - bigval = binascii.crc32(value) & 0xffffffff - return (bigval // 0x100000) // self.ring_divisor - - def random_index(self): - return random.randint(0, len(self.hosts) - 1) - - def connection(self, index): - """ - Returns the correct connection for the current thread. - - Pass key to use a server based on consistent hashing of the key value; - pass None to use a random server instead. - """ - # If index is explicitly None, pick a random server - if index is None: - index = self.random_index() - # Catch bad indexes - if not (0 <= index < self.ring_size): - raise ValueError("There are only %s hosts - you asked for %s!" % (self.ring_size, index)) - return redis.Redis.from_url(self.hosts[index]) - - def send(self, channel, message): - # if channel is no str (=> bytes) convert it - if not isinstance(channel, str): - channel = channel.decode("utf-8") - # Write out message into expiring key (avoids big items in list) - # TODO: Use extended set, drop support for older redis? - key = self.prefix + uuid.uuid4().hex - - # Pick a connection to the right server - consistent for response - # channels, random for normal channels - if channel.startswith("!"): - index = self.consistent_hash(key) - connection = self.connection(index) - else: - connection = self.connection(None) - - connection.set( - key, - json.dumps(message), - ) - connection.expire( - key, - self.expiry + 10, - ) - # Add key to list - connection.rpush( - self.prefix + channel, - key, - ) - # Set list to expire when message does (any later messages will bump this) - connection.expire( - self.prefix + channel, - self.expiry + 10, - ) - # TODO: Prune expired messages from same list (in case nobody consumes) - - def receive_many(self, channels): - if not channels: - raise ValueError("Cannot receive on empty channel list!") - # Work out what servers to listen on for the given channels - indexes = {} - random_index = self.random_index() - for channel in channels: - if channel.startswith("!"): - indexes.setdefault(self.consistent_hash(channel), []).append(channel) - else: - indexes.setdefault(random_index, []).append(channel) - # Get a message from one of our channels - while True: - # Select a random connection to use - # TODO: Would we be better trying to do this truly async? - index = random.choice(list(indexes.keys())) - connection = self.connection(index) - channels = indexes[index] - # Shuffle channels to avoid the first ones starving others of workers - random.shuffle(channels) - # Pop off any waiting message - result = connection.blpop([self.prefix + channel for channel in channels], timeout=1) - if result: - content = connection.get(result[1]) - if content is None: - continue - return result[0][len(self.prefix):].decode("utf-8"), json.loads(content.decode("utf-8")) - else: - return None, None - - def group_add(self, group, channel, expiry=None): - """ - Adds the channel to the named group for at least 'expiry' - seconds (expiry defaults to message expiry if not provided). - """ - key = "%s:group:%s" % (self.prefix, group) - key = key.encode("utf8") - self.connection(self.consistent_hash(group)).zadd( - key, - **{channel: time.time() + (expiry or self.expiry)} - ) - - def group_discard(self, group, channel): - """ - Removes the channel from the named group if it is in the group; - does nothing otherwise (does not error) - """ - key = "%s:group:%s" % (self.prefix, group) - key = key.encode("utf8") - self.connection(self.consistent_hash(group)).zrem( - key, - channel, - ) - - def group_channels(self, group): - """ - Returns an iterable of all channels in the group. - """ - key = "%s:group:%s" % (self.prefix, group) - key = key.encode("utf8") - connection = self.connection(self.consistent_hash(group)) - # Discard old channels - connection.zremrangebyscore(key, 0, int(time.time()) - 10) - # Return current lot - return [x.decode("utf8") for x in connection.zrange( - key, - 0, - -1, - )] - - # TODO: send_group efficient implementation using Lua - - def lock_channel(self, channel, expiry=None): - """ - Attempts to get a lock on the named channel. Returns True if lock - obtained, False if lock not obtained. - """ - key = "%s:lock:%s" % (self.prefix, channel) - return bool(self.connection(self.consistent_hash(channel)).setnx(key, "1")) - - def unlock_channel(self, channel): - """ - Unlocks the named channel. Always succeeds. - """ - key = "%s:lock:%s" % (self.prefix, channel) - self.connection(self.consistent_hash(channel)).delete(key) - - def __str__(self): - return "%s(hosts=%s)" % (self.__class__.__name__, self.hosts) - - def flush(self): - for i in range(self.ring_size): - self.connection(i).flushdb() diff --git a/channels/interfaces/__init__.py b/channels/interfaces/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/channels/interfaces/http_twisted.py b/channels/interfaces/http_twisted.py deleted file mode 100644 index fcee728..0000000 --- a/channels/interfaces/http_twisted.py +++ /dev/null @@ -1,249 +0,0 @@ -import time - -from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory -from twisted.python.compat import _PY3 -from twisted.web.http import HTTPFactory, HTTPChannel, Request, _respondToBadRequestAndDisconnect, parse_qs, _parseHeader -from twisted.protocols.policies import ProtocolWrapper -from twisted.internet import reactor - -from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND -from .websocket_autobahn import get_protocol, get_factory - - -WebsocketProtocol = get_protocol(WebSocketServerProtocol) - - -class WebRequest(Request): - """ - Request that either hands off information to channels, or offloads - to a WebSocket class. - - Does some extra processing over the normal Twisted Web request to separate - GET and POST out. - """ - - def __init__(self, *args, **kwargs): - Request.__init__(self, *args, **kwargs) - self.reply_channel = Channel.new_name("!http.response") - self.channel.factory.reply_protocols[self.reply_channel] = self - - def process(self): - # Get upgrade header - upgrade_header = None - if self.requestHeaders.hasHeader("Upgrade"): - upgrade_header = self.requestHeaders.getRawHeaders("Upgrade")[0] - # Is it WebSocket? IS IT?! - if upgrade_header == "websocket": - # Make WebSocket protocol to hand off to - protocol = self.channel.factory.ws_factory.buildProtocol(self.transport.getPeer()) - if not protocol: - # If protocol creation fails, we signal "internal server error" - self.setResponseCode(500) - self.finish() - # Port across transport - transport, self.transport = self.transport, None - if isinstance(transport, ProtocolWrapper): - # i.e. TLS is a wrapping protocol - transport.wrappedProtocol = protocol - else: - transport.protocol = protocol - protocol.makeConnection(transport) - # Re-inject request - if _PY3: - data = self.method + b' ' + self.uri + b' HTTP/1.1\x0d\x0a' - for h in self.requestHeaders.getAllRawHeaders(): - data += h[0] + b': ' + b",".join(h[1]) + b'\x0d\x0a' - data += b"\x0d\x0a" - data += self.content.read() - else: - data = "%s %s HTTP/1.1\x0d\x0a" % (self.method, self.uri) - for h in self.requestHeaders.getAllRawHeaders(): - data += "%s: %s\x0d\x0a" % (h[0], ",".join(h[1])) - data += "\x0d\x0a" - protocol.dataReceived(data) - # Remove our HTTP reply channel association - self.channel.factory.reply_protocols[self.reply_channel] = None - self.reply_channel = None - # Boring old HTTP. - else: - # Send request message - Channel("http.request").send({ - "reply_channel": self.reply_channel, - "method": self.method, - "get": self.get, - "post": self.post, - "cookies": self.received_cookies, - "headers": {k: v[0] for k, v in self.requestHeaders.getAllRawHeaders()}, - "client": [self.client.host, self.client.port], - "server": [self.host.host, self.host.port], - "path": self.path, - }) - - def connectionLost(self, reason): - """ - Cleans up reply channel on close. - """ - if self.reply_channel: - del self.channel.factory.reply_protocols[self.reply_channel] - Request.connectionLost(self, reason) - - def serverResponse(self, message): - """ - Writes a received HTTP response back out to the transport. - """ - # Write code - self.setResponseCode(message['status']) - # Write headers - for header, value in message.get("headers", {}): - self.setHeader(header.encode("utf8"), value.encode("utf8")) - # Write cookies - for cookie in message.get("cookies"): - self.cookies.append(cookie.encode("utf8")) - # Write out body - if "content" in message: - Request.write(self, message['content'].encode("utf8")) - self.finish() - - def requestReceived(self, command, path, version): - """ - Called by channel when all data has been received. - Overridden because Twisted merges GET and POST into one thing by default. - """ - self.content.seek(0,0) - self.get = {} - self.post = {} - - self.method, self.uri = command, path - self.clientproto = version - x = self.uri.split(b'?', 1) - - print self.method - - # URI and GET args assignment - if len(x) == 1: - self.path = self.uri - else: - self.path, argstring = x - self.get = parse_qs(argstring, 1) - - # cache the client and server information, we'll need this later to be - # serialized and sent with the request so CGIs will work remotely - self.client = self.channel.transport.getPeer() - self.host = self.channel.transport.getHost() - - # Argument processing - ctype = self.requestHeaders.getRawHeaders(b'content-type') - if ctype is not None: - ctype = ctype[0] - - # Process POST data if present - if self.method == b"POST" and ctype: - mfd = b'multipart/form-data' - key, pdict = _parseHeader(ctype) - if key == b'application/x-www-form-urlencoded': - self.post.update(parse_qs(self.content.read(), 1)) - elif key == mfd: - try: - cgiArgs = cgi.parse_multipart(self.content, pdict) - - if _PY3: - # parse_multipart on Python 3 decodes the header bytes - # as iso-8859-1 and returns a str key -- we want bytes - # so encode it back - self.post.update({x.encode('iso-8859-1'): y - for x, y in cgiArgs.items()}) - else: - self.post.update(cgiArgs) - except: - # It was a bad request. - _respondToBadRequestAndDisconnect(self.channel.transport) - return - self.content.seek(0, 0) - - # Continue with rest of request handling - self.process() - - -class WebProtocol(HTTPChannel): - - requestFactory = WebRequest - - -class WebFactory(HTTPFactory): - - protocol = WebProtocol - - def __init__(self): - HTTPFactory.__init__(self) - # We track all sub-protocols for response channel mapping - self.reply_protocols = {} - # Make a factory for WebSocket protocols - self.ws_factory = WebSocketServerFactory("ws://127.0.0.1:8000") - self.ws_factory.protocol = WebsocketProtocol - self.ws_factory.reply_protocols = self.reply_protocols - - def reply_channels(self): - return self.reply_protocols.keys() - - def dispatch_reply(self, channel, message): - if channel.startswith("!http") and isinstance(self.reply_protocols[channel], WebRequest): - self.reply_protocols[channel].serverResponse(message) - elif channel.startswith("!websocket") and isinstance(self.reply_protocols[channel], WebsocketProtocol): - if message.get("content", None): - self.reply_protocols[channel].serverSend(**message) - if message.get("close", False): - self.reply_protocols[channel].serverClose() - else: - raise ValueError("Cannot dispatch message on channel %r" % channel) - - -class HttpTwistedInterface(object): - """ - Easy API to run a HTTP1 & WebSocket interface server using Twisted. - Integrates the channel backend by running it in a separate thread, using - the always-compatible polling style. - """ - - def __init__(self, channel_backend, port=8000): - self.channel_backend = channel_backend - self.port = port - - def run(self): - self.factory = WebFactory() - reactor.listenTCP(self.port, self.factory) - reactor.callInThread(self.backend_reader) - #reactor.callLater(1, self.keepalive_sender) - reactor.run() - - def backend_reader(self): - """ - Run in a separate thread; reads messages from the backend. - """ - while True: - channels = self.factory.reply_channels() - # Quit if reactor is stopping - if not reactor.running: - return - # Don't do anything if there's no channels to listen on - if channels: - channel, message = self.channel_backend.receive_many(channels) - else: - time.sleep(0.1) - continue - # Wait around if there's nothing received - if channel is None: - time.sleep(0.05) - continue - # Deal with the message - self.factory.dispatch_reply(channel, message) - - def keepalive_sender(self): - """ - Sends keepalive messages for open WebSockets every - (channel_backend expiry / 2) seconds. - """ - expiry_window = int(self.channel_backend.expiry / 2) - for protocol in self.factory.reply_protocols.values(): - if time.time() - protocol.last_keepalive > expiry_window: - protocol.sendKeepalive() - reactor.callLater(1, self.keepalive_sender) diff --git a/channels/interfaces/websocket_asyncio.py b/channels/interfaces/websocket_asyncio.py deleted file mode 100644 index 043c7d2..0000000 --- a/channels/interfaces/websocket_asyncio.py +++ /dev/null @@ -1,72 +0,0 @@ -import time - -import asyncio -from autobahn.asyncio.websocket import ( - WebSocketServerFactory, WebSocketServerProtocol, -) - -from .websocket_autobahn import get_factory, get_protocol - - -class WebsocketAsyncioInterface(object): - """ - Easy API to run a WebSocket interface server using Twisted. - Integrates the channel backend by running it in a separate thread, using - the always-compatible polling style. - """ - - def __init__(self, channel_backend, port=9000): - self.channel_backend = channel_backend - self.port = port - - def run(self): - self.factory = get_factory(WebSocketServerFactory)(debug=False) - self.factory.protocol = get_protocol(WebSocketServerProtocol) - self.loop = asyncio.get_event_loop() - coro = self.loop.create_server(self.factory, '0.0.0.0', self.port) - server = self.loop.run_until_complete(coro) - self.loop.run_in_executor(None, self.backend_reader) - self.loop.call_later(1, self.keepalive_sender) - try: - self.loop.run_forever() - except KeyboardInterrupt: - pass - finally: - server.close() - self.loop.close() - - def backend_reader(self): - """ - Run in a separate thread; reads messages from the backend. - """ - # Wait for main loop to start - time.sleep(0.5) - while True: - channels = self.factory.reply_channels() - # Quit if reactor is stopping - if not self.loop.is_running(): - return - # Don't do anything if there's no channels to listen on - if channels: - channel, message = self.channel_backend.receive_many(channels) - else: - time.sleep(0.1) - continue - # Wait around if there's nothing received - if channel is None: - time.sleep(0.05) - continue - # Deal with the message - self.factory.dispatch_send(channel, message) - - def keepalive_sender(self): - """ - Sends keepalive messages for open WebSockets every - (channel_backend expiry / 2) seconds. - """ - expiry_window = int(self.channel_backend.expiry / 2) - for protocol in self.factory.reply_protocols.values(): - if time.time() - protocol.last_keepalive > expiry_window: - protocol.sendKeepalive() - if self.loop.is_running(): - self.loop.call_later(1, self.keepalive_sender) diff --git a/channels/interfaces/websocket_autobahn.py b/channels/interfaces/websocket_autobahn.py deleted file mode 100644 index cd3dd3e..0000000 --- a/channels/interfaces/websocket_autobahn.py +++ /dev/null @@ -1,104 +0,0 @@ -import time - -from django.http import parse_cookie - -from channels import DEFAULT_CHANNEL_BACKEND, Channel, channel_backends - - -def get_protocol(base): - - class InterfaceProtocol(base): - """ - Protocol which supports WebSockets and forwards incoming messages to - the websocket channels. - """ - - def onConnect(self, request): - self.channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] - self.request_info = { - "path": request.path, - "get": request.params, - "cookies": parse_cookie(request.headers.get('cookie', '')) - } - - def onOpen(self): - # Make sending channel - self.reply_channel = Channel.new_name("!websocket.send") - self.request_info["reply_channel"] = self.reply_channel - self.last_keepalive = time.time() - self.factory.reply_protocols[self.reply_channel] = self - # Send news that this channel is open - Channel("websocket.connect").send(self.request_info) - - def onMessage(self, payload, isBinary): - if isBinary: - Channel("websocket.receive").send({ - "reply_channel": self.reply_channel, - "content": payload, - "binary": True, - }) - else: - Channel("websocket.receive").send({ - "reply_channel": self.reply_channel, - "content": payload.decode("utf8"), - "binary": False, - }) - - def serverSend(self, content, binary=False, **kwargs): - """ - Server-side channel message to send a message. - """ - if binary: - self.sendMessage(content, binary) - else: - self.sendMessage(content.encode("utf8"), binary) - - def serverClose(self): - """ - Server-side channel message to close the socket - """ - self.sendClose() - - def onClose(self, wasClean, code, reason): - if hasattr(self, "reply_channel"): - del self.factory.reply_protocols[self.reply_channel] - Channel("websocket.disconnect").send({ - "reply_channel": self.reply_channel, - }) - - def sendKeepalive(self): - """ - Sends a keepalive packet on the keepalive channel. - """ - Channel("websocket.keepalive").send({ - "reply_channel": self.reply_channel, - }) - self.last_keepalive = time.time() - - return InterfaceProtocol - - -def get_factory(base): - - class InterfaceFactory(base): - """ - Factory which keeps track of its open protocols' receive channels - and can dispatch to them. - """ - - # TODO: Clean up dead protocols if needed? - - def __init__(self, *args, **kwargs): - super(InterfaceFactory, self).__init__(*args, **kwargs) - self.reply_protocols = {} - - def reply_channels(self): - return self.reply_protocols.keys() - - def dispatch_send(self, channel, message): - if message.get("content", None): - self.reply_protocols[channel].serverSend(**message) - if message.get("close", False): - self.reply_protocols[channel].serverClose() - - return InterfaceFactory diff --git a/channels/interfaces/websocket_twisted.py b/channels/interfaces/websocket_twisted.py deleted file mode 100644 index 1a924ac..0000000 --- a/channels/interfaces/websocket_twisted.py +++ /dev/null @@ -1,61 +0,0 @@ -import time - -from autobahn.twisted.websocket import ( - WebSocketServerFactory, WebSocketServerProtocol, -) -from twisted.internet import reactor - -from .websocket_autobahn import get_factory, get_protocol - - -class WebsocketTwistedInterface(object): - """ - Easy API to run a WebSocket interface server using Twisted. - Integrates the channel backend by running it in a separate thread, using - the always-compatible polling style. - """ - - def __init__(self, channel_backend, port=9000): - self.channel_backend = channel_backend - self.port = port - - def run(self): - self.factory = get_factory(WebSocketServerFactory)(debug=False) - self.factory.protocol = get_protocol(WebSocketServerProtocol) - reactor.listenTCP(self.port, self.factory) - reactor.callInThread(self.backend_reader) - reactor.callLater(1, self.keepalive_sender) - reactor.run() - - def backend_reader(self): - """ - Run in a separate thread; reads messages from the backend. - """ - while True: - channels = self.factory.reply_channels() - # Quit if reactor is stopping - if not reactor.running: - return - # Don't do anything if there's no channels to listen on - if channels: - channel, message = self.channel_backend.receive_many(channels) - else: - time.sleep(0.1) - continue - # Wait around if there's nothing received - if channel is None: - time.sleep(0.05) - continue - # Deal with the message - self.factory.dispatch_send(channel, message) - - def keepalive_sender(self): - """ - Sends keepalive messages for open WebSockets every - (channel_backend expiry / 2) seconds. - """ - expiry_window = int(self.channel_backend.expiry / 2) - for protocol in self.factory.reply_protocols.values(): - if time.time() - protocol.last_keepalive > expiry_window: - protocol.sendKeepalive() - reactor.callLater(1, self.keepalive_sender) diff --git a/channels/interfaces/wsgi.py b/channels/interfaces/wsgi.py deleted file mode 100644 index 004e6ea..0000000 --- a/channels/interfaces/wsgi.py +++ /dev/null @@ -1,22 +0,0 @@ -import django -from django.core.handlers.wsgi import WSGIHandler -from django.http import HttpResponse - -from channels import Channel - - -class WSGIInterface(WSGIHandler): - """ - WSGI application that pushes requests to channels. - """ - - def __init__(self, channel_backend, *args, **kwargs): - self.channel_backend = channel_backend - django.setup() - super(WSGIInterface, self).__init__(*args, **kwargs) - - def get_response(self, request): - request.reply_channel = Channel.new_name("http.response") - Channel("http.request", channel_backend=self.channel_backend).send(request.channel_encode()) - channel, message = self.channel_backend.receive_many_blocking([request.reply_channel]) - return HttpResponse.channel_decode(message) diff --git a/channels/management/commands/runallserver.py b/channels/management/commands/runallserver.py deleted file mode 100644 index 1762be5..0000000 --- a/channels/management/commands/runallserver.py +++ /dev/null @@ -1,26 +0,0 @@ -import time -from django.core.management import BaseCommand, CommandError -from channels import channel_backends, DEFAULT_CHANNEL_BACKEND - - -class Command(BaseCommand): - - def add_arguments(self, parser): - parser.add_argument('port', nargs='?', - help='Optional port number') - - def handle(self, *args, **options): - # Get the backend to use - channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] - if channel_backend.local_only: - raise CommandError( - "You have a process-local channel backend configured, and so cannot run separate interface servers.\n" - "Configure a network-based backend in CHANNEL_BACKENDS to use this command." - ) - # Run the interface - port = int(options.get("port", None) or 8000) - from channels.interfaces.http_twisted import HttpTwistedInterface - self.stdout.write("Running twisted/Autobahn HTTP & WebSocket interface server") - self.stdout.write(" Channel backend: %s" % channel_backend) - self.stdout.write(" Listening on: 0.0.0.0:%i" % port) - HttpTwistedInterface(channel_backend=channel_backend, port=port).run() diff --git a/channels/management/commands/runserver.py b/channels/management/commands/runserver.py index 89c016a..63d3511 100644 --- a/channels/management/commands/runserver.py +++ b/channels/management/commands/runserver.py @@ -5,7 +5,6 @@ from django.core.management.commands.runserver import \ from channels import DEFAULT_CHANNEL_LAYER, channel_layers from channels.handler import ViewConsumer -from channels.interfaces.wsgi import WSGIInterface from channels.log import setup_logger from channels.worker import Worker @@ -17,32 +16,28 @@ class Command(RunserverCommand): self.logger = setup_logger('django.channels', self.verbosity) super(Command, self).handle(*args, **options) - def get_handler(self, *args, **options): - """ - Returns the default WSGI handler for the runner. - """ - return WSGIInterface(self.channel_layer) - def run(self, *args, **options): - # Run the rest - return super(Command, self).run(*args, **options) + # Don't autoreload for now + self.inner_run(None, **options) def inner_run(self, *args, **options): - # Check a handler is registered for http reqs + # Check a handler is registered for http reqs; if not, add default one self.channel_layer = channel_layers[DEFAULT_CHANNEL_LAYER] if not self.channel_layer.registry.consumer_for_channel("http.request"): - # Register the default one self.channel_layer.registry.add_consumer(ViewConsumer(), ["http.request"]) - # Note that this is the right one on the console + # Note that this is the channel-enabled one on the console self.logger.info("Worker thread running, channels enabled") - if self.channel_layer.local_only: - self.logger.info("Local channel backend detected, no remote channels support") # Launch a worker thread worker = WorkerThread(self.channel_layer) worker.daemon = True worker.start() - # Run rest of inner run - super(Command, self).inner_run(*args, **options) + # Launch server in main thread + from daphne.server import Server + Server( + channel_layer=self.channel_layer, + host=self.addr, + port=int(self.port), + ).run() class WorkerThread(threading.Thread): diff --git a/channels/management/commands/runwsserver.py b/channels/management/commands/runwsserver.py deleted file mode 100644 index 54f2f0c..0000000 --- a/channels/management/commands/runwsserver.py +++ /dev/null @@ -1,38 +0,0 @@ -from django.core.management import BaseCommand, CommandError - -from channels import DEFAULT_CHANNEL_BACKEND, channel_backends -from channels.log import setup_logger - - -class Command(BaseCommand): - - def add_arguments(self, parser): - parser.add_argument('port', nargs='?', - help='Optional port number') - - def handle(self, *args, **options): - self.verbosity = options.get("verbosity", 1) - self.logger = setup_logger('django.channels', self.verbosity) - # Get the backend to use - channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] - if channel_backend.local_only: - raise CommandError( - "You have a process-local channel backend configured, and so cannot run separate interface servers.\n" - "Configure a network-based backend in CHANNEL_BACKENDS to use this command." - ) - # Run the interface - port = int(options.get("port", None) or 9000) - try: - import asyncio # NOQA - except ImportError: - from channels.interfaces.websocket_twisted import WebsocketTwistedInterface - self.logger.info("Running Twisted/Autobahn WebSocket interface server") - self.logger.info(" Channel backend: %s", channel_backend) - self.logger.info(" Listening on: ws://0.0.0.0:%i" % port) - WebsocketTwistedInterface(channel_backend=channel_backend, port=port).run() - else: - from channels.interfaces.websocket_asyncio import WebsocketAsyncioInterface - self.logger.info("Running asyncio/Autobahn WebSocket interface server") - self.logger.info(" Channel backend: %s", channel_backend) - self.logger.info(" Listening on: ws://0.0.0.0:%i", port) - WebsocketAsyncioInterface(channel_backend=channel_backend, port=port).run() diff --git a/docs/concepts.rst b/docs/concepts.rst index 02c047d..7b32115 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -38,9 +38,10 @@ alternative is *at-least-once*, where normally one consumer gets the message but when things crash it's sent to more than one, which is not the trade-off we want. -There are a couple of other limitations - messages must be JSON serializable, -and not be more than 1MB in size - but these are to make the whole thing -practical, and not too important to think about up front. +There are a couple of other limitations - messages must be made of +serializable types, and stay under a certain size limit - but these are +implementation details you won't need to worry about until you get to more +advanced usage. The channels have capacity, so a load of producers can write lots of messages into a channel with no consumers and then a consumer can come along later and diff --git a/setup.py b/setup.py index 7cde525..29ee590 100644 --- a/setup.py +++ b/setup.py @@ -11,6 +11,6 @@ setup( packages=find_packages(), include_package_data=True, install_requires=[ - 'Django' + 'Django', ] )