mirror of
https://github.com/django/daphne.git
synced 2025-06-30 09:53:06 +03:00
Add redis backend
This commit is contained in:
parent
433625da1e
commit
75ee13ff9c
|
@ -9,7 +9,6 @@ from django.utils.timezone import now
|
||||||
|
|
||||||
from .base import BaseChannelBackend
|
from .base import BaseChannelBackend
|
||||||
|
|
||||||
queues = {}
|
|
||||||
|
|
||||||
class DatabaseChannelBackend(BaseChannelBackend):
|
class DatabaseChannelBackend(BaseChannelBackend):
|
||||||
"""
|
"""
|
||||||
|
|
56
channels/backends/redis_py.py
Normal file
56
channels/backends/redis_py.py
Normal file
|
@ -0,0 +1,56 @@
|
||||||
|
import time
|
||||||
|
import json
|
||||||
|
import datetime
|
||||||
|
import redis
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
from .base import BaseChannelBackend
|
||||||
|
|
||||||
|
|
||||||
|
class RedisChannelBackend(BaseChannelBackend):
|
||||||
|
"""
|
||||||
|
ORM-backed channel environment. For development use only; it will span
|
||||||
|
multiple processes fine, but it's going to be pretty bad at throughput.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, expiry=60, host="localhost", port=6379, prefix="django-channels:"):
|
||||||
|
super(RedisChannelBackend, self).__init__(expiry)
|
||||||
|
self.host = host
|
||||||
|
self.port = port
|
||||||
|
self.prefix = prefix
|
||||||
|
|
||||||
|
@property
|
||||||
|
def connection(self):
|
||||||
|
"""
|
||||||
|
Returns the correct connection for the current thread.
|
||||||
|
"""
|
||||||
|
return redis.Redis(host=self.host, port=self.port)
|
||||||
|
|
||||||
|
def send(self, channel, message):
|
||||||
|
key = uuid.uuid4()
|
||||||
|
self.connection.set(
|
||||||
|
key,
|
||||||
|
json.dumps(message),
|
||||||
|
ex = self.expiry + 10,
|
||||||
|
)
|
||||||
|
self.connection.rpush(
|
||||||
|
self.prefix + channel,
|
||||||
|
key,
|
||||||
|
)
|
||||||
|
|
||||||
|
def receive_many(self, channels):
|
||||||
|
if not channels:
|
||||||
|
raise ValueError("Cannot receive on empty channel list!")
|
||||||
|
# Get a message from one of our channels
|
||||||
|
while True:
|
||||||
|
result = self.connection.blpop([self.prefix + channel for channel in channels], timeout=1)
|
||||||
|
if result:
|
||||||
|
content = self.connection.get(result[1])
|
||||||
|
if content is None:
|
||||||
|
continue
|
||||||
|
return result[0][len(self.prefix):], json.loads(content)
|
||||||
|
else:
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return "%s(host=%s, port=%s)" % (self.__class__.__name__, self.host, self.port)
|
|
@ -40,7 +40,10 @@ class InterfaceProtocol(WebSocketServerProtocol):
|
||||||
)
|
)
|
||||||
|
|
||||||
def onChannelSend(self, content, binary=False, **kwargs):
|
def onChannelSend(self, content, binary=False, **kwargs):
|
||||||
|
if binary:
|
||||||
self.sendMessage(content, binary)
|
self.sendMessage(content, binary)
|
||||||
|
else:
|
||||||
|
self.sendMessage(content.encode("utf8"), binary)
|
||||||
|
|
||||||
def onClose(self, wasClean, code, reason):
|
def onClose(self, wasClean, code, reason):
|
||||||
del self.factory.protocols[self.send_channel]
|
del self.factory.protocols[self.send_channel]
|
||||||
|
@ -71,8 +74,8 @@ class InterfaceFactory(WebSocketServerFactory):
|
||||||
class WebsocketTwistedInterface(object):
|
class WebsocketTwistedInterface(object):
|
||||||
"""
|
"""
|
||||||
Easy API to run a WebSocket interface server using Twisted.
|
Easy API to run a WebSocket interface server using Twisted.
|
||||||
Integrates the channel backend by running it in a separate thread, as we don't
|
Integrates the channel backend by running it in a separate thread, using
|
||||||
know if the backend is Twisted-compliant.
|
the always-compatible polling style.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, channel_backend, port=9000):
|
def __init__(self, channel_backend, port=9000):
|
||||||
|
@ -80,7 +83,7 @@ class WebsocketTwistedInterface(object):
|
||||||
self.port = port
|
self.port = port
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.factory = InterfaceFactory("ws://localhost:%i" % self.port, debug=False)
|
self.factory = InterfaceFactory("ws://0.0.0.0:%i" % self.port, debug=False)
|
||||||
self.factory.protocol = InterfaceProtocol
|
self.factory.protocol = InterfaceProtocol
|
||||||
reactor.listenTCP(self.port, self.factory)
|
reactor.listenTCP(self.port, self.factory)
|
||||||
reactor.callInThread(self.backend_reader)
|
reactor.callInThread(self.backend_reader)
|
||||||
|
@ -92,6 +95,9 @@ class WebsocketTwistedInterface(object):
|
||||||
"""
|
"""
|
||||||
while True:
|
while True:
|
||||||
channels = self.factory.send_channels()
|
channels = self.factory.send_channels()
|
||||||
|
# Quit if reactor is stopping
|
||||||
|
if not reactor.running:
|
||||||
|
return
|
||||||
# Don't do anything if there's no channels to listen on
|
# Don't do anything if there's no channels to listen on
|
||||||
if channels:
|
if channels:
|
||||||
channel, message = self.channel_backend.receive_many(channels)
|
channel, message = self.channel_backend.receive_many(channels)
|
||||||
|
|
|
@ -7,6 +7,10 @@ from channels.utils import auto_import_consumers
|
||||||
|
|
||||||
class Command(BaseCommand):
|
class Command(BaseCommand):
|
||||||
|
|
||||||
|
def add_arguments(self, parser):
|
||||||
|
parser.add_argument('port', nargs='?',
|
||||||
|
help='Optional port number')
|
||||||
|
|
||||||
def handle(self, *args, **options):
|
def handle(self, *args, **options):
|
||||||
# Get the backend to use
|
# Get the backend to use
|
||||||
channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND]
|
channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND]
|
||||||
|
@ -16,10 +20,9 @@ class Command(BaseCommand):
|
||||||
"You have a process-local channel backend configured, and so cannot run separate interface servers.\n"
|
"You have a process-local channel backend configured, and so cannot run separate interface servers.\n"
|
||||||
"Configure a network-based backend in CHANNEL_BACKENDS to use this command."
|
"Configure a network-based backend in CHANNEL_BACKENDS to use this command."
|
||||||
)
|
)
|
||||||
# Launch a worker
|
|
||||||
self.stdout.write("Running Twisted/Autobahn WebSocket interface against backend %s" % channel_backend)
|
|
||||||
# Run the interface
|
# Run the interface
|
||||||
try:
|
port = options.get("port", None) or 9000
|
||||||
WebsocketTwistedInterface(channel_backend=channel_backend).run()
|
self.stdout.write("Running Twisted/Autobahn WebSocket interface server")
|
||||||
except KeyboardInterrupt:
|
self.stdout.write(" Channel backend: %s" % channel_backend)
|
||||||
pass
|
self.stdout.write(" Listening on: ws://0.0.0.0:%i" % port)
|
||||||
|
WebsocketTwistedInterface(channel_backend=channel_backend, port=port).run()
|
||||||
|
|
Loading…
Reference in New Issue
Block a user