mirror of
https://github.com/django/daphne.git
synced 2025-04-21 17:22:03 +03:00
Make runserver work using daphne
This commit is contained in:
parent
b9464ca149
commit
836f6be43a
|
@ -1,6 +1,10 @@
|
|||
Django Channels
|
||||
===============
|
||||
|
||||
**NOTE: The current master branch is in flux as it changes to match the final
|
||||
structure and the new ASGI spec. If you wish to use this in the meantime,
|
||||
please use a tagged release.**
|
||||
|
||||
This is a work-in-progress code branch of Django implemented as a third-party
|
||||
app, which aims to bring some asynchrony to Django and expand the options
|
||||
for code beyond the request-response model, in particular enabling WebSocket,
|
||||
|
|
|
@ -1,36 +0,0 @@
|
|||
from django.utils.module_loading import import_string
|
||||
|
||||
|
||||
class InvalidChannelBackendError(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
class BackendManager(object):
|
||||
"""
|
||||
Takes a settings dictionary of backends and initialises them.
|
||||
"""
|
||||
|
||||
def __init__(self, backend_configs):
|
||||
self.configs = backend_configs
|
||||
self.backends = {}
|
||||
|
||||
def make_backend(self, name):
|
||||
# Load the backend class
|
||||
try:
|
||||
backend_class = import_string(self.configs[name]['BACKEND'])
|
||||
except KeyError:
|
||||
raise InvalidChannelBackendError("No BACKEND specified for %s" % name)
|
||||
except ImportError:
|
||||
raise InvalidChannelBackendError(
|
||||
"Cannot import BACKEND %r specified for %s" % (self.configs[name]['BACKEND'], name)
|
||||
)
|
||||
|
||||
# Initialise and pass config
|
||||
instance = backend_class(**{k.lower(): v for k, v in self.configs[name].items() if k != "BACKEND"})
|
||||
instance.alias = name
|
||||
return instance
|
||||
|
||||
def __getitem__(self, key):
|
||||
if key not in self.backends:
|
||||
self.backends[key] = self.make_backend(key)
|
||||
return self.backends[key]
|
|
@ -1,109 +0,0 @@
|
|||
import time
|
||||
|
||||
from channels.consumer_registry import ConsumerRegistry
|
||||
|
||||
|
||||
class ChannelClosed(Exception):
|
||||
"""
|
||||
Raised when you try to send to a closed channel.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class BaseChannelBackend(object):
|
||||
"""
|
||||
Base class for all channel layer implementations. Manages both sending
|
||||
and receving messages from the backend, and each comes with its own
|
||||
registry of consumers.
|
||||
"""
|
||||
|
||||
# Flags if this backend can only be used inside one process.
|
||||
# Causes errors if you try to run workers/interfaces separately with it.
|
||||
local_only = False
|
||||
|
||||
def __init__(self, routing, expiry=60):
|
||||
self.registry = ConsumerRegistry(routing)
|
||||
self.expiry = expiry
|
||||
|
||||
def send(self, channel, message):
|
||||
"""
|
||||
Send a message over the channel, taken from the kwargs.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def receive_many(self, channels):
|
||||
"""
|
||||
Return the first message available on one of the
|
||||
channels passed, as a (channel, message) tuple, or return (None, None)
|
||||
if no channels are available.
|
||||
|
||||
Should not block, but is allowed to be moderately slow/have a short
|
||||
timeout - it needs to return so we can refresh the list of channels,
|
||||
not because the rest of the process is waiting on it.
|
||||
|
||||
Better performance can be achieved for interface servers by directly
|
||||
integrating the server and the backend code; this is merely for a
|
||||
generic support-everything pattern.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def receive_many_blocking(self, channels):
|
||||
"""
|
||||
Blocking version of receive_many, if the calling context knows it
|
||||
doesn't ever want to change the channels list until something happens.
|
||||
|
||||
This base class provides a default implementation; can be overridden
|
||||
to be more efficient by subclasses.
|
||||
"""
|
||||
while True:
|
||||
channel, message = self.receive_many(channels)
|
||||
if channel is None:
|
||||
time.sleep(0.05)
|
||||
continue
|
||||
return channel, message
|
||||
|
||||
def group_add(self, group, channel, expiry=None):
|
||||
"""
|
||||
Adds the channel to the named group for at least 'expiry'
|
||||
seconds (expiry defaults to message expiry if not provided).
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def group_discard(self, group, channel):
|
||||
"""
|
||||
Removes the channel from the named group if it is in the group;
|
||||
does nothing otherwise (does not error)
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def group_channels(self, group):
|
||||
"""
|
||||
Returns an iterable of all channels in the group.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def send_group(self, group, message):
|
||||
"""
|
||||
Sends a message to the entire group.
|
||||
|
||||
This base class provides a default implementation; can be overridden
|
||||
to be more efficient by subclasses.
|
||||
"""
|
||||
for channel in self.group_channels(group):
|
||||
self.send(channel, message)
|
||||
|
||||
def __str__(self):
|
||||
return self.__class__.__name__
|
||||
|
||||
def lock_channel(self, channel):
|
||||
"""
|
||||
Attempts to get a lock on the named channel. Returns True if lock
|
||||
obtained, False if lock not obtained.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def unlock_channel(self, channel):
|
||||
"""
|
||||
Unlocks the named channel. Always succeeds.
|
||||
"""
|
||||
raise NotImplementedError()
|
|
@ -1,102 +0,0 @@
|
|||
import json
|
||||
import time
|
||||
from collections import deque
|
||||
|
||||
from .base import BaseChannelBackend
|
||||
|
||||
queues = {}
|
||||
groups = {}
|
||||
locks = set()
|
||||
|
||||
|
||||
class InMemoryChannelBackend(BaseChannelBackend):
|
||||
"""
|
||||
In-memory channel implementation. Intended only for use with threading,
|
||||
in low-throughput development environments.
|
||||
"""
|
||||
|
||||
local_only = True
|
||||
|
||||
def send(self, channel, message):
|
||||
# Try JSON encoding it to make sure it would, but store the native version
|
||||
json.dumps(message)
|
||||
# Add to the deque, making it if needs be
|
||||
queues.setdefault(channel, deque()).append((message, time.time() + self.expiry))
|
||||
|
||||
def receive_many(self, channels):
|
||||
if not channels:
|
||||
raise ValueError("Cannot receive on empty channel list!")
|
||||
# Try to pop a message from each channel
|
||||
self._clean_expired()
|
||||
for channel in channels:
|
||||
try:
|
||||
# This doesn't clean up empty channels - OK for testing.
|
||||
# For later versions, have cleanup w/lock.
|
||||
return channel, queues[channel].popleft()[0]
|
||||
except (IndexError, KeyError):
|
||||
pass
|
||||
return None, None
|
||||
|
||||
def _clean_expired(self):
|
||||
# Handle expired messages
|
||||
for channel, messages in queues.items():
|
||||
while len(messages) and messages[0][1] < time.time():
|
||||
messages.popleft()
|
||||
# Handle expired groups
|
||||
for group, channels in list(groups.items()):
|
||||
for channel, expiry in list(channels.items()):
|
||||
if expiry < (time.time() - 10):
|
||||
try:
|
||||
del groups[group][channel]
|
||||
except KeyError:
|
||||
# Another thread might have got there first
|
||||
pass
|
||||
|
||||
def group_add(self, group, channel, expiry=None):
|
||||
"""
|
||||
Adds the channel to the named group for at least 'expiry'
|
||||
seconds (expiry defaults to message expiry if not provided).
|
||||
"""
|
||||
groups.setdefault(group, {})[channel] = time.time() + (expiry or self.expiry)
|
||||
|
||||
def group_discard(self, group, channel):
|
||||
"""
|
||||
Removes the channel from the named group if it is in the group;
|
||||
does nothing otherwise (does not error)
|
||||
"""
|
||||
try:
|
||||
del groups[group][channel]
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
def group_channels(self, group):
|
||||
"""
|
||||
Returns an iterable of all channels in the group.
|
||||
"""
|
||||
self._clean_expired()
|
||||
return groups.get(group, {}).keys()
|
||||
|
||||
def lock_channel(self, channel):
|
||||
"""
|
||||
Attempts to get a lock on the named channel. Returns True if lock
|
||||
obtained, False if lock not obtained.
|
||||
"""
|
||||
# Probably not perfect for race conditions, but close enough considering
|
||||
# it shouldn't be used.
|
||||
if channel not in locks:
|
||||
locks.add(channel)
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def unlock_channel(self, channel):
|
||||
"""
|
||||
Unlocks the named channel. Always succeeds.
|
||||
"""
|
||||
locks.discard(channel)
|
||||
|
||||
def flush(self):
|
||||
global queues, groups, locks
|
||||
queues = {}
|
||||
groups = {}
|
||||
locks = set()
|
|
@ -1,192 +0,0 @@
|
|||
import binascii
|
||||
import json
|
||||
import math
|
||||
import random
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import redis
|
||||
from django.utils import six
|
||||
|
||||
from .base import BaseChannelBackend
|
||||
|
||||
|
||||
class RedisChannelBackend(BaseChannelBackend):
|
||||
"""
|
||||
ORM-backed channel environment. For development use only; it will span
|
||||
multiple processes fine, but it's going to be pretty bad at throughput.
|
||||
"""
|
||||
|
||||
def __init__(self, routing, expiry=60, hosts=None, prefix="django-channels:"):
|
||||
super(RedisChannelBackend, self).__init__(routing=routing, expiry=expiry)
|
||||
# Make sure they provided some hosts, or provide a default
|
||||
if not hosts:
|
||||
hosts = [("localhost", 6379)]
|
||||
self.hosts = []
|
||||
for entry in hosts:
|
||||
if isinstance(entry, six.string_types):
|
||||
self.hosts.append(entry)
|
||||
else:
|
||||
self.hosts.append("redis://%s:%d/0" % (entry[0],entry[1]))
|
||||
self.prefix = prefix
|
||||
# Precalculate some values for ring selection
|
||||
self.ring_size = len(self.hosts)
|
||||
self.ring_divisor = int(math.ceil(4096 / float(self.ring_size)))
|
||||
|
||||
def consistent_hash(self, value):
|
||||
"""
|
||||
Maps the value to a node value between 0 and 4095
|
||||
using MD5, then down to one of the ring nodes.
|
||||
"""
|
||||
if isinstance(value, six.text_type):
|
||||
value = value.encode("utf8")
|
||||
bigval = binascii.crc32(value) & 0xffffffff
|
||||
return (bigval // 0x100000) // self.ring_divisor
|
||||
|
||||
def random_index(self):
|
||||
return random.randint(0, len(self.hosts) - 1)
|
||||
|
||||
def connection(self, index):
|
||||
"""
|
||||
Returns the correct connection for the current thread.
|
||||
|
||||
Pass key to use a server based on consistent hashing of the key value;
|
||||
pass None to use a random server instead.
|
||||
"""
|
||||
# If index is explicitly None, pick a random server
|
||||
if index is None:
|
||||
index = self.random_index()
|
||||
# Catch bad indexes
|
||||
if not (0 <= index < self.ring_size):
|
||||
raise ValueError("There are only %s hosts - you asked for %s!" % (self.ring_size, index))
|
||||
return redis.Redis.from_url(self.hosts[index])
|
||||
|
||||
def send(self, channel, message):
|
||||
# if channel is no str (=> bytes) convert it
|
||||
if not isinstance(channel, str):
|
||||
channel = channel.decode("utf-8")
|
||||
# Write out message into expiring key (avoids big items in list)
|
||||
# TODO: Use extended set, drop support for older redis?
|
||||
key = self.prefix + uuid.uuid4().hex
|
||||
|
||||
# Pick a connection to the right server - consistent for response
|
||||
# channels, random for normal channels
|
||||
if channel.startswith("!"):
|
||||
index = self.consistent_hash(key)
|
||||
connection = self.connection(index)
|
||||
else:
|
||||
connection = self.connection(None)
|
||||
|
||||
connection.set(
|
||||
key,
|
||||
json.dumps(message),
|
||||
)
|
||||
connection.expire(
|
||||
key,
|
||||
self.expiry + 10,
|
||||
)
|
||||
# Add key to list
|
||||
connection.rpush(
|
||||
self.prefix + channel,
|
||||
key,
|
||||
)
|
||||
# Set list to expire when message does (any later messages will bump this)
|
||||
connection.expire(
|
||||
self.prefix + channel,
|
||||
self.expiry + 10,
|
||||
)
|
||||
# TODO: Prune expired messages from same list (in case nobody consumes)
|
||||
|
||||
def receive_many(self, channels):
|
||||
if not channels:
|
||||
raise ValueError("Cannot receive on empty channel list!")
|
||||
# Work out what servers to listen on for the given channels
|
||||
indexes = {}
|
||||
random_index = self.random_index()
|
||||
for channel in channels:
|
||||
if channel.startswith("!"):
|
||||
indexes.setdefault(self.consistent_hash(channel), []).append(channel)
|
||||
else:
|
||||
indexes.setdefault(random_index, []).append(channel)
|
||||
# Get a message from one of our channels
|
||||
while True:
|
||||
# Select a random connection to use
|
||||
# TODO: Would we be better trying to do this truly async?
|
||||
index = random.choice(list(indexes.keys()))
|
||||
connection = self.connection(index)
|
||||
channels = indexes[index]
|
||||
# Shuffle channels to avoid the first ones starving others of workers
|
||||
random.shuffle(channels)
|
||||
# Pop off any waiting message
|
||||
result = connection.blpop([self.prefix + channel for channel in channels], timeout=1)
|
||||
if result:
|
||||
content = connection.get(result[1])
|
||||
if content is None:
|
||||
continue
|
||||
return result[0][len(self.prefix):].decode("utf-8"), json.loads(content.decode("utf-8"))
|
||||
else:
|
||||
return None, None
|
||||
|
||||
def group_add(self, group, channel, expiry=None):
|
||||
"""
|
||||
Adds the channel to the named group for at least 'expiry'
|
||||
seconds (expiry defaults to message expiry if not provided).
|
||||
"""
|
||||
key = "%s:group:%s" % (self.prefix, group)
|
||||
key = key.encode("utf8")
|
||||
self.connection(self.consistent_hash(group)).zadd(
|
||||
key,
|
||||
**{channel: time.time() + (expiry or self.expiry)}
|
||||
)
|
||||
|
||||
def group_discard(self, group, channel):
|
||||
"""
|
||||
Removes the channel from the named group if it is in the group;
|
||||
does nothing otherwise (does not error)
|
||||
"""
|
||||
key = "%s:group:%s" % (self.prefix, group)
|
||||
key = key.encode("utf8")
|
||||
self.connection(self.consistent_hash(group)).zrem(
|
||||
key,
|
||||
channel,
|
||||
)
|
||||
|
||||
def group_channels(self, group):
|
||||
"""
|
||||
Returns an iterable of all channels in the group.
|
||||
"""
|
||||
key = "%s:group:%s" % (self.prefix, group)
|
||||
key = key.encode("utf8")
|
||||
connection = self.connection(self.consistent_hash(group))
|
||||
# Discard old channels
|
||||
connection.zremrangebyscore(key, 0, int(time.time()) - 10)
|
||||
# Return current lot
|
||||
return [x.decode("utf8") for x in connection.zrange(
|
||||
key,
|
||||
0,
|
||||
-1,
|
||||
)]
|
||||
|
||||
# TODO: send_group efficient implementation using Lua
|
||||
|
||||
def lock_channel(self, channel, expiry=None):
|
||||
"""
|
||||
Attempts to get a lock on the named channel. Returns True if lock
|
||||
obtained, False if lock not obtained.
|
||||
"""
|
||||
key = "%s:lock:%s" % (self.prefix, channel)
|
||||
return bool(self.connection(self.consistent_hash(channel)).setnx(key, "1"))
|
||||
|
||||
def unlock_channel(self, channel):
|
||||
"""
|
||||
Unlocks the named channel. Always succeeds.
|
||||
"""
|
||||
key = "%s:lock:%s" % (self.prefix, channel)
|
||||
self.connection(self.consistent_hash(channel)).delete(key)
|
||||
|
||||
def __str__(self):
|
||||
return "%s(hosts=%s)" % (self.__class__.__name__, self.hosts)
|
||||
|
||||
def flush(self):
|
||||
for i in range(self.ring_size):
|
||||
self.connection(i).flushdb()
|
|
@ -1,249 +0,0 @@
|
|||
import time
|
||||
|
||||
from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory
|
||||
from twisted.python.compat import _PY3
|
||||
from twisted.web.http import HTTPFactory, HTTPChannel, Request, _respondToBadRequestAndDisconnect, parse_qs, _parseHeader
|
||||
from twisted.protocols.policies import ProtocolWrapper
|
||||
from twisted.internet import reactor
|
||||
|
||||
from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND
|
||||
from .websocket_autobahn import get_protocol, get_factory
|
||||
|
||||
|
||||
WebsocketProtocol = get_protocol(WebSocketServerProtocol)
|
||||
|
||||
|
||||
class WebRequest(Request):
|
||||
"""
|
||||
Request that either hands off information to channels, or offloads
|
||||
to a WebSocket class.
|
||||
|
||||
Does some extra processing over the normal Twisted Web request to separate
|
||||
GET and POST out.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
Request.__init__(self, *args, **kwargs)
|
||||
self.reply_channel = Channel.new_name("!http.response")
|
||||
self.channel.factory.reply_protocols[self.reply_channel] = self
|
||||
|
||||
def process(self):
|
||||
# Get upgrade header
|
||||
upgrade_header = None
|
||||
if self.requestHeaders.hasHeader("Upgrade"):
|
||||
upgrade_header = self.requestHeaders.getRawHeaders("Upgrade")[0]
|
||||
# Is it WebSocket? IS IT?!
|
||||
if upgrade_header == "websocket":
|
||||
# Make WebSocket protocol to hand off to
|
||||
protocol = self.channel.factory.ws_factory.buildProtocol(self.transport.getPeer())
|
||||
if not protocol:
|
||||
# If protocol creation fails, we signal "internal server error"
|
||||
self.setResponseCode(500)
|
||||
self.finish()
|
||||
# Port across transport
|
||||
transport, self.transport = self.transport, None
|
||||
if isinstance(transport, ProtocolWrapper):
|
||||
# i.e. TLS is a wrapping protocol
|
||||
transport.wrappedProtocol = protocol
|
||||
else:
|
||||
transport.protocol = protocol
|
||||
protocol.makeConnection(transport)
|
||||
# Re-inject request
|
||||
if _PY3:
|
||||
data = self.method + b' ' + self.uri + b' HTTP/1.1\x0d\x0a'
|
||||
for h in self.requestHeaders.getAllRawHeaders():
|
||||
data += h[0] + b': ' + b",".join(h[1]) + b'\x0d\x0a'
|
||||
data += b"\x0d\x0a"
|
||||
data += self.content.read()
|
||||
else:
|
||||
data = "%s %s HTTP/1.1\x0d\x0a" % (self.method, self.uri)
|
||||
for h in self.requestHeaders.getAllRawHeaders():
|
||||
data += "%s: %s\x0d\x0a" % (h[0], ",".join(h[1]))
|
||||
data += "\x0d\x0a"
|
||||
protocol.dataReceived(data)
|
||||
# Remove our HTTP reply channel association
|
||||
self.channel.factory.reply_protocols[self.reply_channel] = None
|
||||
self.reply_channel = None
|
||||
# Boring old HTTP.
|
||||
else:
|
||||
# Send request message
|
||||
Channel("http.request").send({
|
||||
"reply_channel": self.reply_channel,
|
||||
"method": self.method,
|
||||
"get": self.get,
|
||||
"post": self.post,
|
||||
"cookies": self.received_cookies,
|
||||
"headers": {k: v[0] for k, v in self.requestHeaders.getAllRawHeaders()},
|
||||
"client": [self.client.host, self.client.port],
|
||||
"server": [self.host.host, self.host.port],
|
||||
"path": self.path,
|
||||
})
|
||||
|
||||
def connectionLost(self, reason):
|
||||
"""
|
||||
Cleans up reply channel on close.
|
||||
"""
|
||||
if self.reply_channel:
|
||||
del self.channel.factory.reply_protocols[self.reply_channel]
|
||||
Request.connectionLost(self, reason)
|
||||
|
||||
def serverResponse(self, message):
|
||||
"""
|
||||
Writes a received HTTP response back out to the transport.
|
||||
"""
|
||||
# Write code
|
||||
self.setResponseCode(message['status'])
|
||||
# Write headers
|
||||
for header, value in message.get("headers", {}):
|
||||
self.setHeader(header.encode("utf8"), value.encode("utf8"))
|
||||
# Write cookies
|
||||
for cookie in message.get("cookies"):
|
||||
self.cookies.append(cookie.encode("utf8"))
|
||||
# Write out body
|
||||
if "content" in message:
|
||||
Request.write(self, message['content'].encode("utf8"))
|
||||
self.finish()
|
||||
|
||||
def requestReceived(self, command, path, version):
|
||||
"""
|
||||
Called by channel when all data has been received.
|
||||
Overridden because Twisted merges GET and POST into one thing by default.
|
||||
"""
|
||||
self.content.seek(0,0)
|
||||
self.get = {}
|
||||
self.post = {}
|
||||
|
||||
self.method, self.uri = command, path
|
||||
self.clientproto = version
|
||||
x = self.uri.split(b'?', 1)
|
||||
|
||||
print self.method
|
||||
|
||||
# URI and GET args assignment
|
||||
if len(x) == 1:
|
||||
self.path = self.uri
|
||||
else:
|
||||
self.path, argstring = x
|
||||
self.get = parse_qs(argstring, 1)
|
||||
|
||||
# cache the client and server information, we'll need this later to be
|
||||
# serialized and sent with the request so CGIs will work remotely
|
||||
self.client = self.channel.transport.getPeer()
|
||||
self.host = self.channel.transport.getHost()
|
||||
|
||||
# Argument processing
|
||||
ctype = self.requestHeaders.getRawHeaders(b'content-type')
|
||||
if ctype is not None:
|
||||
ctype = ctype[0]
|
||||
|
||||
# Process POST data if present
|
||||
if self.method == b"POST" and ctype:
|
||||
mfd = b'multipart/form-data'
|
||||
key, pdict = _parseHeader(ctype)
|
||||
if key == b'application/x-www-form-urlencoded':
|
||||
self.post.update(parse_qs(self.content.read(), 1))
|
||||
elif key == mfd:
|
||||
try:
|
||||
cgiArgs = cgi.parse_multipart(self.content, pdict)
|
||||
|
||||
if _PY3:
|
||||
# parse_multipart on Python 3 decodes the header bytes
|
||||
# as iso-8859-1 and returns a str key -- we want bytes
|
||||
# so encode it back
|
||||
self.post.update({x.encode('iso-8859-1'): y
|
||||
for x, y in cgiArgs.items()})
|
||||
else:
|
||||
self.post.update(cgiArgs)
|
||||
except:
|
||||
# It was a bad request.
|
||||
_respondToBadRequestAndDisconnect(self.channel.transport)
|
||||
return
|
||||
self.content.seek(0, 0)
|
||||
|
||||
# Continue with rest of request handling
|
||||
self.process()
|
||||
|
||||
|
||||
class WebProtocol(HTTPChannel):
|
||||
|
||||
requestFactory = WebRequest
|
||||
|
||||
|
||||
class WebFactory(HTTPFactory):
|
||||
|
||||
protocol = WebProtocol
|
||||
|
||||
def __init__(self):
|
||||
HTTPFactory.__init__(self)
|
||||
# We track all sub-protocols for response channel mapping
|
||||
self.reply_protocols = {}
|
||||
# Make a factory for WebSocket protocols
|
||||
self.ws_factory = WebSocketServerFactory("ws://127.0.0.1:8000")
|
||||
self.ws_factory.protocol = WebsocketProtocol
|
||||
self.ws_factory.reply_protocols = self.reply_protocols
|
||||
|
||||
def reply_channels(self):
|
||||
return self.reply_protocols.keys()
|
||||
|
||||
def dispatch_reply(self, channel, message):
|
||||
if channel.startswith("!http") and isinstance(self.reply_protocols[channel], WebRequest):
|
||||
self.reply_protocols[channel].serverResponse(message)
|
||||
elif channel.startswith("!websocket") and isinstance(self.reply_protocols[channel], WebsocketProtocol):
|
||||
if message.get("content", None):
|
||||
self.reply_protocols[channel].serverSend(**message)
|
||||
if message.get("close", False):
|
||||
self.reply_protocols[channel].serverClose()
|
||||
else:
|
||||
raise ValueError("Cannot dispatch message on channel %r" % channel)
|
||||
|
||||
|
||||
class HttpTwistedInterface(object):
|
||||
"""
|
||||
Easy API to run a HTTP1 & WebSocket interface server using Twisted.
|
||||
Integrates the channel backend by running it in a separate thread, using
|
||||
the always-compatible polling style.
|
||||
"""
|
||||
|
||||
def __init__(self, channel_backend, port=8000):
|
||||
self.channel_backend = channel_backend
|
||||
self.port = port
|
||||
|
||||
def run(self):
|
||||
self.factory = WebFactory()
|
||||
reactor.listenTCP(self.port, self.factory)
|
||||
reactor.callInThread(self.backend_reader)
|
||||
#reactor.callLater(1, self.keepalive_sender)
|
||||
reactor.run()
|
||||
|
||||
def backend_reader(self):
|
||||
"""
|
||||
Run in a separate thread; reads messages from the backend.
|
||||
"""
|
||||
while True:
|
||||
channels = self.factory.reply_channels()
|
||||
# Quit if reactor is stopping
|
||||
if not reactor.running:
|
||||
return
|
||||
# Don't do anything if there's no channels to listen on
|
||||
if channels:
|
||||
channel, message = self.channel_backend.receive_many(channels)
|
||||
else:
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
# Wait around if there's nothing received
|
||||
if channel is None:
|
||||
time.sleep(0.05)
|
||||
continue
|
||||
# Deal with the message
|
||||
self.factory.dispatch_reply(channel, message)
|
||||
|
||||
def keepalive_sender(self):
|
||||
"""
|
||||
Sends keepalive messages for open WebSockets every
|
||||
(channel_backend expiry / 2) seconds.
|
||||
"""
|
||||
expiry_window = int(self.channel_backend.expiry / 2)
|
||||
for protocol in self.factory.reply_protocols.values():
|
||||
if time.time() - protocol.last_keepalive > expiry_window:
|
||||
protocol.sendKeepalive()
|
||||
reactor.callLater(1, self.keepalive_sender)
|
|
@ -1,72 +0,0 @@
|
|||
import time
|
||||
|
||||
import asyncio
|
||||
from autobahn.asyncio.websocket import (
|
||||
WebSocketServerFactory, WebSocketServerProtocol,
|
||||
)
|
||||
|
||||
from .websocket_autobahn import get_factory, get_protocol
|
||||
|
||||
|
||||
class WebsocketAsyncioInterface(object):
|
||||
"""
|
||||
Easy API to run a WebSocket interface server using Twisted.
|
||||
Integrates the channel backend by running it in a separate thread, using
|
||||
the always-compatible polling style.
|
||||
"""
|
||||
|
||||
def __init__(self, channel_backend, port=9000):
|
||||
self.channel_backend = channel_backend
|
||||
self.port = port
|
||||
|
||||
def run(self):
|
||||
self.factory = get_factory(WebSocketServerFactory)(debug=False)
|
||||
self.factory.protocol = get_protocol(WebSocketServerProtocol)
|
||||
self.loop = asyncio.get_event_loop()
|
||||
coro = self.loop.create_server(self.factory, '0.0.0.0', self.port)
|
||||
server = self.loop.run_until_complete(coro)
|
||||
self.loop.run_in_executor(None, self.backend_reader)
|
||||
self.loop.call_later(1, self.keepalive_sender)
|
||||
try:
|
||||
self.loop.run_forever()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
server.close()
|
||||
self.loop.close()
|
||||
|
||||
def backend_reader(self):
|
||||
"""
|
||||
Run in a separate thread; reads messages from the backend.
|
||||
"""
|
||||
# Wait for main loop to start
|
||||
time.sleep(0.5)
|
||||
while True:
|
||||
channels = self.factory.reply_channels()
|
||||
# Quit if reactor is stopping
|
||||
if not self.loop.is_running():
|
||||
return
|
||||
# Don't do anything if there's no channels to listen on
|
||||
if channels:
|
||||
channel, message = self.channel_backend.receive_many(channels)
|
||||
else:
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
# Wait around if there's nothing received
|
||||
if channel is None:
|
||||
time.sleep(0.05)
|
||||
continue
|
||||
# Deal with the message
|
||||
self.factory.dispatch_send(channel, message)
|
||||
|
||||
def keepalive_sender(self):
|
||||
"""
|
||||
Sends keepalive messages for open WebSockets every
|
||||
(channel_backend expiry / 2) seconds.
|
||||
"""
|
||||
expiry_window = int(self.channel_backend.expiry / 2)
|
||||
for protocol in self.factory.reply_protocols.values():
|
||||
if time.time() - protocol.last_keepalive > expiry_window:
|
||||
protocol.sendKeepalive()
|
||||
if self.loop.is_running():
|
||||
self.loop.call_later(1, self.keepalive_sender)
|
|
@ -1,104 +0,0 @@
|
|||
import time
|
||||
|
||||
from django.http import parse_cookie
|
||||
|
||||
from channels import DEFAULT_CHANNEL_BACKEND, Channel, channel_backends
|
||||
|
||||
|
||||
def get_protocol(base):
|
||||
|
||||
class InterfaceProtocol(base):
|
||||
"""
|
||||
Protocol which supports WebSockets and forwards incoming messages to
|
||||
the websocket channels.
|
||||
"""
|
||||
|
||||
def onConnect(self, request):
|
||||
self.channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND]
|
||||
self.request_info = {
|
||||
"path": request.path,
|
||||
"get": request.params,
|
||||
"cookies": parse_cookie(request.headers.get('cookie', ''))
|
||||
}
|
||||
|
||||
def onOpen(self):
|
||||
# Make sending channel
|
||||
self.reply_channel = Channel.new_name("!websocket.send")
|
||||
self.request_info["reply_channel"] = self.reply_channel
|
||||
self.last_keepalive = time.time()
|
||||
self.factory.reply_protocols[self.reply_channel] = self
|
||||
# Send news that this channel is open
|
||||
Channel("websocket.connect").send(self.request_info)
|
||||
|
||||
def onMessage(self, payload, isBinary):
|
||||
if isBinary:
|
||||
Channel("websocket.receive").send({
|
||||
"reply_channel": self.reply_channel,
|
||||
"content": payload,
|
||||
"binary": True,
|
||||
})
|
||||
else:
|
||||
Channel("websocket.receive").send({
|
||||
"reply_channel": self.reply_channel,
|
||||
"content": payload.decode("utf8"),
|
||||
"binary": False,
|
||||
})
|
||||
|
||||
def serverSend(self, content, binary=False, **kwargs):
|
||||
"""
|
||||
Server-side channel message to send a message.
|
||||
"""
|
||||
if binary:
|
||||
self.sendMessage(content, binary)
|
||||
else:
|
||||
self.sendMessage(content.encode("utf8"), binary)
|
||||
|
||||
def serverClose(self):
|
||||
"""
|
||||
Server-side channel message to close the socket
|
||||
"""
|
||||
self.sendClose()
|
||||
|
||||
def onClose(self, wasClean, code, reason):
|
||||
if hasattr(self, "reply_channel"):
|
||||
del self.factory.reply_protocols[self.reply_channel]
|
||||
Channel("websocket.disconnect").send({
|
||||
"reply_channel": self.reply_channel,
|
||||
})
|
||||
|
||||
def sendKeepalive(self):
|
||||
"""
|
||||
Sends a keepalive packet on the keepalive channel.
|
||||
"""
|
||||
Channel("websocket.keepalive").send({
|
||||
"reply_channel": self.reply_channel,
|
||||
})
|
||||
self.last_keepalive = time.time()
|
||||
|
||||
return InterfaceProtocol
|
||||
|
||||
|
||||
def get_factory(base):
|
||||
|
||||
class InterfaceFactory(base):
|
||||
"""
|
||||
Factory which keeps track of its open protocols' receive channels
|
||||
and can dispatch to them.
|
||||
"""
|
||||
|
||||
# TODO: Clean up dead protocols if needed?
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(InterfaceFactory, self).__init__(*args, **kwargs)
|
||||
self.reply_protocols = {}
|
||||
|
||||
def reply_channels(self):
|
||||
return self.reply_protocols.keys()
|
||||
|
||||
def dispatch_send(self, channel, message):
|
||||
if message.get("content", None):
|
||||
self.reply_protocols[channel].serverSend(**message)
|
||||
if message.get("close", False):
|
||||
self.reply_protocols[channel].serverClose()
|
||||
|
||||
return InterfaceFactory
|
|
@ -1,61 +0,0 @@
|
|||
import time
|
||||
|
||||
from autobahn.twisted.websocket import (
|
||||
WebSocketServerFactory, WebSocketServerProtocol,
|
||||
)
|
||||
from twisted.internet import reactor
|
||||
|
||||
from .websocket_autobahn import get_factory, get_protocol
|
||||
|
||||
|
||||
class WebsocketTwistedInterface(object):
|
||||
"""
|
||||
Easy API to run a WebSocket interface server using Twisted.
|
||||
Integrates the channel backend by running it in a separate thread, using
|
||||
the always-compatible polling style.
|
||||
"""
|
||||
|
||||
def __init__(self, channel_backend, port=9000):
|
||||
self.channel_backend = channel_backend
|
||||
self.port = port
|
||||
|
||||
def run(self):
|
||||
self.factory = get_factory(WebSocketServerFactory)(debug=False)
|
||||
self.factory.protocol = get_protocol(WebSocketServerProtocol)
|
||||
reactor.listenTCP(self.port, self.factory)
|
||||
reactor.callInThread(self.backend_reader)
|
||||
reactor.callLater(1, self.keepalive_sender)
|
||||
reactor.run()
|
||||
|
||||
def backend_reader(self):
|
||||
"""
|
||||
Run in a separate thread; reads messages from the backend.
|
||||
"""
|
||||
while True:
|
||||
channels = self.factory.reply_channels()
|
||||
# Quit if reactor is stopping
|
||||
if not reactor.running:
|
||||
return
|
||||
# Don't do anything if there's no channels to listen on
|
||||
if channels:
|
||||
channel, message = self.channel_backend.receive_many(channels)
|
||||
else:
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
# Wait around if there's nothing received
|
||||
if channel is None:
|
||||
time.sleep(0.05)
|
||||
continue
|
||||
# Deal with the message
|
||||
self.factory.dispatch_send(channel, message)
|
||||
|
||||
def keepalive_sender(self):
|
||||
"""
|
||||
Sends keepalive messages for open WebSockets every
|
||||
(channel_backend expiry / 2) seconds.
|
||||
"""
|
||||
expiry_window = int(self.channel_backend.expiry / 2)
|
||||
for protocol in self.factory.reply_protocols.values():
|
||||
if time.time() - protocol.last_keepalive > expiry_window:
|
||||
protocol.sendKeepalive()
|
||||
reactor.callLater(1, self.keepalive_sender)
|
|
@ -1,22 +0,0 @@
|
|||
import django
|
||||
from django.core.handlers.wsgi import WSGIHandler
|
||||
from django.http import HttpResponse
|
||||
|
||||
from channels import Channel
|
||||
|
||||
|
||||
class WSGIInterface(WSGIHandler):
|
||||
"""
|
||||
WSGI application that pushes requests to channels.
|
||||
"""
|
||||
|
||||
def __init__(self, channel_backend, *args, **kwargs):
|
||||
self.channel_backend = channel_backend
|
||||
django.setup()
|
||||
super(WSGIInterface, self).__init__(*args, **kwargs)
|
||||
|
||||
def get_response(self, request):
|
||||
request.reply_channel = Channel.new_name("http.response")
|
||||
Channel("http.request", channel_backend=self.channel_backend).send(request.channel_encode())
|
||||
channel, message = self.channel_backend.receive_many_blocking([request.reply_channel])
|
||||
return HttpResponse.channel_decode(message)
|
|
@ -1,26 +0,0 @@
|
|||
import time
|
||||
from django.core.management import BaseCommand, CommandError
|
||||
from channels import channel_backends, DEFAULT_CHANNEL_BACKEND
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument('port', nargs='?',
|
||||
help='Optional port number')
|
||||
|
||||
def handle(self, *args, **options):
|
||||
# Get the backend to use
|
||||
channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND]
|
||||
if channel_backend.local_only:
|
||||
raise CommandError(
|
||||
"You have a process-local channel backend configured, and so cannot run separate interface servers.\n"
|
||||
"Configure a network-based backend in CHANNEL_BACKENDS to use this command."
|
||||
)
|
||||
# Run the interface
|
||||
port = int(options.get("port", None) or 8000)
|
||||
from channels.interfaces.http_twisted import HttpTwistedInterface
|
||||
self.stdout.write("Running twisted/Autobahn HTTP & WebSocket interface server")
|
||||
self.stdout.write(" Channel backend: %s" % channel_backend)
|
||||
self.stdout.write(" Listening on: 0.0.0.0:%i" % port)
|
||||
HttpTwistedInterface(channel_backend=channel_backend, port=port).run()
|
|
@ -5,7 +5,6 @@ from django.core.management.commands.runserver import \
|
|||
|
||||
from channels import DEFAULT_CHANNEL_LAYER, channel_layers
|
||||
from channels.handler import ViewConsumer
|
||||
from channels.interfaces.wsgi import WSGIInterface
|
||||
from channels.log import setup_logger
|
||||
from channels.worker import Worker
|
||||
|
||||
|
@ -17,32 +16,28 @@ class Command(RunserverCommand):
|
|||
self.logger = setup_logger('django.channels', self.verbosity)
|
||||
super(Command, self).handle(*args, **options)
|
||||
|
||||
def get_handler(self, *args, **options):
|
||||
"""
|
||||
Returns the default WSGI handler for the runner.
|
||||
"""
|
||||
return WSGIInterface(self.channel_layer)
|
||||
|
||||
def run(self, *args, **options):
|
||||
# Run the rest
|
||||
return super(Command, self).run(*args, **options)
|
||||
# Don't autoreload for now
|
||||
self.inner_run(None, **options)
|
||||
|
||||
def inner_run(self, *args, **options):
|
||||
# Check a handler is registered for http reqs
|
||||
# Check a handler is registered for http reqs; if not, add default one
|
||||
self.channel_layer = channel_layers[DEFAULT_CHANNEL_LAYER]
|
||||
if not self.channel_layer.registry.consumer_for_channel("http.request"):
|
||||
# Register the default one
|
||||
self.channel_layer.registry.add_consumer(ViewConsumer(), ["http.request"])
|
||||
# Note that this is the right one on the console
|
||||
# Note that this is the channel-enabled one on the console
|
||||
self.logger.info("Worker thread running, channels enabled")
|
||||
if self.channel_layer.local_only:
|
||||
self.logger.info("Local channel backend detected, no remote channels support")
|
||||
# Launch a worker thread
|
||||
worker = WorkerThread(self.channel_layer)
|
||||
worker.daemon = True
|
||||
worker.start()
|
||||
# Run rest of inner run
|
||||
super(Command, self).inner_run(*args, **options)
|
||||
# Launch server in main thread
|
||||
from daphne.server import Server
|
||||
Server(
|
||||
channel_layer=self.channel_layer,
|
||||
host=self.addr,
|
||||
port=int(self.port),
|
||||
).run()
|
||||
|
||||
|
||||
class WorkerThread(threading.Thread):
|
||||
|
|
|
@ -1,38 +0,0 @@
|
|||
from django.core.management import BaseCommand, CommandError
|
||||
|
||||
from channels import DEFAULT_CHANNEL_BACKEND, channel_backends
|
||||
from channels.log import setup_logger
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument('port', nargs='?',
|
||||
help='Optional port number')
|
||||
|
||||
def handle(self, *args, **options):
|
||||
self.verbosity = options.get("verbosity", 1)
|
||||
self.logger = setup_logger('django.channels', self.verbosity)
|
||||
# Get the backend to use
|
||||
channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND]
|
||||
if channel_backend.local_only:
|
||||
raise CommandError(
|
||||
"You have a process-local channel backend configured, and so cannot run separate interface servers.\n"
|
||||
"Configure a network-based backend in CHANNEL_BACKENDS to use this command."
|
||||
)
|
||||
# Run the interface
|
||||
port = int(options.get("port", None) or 9000)
|
||||
try:
|
||||
import asyncio # NOQA
|
||||
except ImportError:
|
||||
from channels.interfaces.websocket_twisted import WebsocketTwistedInterface
|
||||
self.logger.info("Running Twisted/Autobahn WebSocket interface server")
|
||||
self.logger.info(" Channel backend: %s", channel_backend)
|
||||
self.logger.info(" Listening on: ws://0.0.0.0:%i" % port)
|
||||
WebsocketTwistedInterface(channel_backend=channel_backend, port=port).run()
|
||||
else:
|
||||
from channels.interfaces.websocket_asyncio import WebsocketAsyncioInterface
|
||||
self.logger.info("Running asyncio/Autobahn WebSocket interface server")
|
||||
self.logger.info(" Channel backend: %s", channel_backend)
|
||||
self.logger.info(" Listening on: ws://0.0.0.0:%i", port)
|
||||
WebsocketAsyncioInterface(channel_backend=channel_backend, port=port).run()
|
|
@ -38,9 +38,10 @@ alternative is *at-least-once*, where normally one consumer gets the message
|
|||
but when things crash it's sent to more than one, which is not the trade-off
|
||||
we want.
|
||||
|
||||
There are a couple of other limitations - messages must be JSON serializable,
|
||||
and not be more than 1MB in size - but these are to make the whole thing
|
||||
practical, and not too important to think about up front.
|
||||
There are a couple of other limitations - messages must be made of
|
||||
serializable types, and stay under a certain size limit - but these are
|
||||
implementation details you won't need to worry about until you get to more
|
||||
advanced usage.
|
||||
|
||||
The channels have capacity, so a load of producers can write lots of messages
|
||||
into a channel with no consumers and then a consumer can come along later and
|
||||
|
|
Loading…
Reference in New Issue
Block a user