mirror of
https://github.com/django/daphne.git
synced 2025-07-14 18:02:17 +03:00
Change to consumers taking a single "message" argument
This commit is contained in:
parent
9b92eec43a
commit
48d6f63fb2
|
@ -3,7 +3,7 @@ import functools
|
||||||
from django.core.handlers.base import BaseHandler
|
from django.core.handlers.base import BaseHandler
|
||||||
from django.http import HttpRequest, HttpResponse
|
from django.http import HttpRequest, HttpResponse
|
||||||
|
|
||||||
from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND
|
from channels import Channel
|
||||||
|
|
||||||
|
|
||||||
class UrlConsumer(object):
|
class UrlConsumer(object):
|
||||||
|
@ -15,13 +15,13 @@ class UrlConsumer(object):
|
||||||
self.handler = BaseHandler()
|
self.handler = BaseHandler()
|
||||||
self.handler.load_middleware()
|
self.handler.load_middleware()
|
||||||
|
|
||||||
def __call__(self, channel, **kwargs):
|
def __call__(self, message):
|
||||||
request = HttpRequest.channel_decode(kwargs)
|
request = HttpRequest.channel_decode(message.content)
|
||||||
try:
|
try:
|
||||||
response = self.handler.get_response(request)
|
response = self.handler.get_response(request)
|
||||||
except HttpResponse.ResponseLater:
|
except HttpResponse.ResponseLater:
|
||||||
return
|
return
|
||||||
Channel(request.response_channel).send(**response.channel_encode())
|
message.reply_channel.send(response.channel_encode())
|
||||||
|
|
||||||
|
|
||||||
def view_producer(channel_name):
|
def view_producer(channel_name):
|
||||||
|
@ -30,24 +30,19 @@ def view_producer(channel_name):
|
||||||
and abandons the response (with an exception the Worker will catch)
|
and abandons the response (with an exception the Worker will catch)
|
||||||
"""
|
"""
|
||||||
def producing_view(request):
|
def producing_view(request):
|
||||||
Channel(channel_name).send(**request.channel_encode())
|
Channel(channel_name).send(request.channel_encode())
|
||||||
raise HttpResponse.ResponseLater()
|
raise HttpResponse.ResponseLater()
|
||||||
return producing_view
|
return producing_view
|
||||||
|
|
||||||
|
|
||||||
def view_consumer(channel_name, alias=DEFAULT_CHANNEL_BACKEND):
|
def view_consumer(func):
|
||||||
"""
|
"""
|
||||||
Decorates a normal Django view to be a channel consumer.
|
Decorates a normal Django view to be a channel consumer.
|
||||||
Does not run any middleware
|
Does not run any middleware
|
||||||
"""
|
"""
|
||||||
def inner(func):
|
|
||||||
@functools.wraps(func)
|
@functools.wraps(func)
|
||||||
def consumer(channel, **kwargs):
|
def consumer(message):
|
||||||
request = HttpRequest.channel_decode(kwargs)
|
request = HttpRequest.channel_decode(message.content)
|
||||||
response = func(request)
|
response = func(request)
|
||||||
Channel(request.response_channel).send(**response.channel_encode())
|
message.reply_channel.send(response.channel_encode())
|
||||||
# Get the channel layer and register
|
|
||||||
channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND]
|
|
||||||
channel_backend.registry.add_consumer(consumer, [channel_name])
|
|
||||||
return func
|
return func
|
||||||
return inner
|
|
||||||
|
|
|
@ -26,11 +26,13 @@ class Channel(object):
|
||||||
else:
|
else:
|
||||||
self.channel_backend = channel_backends[alias]
|
self.channel_backend = channel_backends[alias]
|
||||||
|
|
||||||
def send(self, **kwargs):
|
def send(self, content):
|
||||||
"""
|
"""
|
||||||
Send a message over the channel, taken from the kwargs.
|
Send a message over the channel - messages are always dicts.
|
||||||
"""
|
"""
|
||||||
self.channel_backend.send(self.name, kwargs)
|
if not isinstance(content, dict):
|
||||||
|
raise ValueError("You can only send dicts as content on channels.")
|
||||||
|
self.channel_backend.send(self.name, content)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def new_name(self, prefix):
|
def new_name(self, prefix):
|
||||||
|
@ -51,6 +53,9 @@ class Channel(object):
|
||||||
from channels.adapters import view_producer
|
from channels.adapters import view_producer
|
||||||
return view_producer(self.name)
|
return view_producer(self.name)
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return self.name
|
||||||
|
|
||||||
|
|
||||||
class Group(object):
|
class Group(object):
|
||||||
"""
|
"""
|
||||||
|
@ -66,13 +71,19 @@ class Group(object):
|
||||||
self.channel_backend = channel_backends[alias]
|
self.channel_backend = channel_backends[alias]
|
||||||
|
|
||||||
def add(self, channel):
|
def add(self, channel):
|
||||||
|
if isinstance(channel, Channel):
|
||||||
|
channel = channel.name
|
||||||
self.channel_backend.group_add(self.name, channel)
|
self.channel_backend.group_add(self.name, channel)
|
||||||
|
|
||||||
def discard(self, channel):
|
def discard(self, channel):
|
||||||
|
if isinstance(channel, Channel):
|
||||||
|
channel = channel.name
|
||||||
self.channel_backend.group_discard(self.name, channel)
|
self.channel_backend.group_discard(self.name, channel)
|
||||||
|
|
||||||
def channels(self):
|
def channels(self):
|
||||||
self.channel_backend.group_channels(self.name)
|
self.channel_backend.group_channels(self.name)
|
||||||
|
|
||||||
def send(self, **kwargs):
|
def send(self, content):
|
||||||
self.channel_backend.send_group(self.name, kwargs)
|
if not isinstance(content, dict):
|
||||||
|
raise ValueError("You can only send dicts as content on channels.")
|
||||||
|
self.channel_backend.send_group(self.name, content)
|
||||||
|
|
|
@ -25,27 +25,27 @@ def http_session(func):
|
||||||
be None, rather than an empty session you can write to.
|
be None, rather than an empty session you can write to.
|
||||||
"""
|
"""
|
||||||
@functools.wraps(func)
|
@functools.wraps(func)
|
||||||
def inner(*args, **kwargs):
|
def inner(message, *args, **kwargs):
|
||||||
if "COOKIES" not in kwargs and "GET" not in kwargs:
|
if "COOKIES" not in message.content and "GET" not in message.content:
|
||||||
raise ValueError("No COOKIES or GET sent to consumer; this decorator can only be used on messages containing at least one.")
|
raise ValueError("No COOKIES or GET sent to consumer; this decorator can only be used on messages containing at least one.")
|
||||||
# Make sure there's a session key
|
# Make sure there's a session key
|
||||||
session_key = None
|
session_key = None
|
||||||
if "GET" in kwargs:
|
if "GET" in message.content:
|
||||||
try:
|
try:
|
||||||
session_key = kwargs['GET'].get("session_key", [])[0]
|
session_key = message.content['GET'].get("session_key", [])[0]
|
||||||
except IndexError:
|
except IndexError:
|
||||||
pass
|
pass
|
||||||
if "COOKIES" in kwargs and session_key is None:
|
if "COOKIES" in message.content and session_key is None:
|
||||||
session_key = kwargs['COOKIES'].get(settings.SESSION_COOKIE_NAME)
|
session_key = message.content['COOKIES'].get(settings.SESSION_COOKIE_NAME)
|
||||||
# Make a session storage
|
# Make a session storage
|
||||||
if session_key:
|
if session_key:
|
||||||
session_engine = import_module(settings.SESSION_ENGINE)
|
session_engine = import_module(settings.SESSION_ENGINE)
|
||||||
session = session_engine.SessionStore(session_key=session_key)
|
session = session_engine.SessionStore(session_key=session_key)
|
||||||
else:
|
else:
|
||||||
session = None
|
session = None
|
||||||
kwargs['session'] = session
|
message.session = session
|
||||||
# Run the consumer
|
# Run the consumer
|
||||||
result = func(*args, **kwargs)
|
result = func(message, *args, **kwargs)
|
||||||
# Persist session if needed (won't be saved if error happens)
|
# Persist session if needed (won't be saved if error happens)
|
||||||
if session is not None and session.modified:
|
if session is not None and session.modified:
|
||||||
session.save()
|
session.save()
|
||||||
|
@ -65,46 +65,49 @@ def http_django_auth(func):
|
||||||
"""
|
"""
|
||||||
@http_session
|
@http_session
|
||||||
@functools.wraps(func)
|
@functools.wraps(func)
|
||||||
def inner(*args, **kwargs):
|
def inner(message, *args, **kwargs):
|
||||||
# If we didn't get a session, then we don't get a user
|
# If we didn't get a session, then we don't get a user
|
||||||
if kwargs['session'] is None:
|
if not hasattr(message, "session"):
|
||||||
kwargs['user'] = None
|
raise ValueError("Did not see a session to get auth from")
|
||||||
|
if message.session is None:
|
||||||
|
message.user = None
|
||||||
# Otherwise, be a bit naughty and make a fake Request with just
|
# Otherwise, be a bit naughty and make a fake Request with just
|
||||||
# a "session" attribute (later on, perhaps refactor contrib.auth to
|
# a "session" attribute (later on, perhaps refactor contrib.auth to
|
||||||
# pass around session rather than request)
|
# pass around session rather than request)
|
||||||
else:
|
else:
|
||||||
fake_request = type("FakeRequest", (object, ), {"session": kwargs['session']})
|
fake_request = type("FakeRequest", (object, ), {"session": message.session})
|
||||||
kwargs['user'] = auth.get_user(fake_request)
|
message.user = auth.get_user(fake_request)
|
||||||
# Run the consumer
|
# Run the consumer
|
||||||
return func(*args, **kwargs)
|
return func(message, *args, **kwargs)
|
||||||
return inner
|
return inner
|
||||||
|
|
||||||
|
|
||||||
def send_channel_session(func):
|
def channel_session(func):
|
||||||
"""
|
"""
|
||||||
Provides a session-like object called "channel_session" to consumers
|
Provides a session-like object called "channel_session" to consumers
|
||||||
as a message attribute that will auto-persist across consumers with
|
as a message attribute that will auto-persist across consumers with
|
||||||
the same incoming "send_channel" value.
|
the same incoming "reply_channel" value.
|
||||||
"""
|
"""
|
||||||
@functools.wraps(func)
|
@functools.wraps(func)
|
||||||
def inner(*args, **kwargs):
|
def inner(message, *args, **kwargs):
|
||||||
# Make sure there's a send_channel in kwargs
|
# Make sure there's a reply_channel in kwargs
|
||||||
if "send_channel" not in kwargs:
|
if not message.reply_channel:
|
||||||
raise ValueError("No send_channel sent to consumer; this decorator can only be used on messages containing it.")
|
raise ValueError("No reply_channel sent to consumer; this decorator can only be used on messages containing it.")
|
||||||
# Turn the send_channel into a valid session key length thing.
|
# Turn the reply_channel into a valid session key length thing.
|
||||||
# We take the last 24 bytes verbatim, as these are the random section,
|
# We take the last 24 bytes verbatim, as these are the random section,
|
||||||
# and then hash the remaining ones onto the start, and add a prefix
|
# and then hash the remaining ones onto the start, and add a prefix
|
||||||
# TODO: See if there's a better way of doing this
|
# TODO: See if there's a better way of doing this
|
||||||
session_key = "skt" + hashlib.md5(kwargs['send_channel'][:-24]).hexdigest()[:8] + kwargs['send_channel'][-24:]
|
reply_name = message.reply_channel.name
|
||||||
|
session_key = "skt" + hashlib.md5(reply_name[:-24]).hexdigest()[:8] + reply_name[-24:]
|
||||||
# Make a session storage
|
# Make a session storage
|
||||||
session_engine = import_module(settings.SESSION_ENGINE)
|
session_engine = import_module(settings.SESSION_ENGINE)
|
||||||
session = session_engine.SessionStore(session_key=session_key)
|
session = session_engine.SessionStore(session_key=session_key)
|
||||||
# If the session does not already exist, save to force our session key to be valid
|
# If the session does not already exist, save to force our session key to be valid
|
||||||
if not session.exists(session.session_key):
|
if not session.exists(session.session_key):
|
||||||
session.save()
|
session.save()
|
||||||
kwargs['channel_session'] = session
|
message.channel_session = session
|
||||||
# Run the consumer
|
# Run the consumer
|
||||||
result = func(*args, **kwargs)
|
result = func(message, *args, **kwargs)
|
||||||
# Persist session if needed (won't be saved if error happens)
|
# Persist session if needed (won't be saved if error happens)
|
||||||
if session.modified:
|
if session.modified:
|
||||||
session.save()
|
session.save()
|
||||||
|
|
|
@ -23,30 +23,26 @@ class InterfaceProtocol(WebSocketServerProtocol):
|
||||||
|
|
||||||
def onOpen(self):
|
def onOpen(self):
|
||||||
# Make sending channel
|
# Make sending channel
|
||||||
self.send_channel = Channel.new_name("!django.websocket.send")
|
self.reply_channel = Channel.new_name("!django.websocket.send")
|
||||||
|
self.request_info["reply_channel"] = self.reply_channel
|
||||||
self.last_keepalive = time.time()
|
self.last_keepalive = time.time()
|
||||||
self.factory.protocols[self.send_channel] = self
|
self.factory.protocols[self.reply_channel] = self
|
||||||
# Send news that this channel is open
|
# Send news that this channel is open
|
||||||
Channel("django.websocket.connect").send(
|
Channel("django.websocket.connect").send(self.request_info)
|
||||||
send_channel = self.send_channel,
|
|
||||||
**self.request_info
|
|
||||||
)
|
|
||||||
|
|
||||||
def onMessage(self, payload, isBinary):
|
def onMessage(self, payload, isBinary):
|
||||||
if isBinary:
|
if isBinary:
|
||||||
Channel("django.websocket.receive").send(
|
Channel("django.websocket.receive").send(dict(
|
||||||
send_channel = self.send_channel,
|
self.request_info,
|
||||||
content = payload,
|
content = payload,
|
||||||
binary = True,
|
binary = True,
|
||||||
**self.request_info
|
))
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
Channel("django.websocket.receive").send(
|
Channel("django.websocket.receive").send(dict(
|
||||||
send_channel = self.send_channel,
|
self.request_info,
|
||||||
content = payload.decode("utf8"),
|
content = payload.decode("utf8"),
|
||||||
binary = False,
|
binary = False,
|
||||||
**self.request_info
|
))
|
||||||
)
|
|
||||||
|
|
||||||
def serverSend(self, content, binary=False, **kwargs):
|
def serverSend(self, content, binary=False, **kwargs):
|
||||||
"""
|
"""
|
||||||
|
@ -64,21 +60,15 @@ class InterfaceProtocol(WebSocketServerProtocol):
|
||||||
self.sendClose()
|
self.sendClose()
|
||||||
|
|
||||||
def onClose(self, wasClean, code, reason):
|
def onClose(self, wasClean, code, reason):
|
||||||
if hasattr(self, "send_channel"):
|
if hasattr(self, "reply_channel"):
|
||||||
del self.factory.protocols[self.send_channel]
|
del self.factory.protocols[self.reply_channel]
|
||||||
Channel("django.websocket.disconnect").send(
|
Channel("django.websocket.disconnect").send(self.request_info)
|
||||||
send_channel = self.send_channel,
|
|
||||||
**self.request_info
|
|
||||||
)
|
|
||||||
|
|
||||||
def sendKeepalive(self):
|
def sendKeepalive(self):
|
||||||
"""
|
"""
|
||||||
Sends a keepalive packet on the keepalive channel.
|
Sends a keepalive packet on the keepalive channel.
|
||||||
"""
|
"""
|
||||||
Channel("django.websocket.keepalive").send(
|
Channel("django.websocket.keepalive").send(self.request_info)
|
||||||
send_channel = self.send_channel,
|
|
||||||
**self.request_info
|
|
||||||
)
|
|
||||||
self.last_keepalive = time.time()
|
self.last_keepalive = time.time()
|
||||||
|
|
||||||
|
|
||||||
|
@ -94,7 +84,7 @@ class InterfaceFactory(WebSocketServerFactory):
|
||||||
super(InterfaceFactory, self).__init__(*args, **kwargs)
|
super(InterfaceFactory, self).__init__(*args, **kwargs)
|
||||||
self.protocols = {}
|
self.protocols = {}
|
||||||
|
|
||||||
def send_channels(self):
|
def reply_channels(self):
|
||||||
return self.protocols.keys()
|
return self.protocols.keys()
|
||||||
|
|
||||||
def dispatch_send(self, channel, message):
|
def dispatch_send(self, channel, message):
|
||||||
|
@ -128,7 +118,7 @@ class WebsocketTwistedInterface(object):
|
||||||
Run in a separate thread; reads messages from the backend.
|
Run in a separate thread; reads messages from the backend.
|
||||||
"""
|
"""
|
||||||
while True:
|
while True:
|
||||||
channels = self.factory.send_channels()
|
channels = self.factory.reply_channels()
|
||||||
# Quit if reactor is stopping
|
# Quit if reactor is stopping
|
||||||
if not reactor.running:
|
if not reactor.running:
|
||||||
return
|
return
|
||||||
|
|
|
@ -15,7 +15,7 @@ class WSGIInterface(WSGIHandler):
|
||||||
super(WSGIInterface, self).__init__(*args, **kwargs)
|
super(WSGIInterface, self).__init__(*args, **kwargs)
|
||||||
|
|
||||||
def get_response(self, request):
|
def get_response(self, request):
|
||||||
request.response_channel = Channel.new_name("django.wsgi.response")
|
request.reply_channel = Channel.new_name("django.wsgi.response")
|
||||||
Channel("django.wsgi.request", channel_backend=self.channel_backend).send(**request.channel_encode())
|
Channel("django.wsgi.request", channel_backend=self.channel_backend).send(request.channel_encode())
|
||||||
channel, message = self.channel_backend.receive_many_blocking([request.response_channel])
|
channel, message = self.channel_backend.receive_many_blocking([request.reply_channel])
|
||||||
return HttpResponse.channel_decode(message)
|
return HttpResponse.channel_decode(message)
|
||||||
|
|
18
channels/message.py
Normal file
18
channels/message.py
Normal file
|
@ -0,0 +1,18 @@
|
||||||
|
from .channel import Channel
|
||||||
|
|
||||||
|
|
||||||
|
class Message(object):
|
||||||
|
"""
|
||||||
|
Represents a message sent over a Channel.
|
||||||
|
|
||||||
|
The message content is a dict called .content, while
|
||||||
|
reply_channel is an optional extra attribute representing a channel
|
||||||
|
to use to reply to this message's end user, if that makes sense.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, content, channel, channel_backend, reply_channel=None):
|
||||||
|
self.content = content
|
||||||
|
self.channel = channel
|
||||||
|
self.channel_backend = channel_backend
|
||||||
|
if reply_channel:
|
||||||
|
self.reply_channel = Channel(reply_channel, channel_backend=self.channel_backend)
|
|
@ -17,7 +17,7 @@ def encode_request(request):
|
||||||
"path": request.path,
|
"path": request.path,
|
||||||
"path_info": request.path_info,
|
"path_info": request.path_info,
|
||||||
"method": request.method,
|
"method": request.method,
|
||||||
"response_channel": request.response_channel,
|
"reply_channel": request.reply_channel,
|
||||||
}
|
}
|
||||||
return value
|
return value
|
||||||
|
|
||||||
|
@ -34,7 +34,7 @@ def decode_request(value):
|
||||||
request.path = value['path']
|
request.path = value['path']
|
||||||
request.method = value['method']
|
request.method = value['method']
|
||||||
request.path_info = value['path_info']
|
request.path_info = value['path_info']
|
||||||
request.response_channel = value['response_channel']
|
request.reply_channel = value['reply_channel']
|
||||||
return request
|
return request
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
import traceback
|
import traceback
|
||||||
|
from .message import Message
|
||||||
|
|
||||||
|
|
||||||
class Worker(object):
|
class Worker(object):
|
||||||
|
@ -17,12 +18,18 @@ class Worker(object):
|
||||||
"""
|
"""
|
||||||
channels = self.channel_backend.registry.all_channel_names()
|
channels = self.channel_backend.registry.all_channel_names()
|
||||||
while True:
|
while True:
|
||||||
channel, message = self.channel_backend.receive_many_blocking(channels)
|
channel, content = self.channel_backend.receive_many_blocking(channels)
|
||||||
|
message = Message(
|
||||||
|
content=content,
|
||||||
|
channel=channel,
|
||||||
|
channel_backend=self.channel_backend,
|
||||||
|
reply_channel=content.get("reply_channel", None),
|
||||||
|
)
|
||||||
# Handle the message
|
# Handle the message
|
||||||
consumer = self.channel_backend.registry.consumer_for_channel(channel)
|
consumer = self.channel_backend.registry.consumer_for_channel(channel)
|
||||||
if self.callback:
|
if self.callback:
|
||||||
self.callback(channel, message)
|
self.callback(channel, message)
|
||||||
try:
|
try:
|
||||||
consumer(channel=channel, **message)
|
consumer(message)
|
||||||
except:
|
except:
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
|
|
|
@ -100,17 +100,17 @@ message and can write out zero to many other channel messages.
|
||||||
|
|
||||||
Now, let's make a channel for requests (called ``django.wsgi.request``),
|
Now, let's make a channel for requests (called ``django.wsgi.request``),
|
||||||
and a channel per client for responses (e.g. ``django.wsgi.response.o4F2h2Fd``),
|
and a channel per client for responses (e.g. ``django.wsgi.response.o4F2h2Fd``),
|
||||||
with the response channel a property (``send_channel``) of the request message.
|
with the response channel a property (``reply_channel``) of the request message.
|
||||||
Suddenly, a view is merely another example of a consumer::
|
Suddenly, a view is merely another example of a consumer::
|
||||||
|
|
||||||
@consumer("django.wsgi.request")
|
@consumer("django.wsgi.request")
|
||||||
def my_consumer(send_channel, **request_data):
|
def my_consumer(reply_channel, **request_data):
|
||||||
# Decode the request from JSON-compat to a full object
|
# Decode the request from JSON-compat to a full object
|
||||||
django_request = Request.decode(request_data)
|
django_request = Request.decode(request_data)
|
||||||
# Run view
|
# Run view
|
||||||
django_response = view(django_request)
|
django_response = view(django_request)
|
||||||
# Encode the response into JSON-compat format
|
# Encode the response into JSON-compat format
|
||||||
Channel(send_channel).send(django_response.encode())
|
Channel(reply_channel).send(django_response.encode())
|
||||||
|
|
||||||
In fact, this is how Channels works. The interface servers transform connections
|
In fact, this is how Channels works. The interface servers transform connections
|
||||||
from the outside world (HTTP, WebSockets, etc.) into messages on channels,
|
from the outside world (HTTP, WebSockets, etc.) into messages on channels,
|
||||||
|
@ -177,16 +177,16 @@ set of channels (here, using Redis) to send updates to::
|
||||||
@receiver(post_save, sender=BlogUpdate)
|
@receiver(post_save, sender=BlogUpdate)
|
||||||
def send_update(sender, instance, **kwargs):
|
def send_update(sender, instance, **kwargs):
|
||||||
# Loop through all response channels and send the update
|
# Loop through all response channels and send the update
|
||||||
for send_channel in redis_conn.smembers("readers"):
|
for reply_channel in redis_conn.smembers("readers"):
|
||||||
Channel(send_channel).send(
|
Channel(reply_channel).send(
|
||||||
id=instance.id,
|
id=instance.id,
|
||||||
content=instance.content,
|
content=instance.content,
|
||||||
)
|
)
|
||||||
|
|
||||||
@consumer("django.websocket.connect")
|
@consumer("django.websocket.connect")
|
||||||
def ws_connect(path, send_channel, **kwargs):
|
def ws_connect(path, reply_channel, **kwargs):
|
||||||
# Add to reader set
|
# Add to reader set
|
||||||
redis_conn.sadd("readers", send_channel)
|
redis_conn.sadd("readers", reply_channel)
|
||||||
|
|
||||||
While this will work, there's a small problem - we never remove people from
|
While this will work, there's a small problem - we never remove people from
|
||||||
the ``readers`` set when they disconnect. We could add a consumer that
|
the ``readers`` set when they disconnect. We could add a consumer that
|
||||||
|
@ -221,9 +221,9 @@ we don't need to; Channels has it built in, as a feature called Groups::
|
||||||
|
|
||||||
@consumer("django.websocket.connect")
|
@consumer("django.websocket.connect")
|
||||||
@consumer("django.websocket.keepalive")
|
@consumer("django.websocket.keepalive")
|
||||||
def ws_connect(path, send_channel, **kwargs):
|
def ws_connect(path, reply_channel, **kwargs):
|
||||||
# Add to reader group
|
# Add to reader group
|
||||||
Group("liveblog").add(send_channel)
|
Group("liveblog").add(reply_channel)
|
||||||
|
|
||||||
Not only do groups have their own ``send()`` method (which backends can provide
|
Not only do groups have their own ``send()`` method (which backends can provide
|
||||||
an efficient implementation of), they also automatically manage expiry of
|
an efficient implementation of), they also automatically manage expiry of
|
||||||
|
|
|
@ -26,16 +26,16 @@ Make a new project, a new app, and put this in a ``consumers.py`` file in the ap
|
||||||
from channels import Channel
|
from channels import Channel
|
||||||
from django.http import HttpResponse
|
from django.http import HttpResponse
|
||||||
|
|
||||||
def http_consumer(response_channel, path, **kwargs):
|
def http_consumer(message):
|
||||||
response = HttpResponse("Hello world! You asked for %s" % path)
|
response = HttpResponse("Hello world! You asked for %s" % message.content['path'])
|
||||||
Channel(response_channel).send(**response.channel_encode())
|
message.reply_channel.send(response.channel_encode())
|
||||||
|
|
||||||
The most important thing to note here is that, because things we send in
|
The most important thing to note here is that, because things we send in
|
||||||
messages must be JSON-serialisable, the request and response messages
|
messages must be JSON-serialisable, the request and response messages
|
||||||
are in a key-value format. There are ``channel_decode()`` and
|
are in a key-value format. There are ``channel_decode()`` and
|
||||||
``channel_encode()`` methods on both Django's request and response classes,
|
``channel_encode()`` methods on both Django's request and response classes,
|
||||||
but here we just take two of the request variables directly as keyword
|
but here we just use the message's ``content`` attribute directly for simplicity
|
||||||
arguments for simplicity.
|
(message content is always a dict).
|
||||||
|
|
||||||
Now, go into your ``settings.py`` file, and set up a channel backend; by default,
|
Now, go into your ``settings.py`` file, and set up a channel backend; by default,
|
||||||
Django will just use a local backend and route HTTP requests to the normal
|
Django will just use a local backend and route HTTP requests to the normal
|
||||||
|
@ -72,8 +72,8 @@ do any time. Let's try some WebSockets, and make a basic chat server!
|
||||||
Delete that consumer and its routing - we'll want the normal Django view layer to
|
Delete that consumer and its routing - we'll want the normal Django view layer to
|
||||||
serve HTTP requests from now on - and make this WebSocket consumer instead::
|
serve HTTP requests from now on - and make this WebSocket consumer instead::
|
||||||
|
|
||||||
def ws_add(channel, send_channel, **kwargs):
|
def ws_add(message):
|
||||||
Group("chat").add(send_channel)
|
Group("chat").add(message.reply_channel)
|
||||||
|
|
||||||
Hook it up to the ``django.websocket.connect`` channel like this::
|
Hook it up to the ``django.websocket.connect`` channel like this::
|
||||||
|
|
||||||
|
@ -90,7 +90,7 @@ Now, let's look at what this is doing. It's tied to the
|
||||||
``django.websocket.connect`` channel, which means that it'll get a message
|
``django.websocket.connect`` channel, which means that it'll get a message
|
||||||
whenever a new WebSocket connection is opened by a client.
|
whenever a new WebSocket connection is opened by a client.
|
||||||
|
|
||||||
When it gets that message, it takes the ``send_channel`` key from it, which
|
When it gets that message, it takes the ``reply_channel`` attribute from it, which
|
||||||
is the unique response channel for that client, and adds it to the ``chat``
|
is the unique response channel for that client, and adds it to the ``chat``
|
||||||
group, which means we can send messages to all connected chat clients.
|
group, which means we can send messages to all connected chat clients.
|
||||||
|
|
||||||
|
@ -107,8 +107,8 @@ a group it's already in - similarly, it's safe to discard a channel from a
|
||||||
group it's not in)::
|
group it's not in)::
|
||||||
|
|
||||||
# Connected to django.websocket.keepalive
|
# Connected to django.websocket.keepalive
|
||||||
def ws_keepalive(channel, send_channel, **kwargs):
|
def ws_keepalive(message):
|
||||||
Group("chat").add(send_channel)
|
Group("chat").add(message.reply_channel)
|
||||||
|
|
||||||
Of course, this is exactly the same code as the ``connect`` handler, so let's
|
Of course, this is exactly the same code as the ``connect`` handler, so let's
|
||||||
just route both channels to the same consumer::
|
just route both channels to the same consumer::
|
||||||
|
@ -125,8 +125,8 @@ handler to clean up as people disconnect (most channels will cleanly disconnect
|
||||||
and get this called)::
|
and get this called)::
|
||||||
|
|
||||||
# Connected to django.websocket.disconnect
|
# Connected to django.websocket.disconnect
|
||||||
def ws_disconnect(channel, send_channel, **kwargs):
|
def ws_disconnect(message):
|
||||||
Group("chat").discard(send_channel)
|
Group("chat").discard(message.reply_channel)
|
||||||
|
|
||||||
Now, that's taken care of adding and removing WebSocket send channels for the
|
Now, that's taken care of adding and removing WebSocket send channels for the
|
||||||
``chat`` group; all we need to do now is take care of message sending. For now,
|
``chat`` group; all we need to do now is take care of message sending. For now,
|
||||||
|
@ -136,16 +136,16 @@ any message sent in to all connected clients. Here's all the code::
|
||||||
from channels import Channel, Group
|
from channels import Channel, Group
|
||||||
|
|
||||||
# Connected to django.websocket.connect and django.websocket.keepalive
|
# Connected to django.websocket.connect and django.websocket.keepalive
|
||||||
def ws_add(channel, send_channel, **kwargs):
|
def ws_add(message):
|
||||||
Group("chat").add(send_channel)
|
Group("chat").add(message.reply_channel)
|
||||||
|
|
||||||
# Connected to django.websocket.receive
|
# Connected to django.websocket.receive
|
||||||
def ws_message(channel, send_channel, content, **kwargs):
|
def ws_message(message):
|
||||||
Group("chat").send(content=content)
|
Group("chat").send(message.content)
|
||||||
|
|
||||||
# Connected to django.websocket.disconnect
|
# Connected to django.websocket.disconnect
|
||||||
def ws_disconnect(channel, send_channel, **kwargs):
|
def ws_disconnect(message):
|
||||||
Group("chat").discard(send_channel)
|
Group("chat").discard(message.reply_channel)
|
||||||
|
|
||||||
And what our routing should look like in ``settings.py``::
|
And what our routing should look like in ``settings.py``::
|
||||||
|
|
||||||
|
@ -264,7 +264,7 @@ is much more portable, and works in development where you need to run a separate
|
||||||
WebSocket server (by default, on port 9000).
|
WebSocket server (by default, on port 9000).
|
||||||
|
|
||||||
All we need to do is add the ``django_http_auth`` decorator to our views,
|
All we need to do is add the ``django_http_auth`` decorator to our views,
|
||||||
and we'll get extra ``session`` and ``user`` keyword arguments we can use;
|
and we'll get extra ``session`` and ``user`` keyword attributes on ``message`` we can use;
|
||||||
let's make one where users can only chat to people with the same first letter
|
let's make one where users can only chat to people with the same first letter
|
||||||
of their username::
|
of their username::
|
||||||
|
|
||||||
|
@ -272,16 +272,16 @@ of their username::
|
||||||
from channels.decorators import django_http_auth
|
from channels.decorators import django_http_auth
|
||||||
|
|
||||||
@django_http_auth
|
@django_http_auth
|
||||||
def ws_add(channel, send_channel, user, **kwargs):
|
def ws_add(message):
|
||||||
Group("chat-%s" % user.username[0]).add(send_channel)
|
Group("chat-%s" % message.user.username[0]).add(message.reply_channel)
|
||||||
|
|
||||||
@django_http_auth
|
@django_http_auth
|
||||||
def ws_message(channel, send_channel, content, user, **kwargs):
|
def ws_message(message):
|
||||||
Group("chat-%s" % user.username[0]).send(content=content)
|
Group("chat-%s" % message.user.username[0]).send(message.content)
|
||||||
|
|
||||||
@django_http_auth
|
@django_http_auth
|
||||||
def ws_disconnect(channel, send_channel, user, **kwargs):
|
def ws_disconnect(message):
|
||||||
Group("chat-%s" % user.username[0]).discard(send_channel)
|
Group("chat-%s" % message.user.username[0]).discard(message.reply_channel)
|
||||||
|
|
||||||
Now, when we connect to the WebSocket we'll have to remember to provide the
|
Now, when we connect to the WebSocket we'll have to remember to provide the
|
||||||
Django session ID as part of the URL, like this::
|
Django session ID as part of the URL, like this::
|
||||||
|
@ -293,10 +293,6 @@ Note that Channels can't work with signed cookie sessions - since only HTTP
|
||||||
responses can set cookies, it needs a backend it can write to separately to
|
responses can set cookies, it needs a backend it can write to separately to
|
||||||
store state.
|
store state.
|
||||||
|
|
||||||
(Also note that we always end consumers with ``**kwargs``; this is to save us
|
|
||||||
from writing out all variables we might get sent and to allow forwards-compatibility
|
|
||||||
with any additions to the message formats in the future.)
|
|
||||||
|
|
||||||
Persisting Data
|
Persisting Data
|
||||||
---------------
|
---------------
|
||||||
|
|
||||||
|
@ -307,7 +303,7 @@ should let them send this request in the initial WebSocket connection,
|
||||||
check they're allowed to access it, and then remember which room a socket is
|
check they're allowed to access it, and then remember which room a socket is
|
||||||
connected to when they send a message in so we know which group to send it to.
|
connected to when they send a message in so we know which group to send it to.
|
||||||
|
|
||||||
The ``send_channel`` is our unique pointer to the open WebSocket - as you've
|
The ``reply_channel`` is our unique pointer to the open WebSocket - as you've
|
||||||
seen, we do all our operations on it - but it's not something we can annotate
|
seen, we do all our operations on it - but it's not something we can annotate
|
||||||
with data; it's just a simple string, and even if we hack around and set
|
with data; it's just a simple string, and even if we hack around and set
|
||||||
attributes on it that's not going to carry over to other workers.
|
attributes on it that's not going to carry over to other workers.
|
||||||
|
@ -315,18 +311,18 @@ attributes on it that's not going to carry over to other workers.
|
||||||
Instead, the solution is to persist information keyed by the send channel in
|
Instead, the solution is to persist information keyed by the send channel in
|
||||||
some other data store - sound familiar? This is what Django's session framework
|
some other data store - sound familiar? This is what Django's session framework
|
||||||
does for HTTP requests, only there it uses cookies as the lookup key rather
|
does for HTTP requests, only there it uses cookies as the lookup key rather
|
||||||
than the ``send_channel``.
|
than the ``reply_channel``.
|
||||||
|
|
||||||
Now, as you saw above, you can use the ``django_http_auth`` decorator to get
|
Now, as you saw above, you can use the ``django_http_auth`` decorator to get
|
||||||
both a ``user`` and a ``session`` variable in your message arguments - and,
|
both a ``user`` and a ``session`` attribute on your message - and,
|
||||||
indeed, there is a ``websocket_session`` decorator that will just give you
|
indeed, there is a ``http_session`` decorator that will just give you
|
||||||
the ``session`` attribute.
|
the ``session`` attribute.
|
||||||
|
|
||||||
However, that session is based on cookies, and so follows the user round the
|
However, that session is based on cookies, and so follows the user round the
|
||||||
site - it's great for information that should persist across all WebSocket and
|
site - it's great for information that should persist across all WebSocket and
|
||||||
HTTP connections, but not great for information that is specific to a single
|
HTTP connections, but not great for information that is specific to a single
|
||||||
WebSocket (such as "which chatroom should this socket be connected to"). For
|
WebSocket (such as "which chatroom should this socket be connected to"). For
|
||||||
this reason, Channels also provides a ``send_channel_session`` decorator,
|
this reason, Channels also provides a ``channel_session`` decorator,
|
||||||
which adds a ``channel_session`` attribute to the message; this works just like
|
which adds a ``channel_session`` attribute to the message; this works just like
|
||||||
the normal ``session`` attribute, and persists to the same storage, but varies
|
the normal ``session`` attribute, and persists to the same storage, but varies
|
||||||
per-channel rather than per-cookie.
|
per-channel rather than per-cookie.
|
||||||
|
@ -335,31 +331,31 @@ Let's use it now to build a chat server that expects you to pass a chatroom
|
||||||
name in the path of your WebSocket request (we'll ignore auth for now)::
|
name in the path of your WebSocket request (we'll ignore auth for now)::
|
||||||
|
|
||||||
from channels import Channel
|
from channels import Channel
|
||||||
from channels.decorators import consumer, send_channel_session
|
from channels.decorators import consumer, channel_session
|
||||||
|
|
||||||
@consumer("django.websocket.connect")
|
@consumer("django.websocket.connect")
|
||||||
@send_channel_session
|
@channel_session
|
||||||
def ws_connect(channel, send_channel, path, channel_session, **kwargs):
|
def ws_connect(message):
|
||||||
# Work out room name from path (ignore slashes)
|
# Work out room name from path (ignore slashes)
|
||||||
room = path.strip("/")
|
room = message.content['path'].strip("/")
|
||||||
# Save room in session and add us to the group
|
# Save room in session and add us to the group
|
||||||
channel_session['room'] = room
|
message.channel_session['room'] = room
|
||||||
Group("chat-%s" % room).add(send_channel)
|
Group("chat-%s" % room).add(message.reply_channel)
|
||||||
|
|
||||||
@consumer("django.websocket.keepalive")
|
@consumer("django.websocket.keepalive")
|
||||||
@send_channel_session
|
@channel_session
|
||||||
def ws_add(channel, send_channel, channel_session, **kwargs):
|
def ws_add(message):
|
||||||
Group("chat-%s" % channel_session['room']).add(send_channel)
|
Group("chat-%s" % message.channel_session['room']).add(message.reply_channel)
|
||||||
|
|
||||||
@consumer("django.websocket.receive")
|
@consumer("django.websocket.receive")
|
||||||
@send_channel_session
|
@channel_session
|
||||||
def ws_message(channel, send_channel, content, channel_session, **kwargs):
|
def ws_message(message):
|
||||||
Group("chat-%s" % channel_session['room']).send(content=content)
|
Group("chat-%s" % message.channel_session['room']).send(content)
|
||||||
|
|
||||||
@consumer("django.websocket.disconnect")
|
@consumer("django.websocket.disconnect")
|
||||||
@send_channel_session
|
@channel_session
|
||||||
def ws_disconnect(channel, send_channel, channel_session, **kwargs):
|
def ws_disconnect(message):
|
||||||
Group("chat-%s" % channel_session['room']).discard(send_channel)
|
Group("chat-%s" % message.channel_session['room']).discard(message.reply_channel)
|
||||||
|
|
||||||
If you play around with it from the console (or start building a simple
|
If you play around with it from the console (or start building a simple
|
||||||
JavaScript chat client that appends received messages to a div), you'll see
|
JavaScript chat client that appends received messages to a div), you'll see
|
||||||
|
@ -391,40 +387,48 @@ Let's see what that looks like, assuming we
|
||||||
have a ChatMessage model with ``message`` and ``room`` fields::
|
have a ChatMessage model with ``message`` and ``room`` fields::
|
||||||
|
|
||||||
from channels import Channel
|
from channels import Channel
|
||||||
from channels.decorators import consumer, send_channel_session
|
from channels.decorators import consumer, channel_session
|
||||||
from .models import ChatMessage
|
from .models import ChatMessage
|
||||||
|
|
||||||
@consumer("chat-messages")
|
@consumer("chat-messages")
|
||||||
def msg_consumer(channel, room, message):
|
def msg_consumer(message):
|
||||||
# Save to model
|
# Save to model
|
||||||
ChatMessage.objects.create(room=room, message=message)
|
ChatMessage.objects.create(
|
||||||
|
room=message.content['room'],
|
||||||
|
message=message.content['message'],
|
||||||
|
)
|
||||||
# Broadcast to listening sockets
|
# Broadcast to listening sockets
|
||||||
Group("chat-%s" % room).send(message)
|
Group("chat-%s" % room).send({
|
||||||
|
"content": message.content['message'],
|
||||||
|
})
|
||||||
|
|
||||||
@consumer("django.websocket.connect")
|
@consumer("django.websocket.connect")
|
||||||
@send_channel_session
|
@channel_session
|
||||||
def ws_connect(channel, send_channel, path, channel_session, **kwargs):
|
def ws_connect(message):
|
||||||
# Work out room name from path (ignore slashes)
|
# Work out room name from path (ignore slashes)
|
||||||
room = path.strip("/")
|
room = message.content['path'].strip("/")
|
||||||
# Save room in session and add us to the group
|
# Save room in session and add us to the group
|
||||||
channel_session['room'] = room
|
message.channel_session['room'] = room
|
||||||
Group("chat-%s" % room).add(send_channel)
|
Group("chat-%s" % room).add(message.reply_channel)
|
||||||
|
|
||||||
@consumer("django.websocket.keepalive")
|
@consumer("django.websocket.keepalive")
|
||||||
@send_channel_session
|
@channel_session
|
||||||
def ws_add(channel, send_channel, channel_session, **kwargs):
|
def ws_add(message):
|
||||||
Group("chat-%s" % channel_session['room']).add(send_channel)
|
Group("chat-%s" % message.channel_session['room']).add(message.reply_channel)
|
||||||
|
|
||||||
@consumer("django.websocket.receive")
|
@consumer("django.websocket.receive")
|
||||||
@send_channel_session
|
@channel_session
|
||||||
def ws_message(channel, send_channel, content, channel_session, **kwargs):
|
def ws_message(message):
|
||||||
# Stick the message onto the processing queue
|
# Stick the message onto the processing queue
|
||||||
Channel("chat-messages").send(room=channel_session['room'], message=content)
|
Channel("chat-messages").send({
|
||||||
|
"room": channel_session['room'],
|
||||||
|
"message": content,
|
||||||
|
})
|
||||||
|
|
||||||
@consumer("django.websocket.disconnect")
|
@consumer("django.websocket.disconnect")
|
||||||
@send_channel_session
|
@channel_session
|
||||||
def ws_disconnect(channel, send_channel, channel_session, **kwargs):
|
def ws_disconnect(message):
|
||||||
Group("chat-%s" % channel_session['room']).discard(send_channel)
|
Group("chat-%s" % message.channel_session['room']).discard(message.reply_channel)
|
||||||
|
|
||||||
Note that we could add messages onto the ``chat-messages`` channel from anywhere;
|
Note that we could add messages onto the ``chat-messages`` channel from anywhere;
|
||||||
inside a View, inside another model's ``post_save`` signal, inside a management
|
inside a View, inside another model's ``post_save`` signal, inside a management
|
||||||
|
|
|
@ -62,7 +62,7 @@ Contains the following keys:
|
||||||
* META: Same as ``request.META``
|
* META: Same as ``request.META``
|
||||||
* path: Same as ``request.path``
|
* path: Same as ``request.path``
|
||||||
* path_info: Same as ``request.path_info``
|
* path_info: Same as ``request.path_info``
|
||||||
* send_channel: Channel name to send responses on
|
* reply_channel: Channel name to send responses on
|
||||||
|
|
||||||
|
|
||||||
WebSocket Receive
|
WebSocket Receive
|
||||||
|
@ -81,7 +81,7 @@ WebSocket Client Close
|
||||||
|
|
||||||
Sent when the WebSocket is closed by either the client or the server.
|
Sent when the WebSocket is closed by either the client or the server.
|
||||||
|
|
||||||
Contains the same keys as WebSocket Connection, including send_channel,
|
Contains the same keys as WebSocket Connection, including reply_channel,
|
||||||
though nothing should be sent on it.
|
though nothing should be sent on it.
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user