Merge remote-tracking branch 'upstream/master' into ticket_10

This commit is contained in:
Sean Mc Allister 2016-10-21 17:51:52 +02:00
commit e38c7541da
10 changed files with 226 additions and 65 deletions

View File

@ -1,3 +1,15 @@
0.15.0 (2016-08-28)
-------------------
* Connections now force-close themselves after pings fail for a certain
timeframe, controllable via the new --ping-timeout option.
* Badly-formatted websocket response messages now log to console in
all situations
* Compatability with Twisted 16.3 and up
0.14.3 (2016-07-21)
-------------------

View File

@ -1,14 +1,14 @@
daphne
======
.. image:: https://api.travis-ci.org/andrewgodwin/daphne.svg
:target: https://travis-ci.org/andrewgodwin/daphne
.. image:: https://api.travis-ci.org/django/daphne.svg
:target: https://travis-ci.org/django/daphne
.. image:: https://img.shields.io/pypi/v/daphne.svg
:target: https://pypi.python.org/pypi/daphne
Daphne is a HTTP, HTTP2 and WebSocket protocol server for
`ASGI <http://channels.readthedocs.org/en/latest/asgi.html>`_, and developed
`ASGI <https://channels.readthedocs.io/en/latest/asgi.html>`_, and developed
to power Django Channels.
It supports automatic negotiation of protocols; there's no need for URL
@ -66,3 +66,16 @@ The header takes precedence if both are set. As with ``SCRIPT_ALIAS``, the value
should start with a slash, but not end with one; for example::
daphne --root-path=/forum django_project.asgi:channel_layer
Maintenance and Security
------------------------
To report security issues, please contact security@djangoproject.com. For GPG
signatures and more security process information, see
https://docs.djangoproject.com/en/dev/internals/security/.
To report bugs or request new features, please open a new GitHub issue.
This repository is part of the Channels project. For the shepherd and maintenance team, please see the
`main Channels readme <https://github.com/django/channels/blob/master/README.rst>`_.

View File

@ -1 +1 @@
__version__ = "0.14.3"
__version__ = "0.15.0"

View File

@ -24,6 +24,18 @@ class AccessLogGenerator(object):
length=details['size'],
)
# Websocket requests
elif protocol == "websocket" and action == "connecting":
self.write_entry(
host=details['client'],
date=datetime.datetime.now(),
request="WSCONNECTING %(path)s" % details,
)
elif protocol == "websocket" and action == "rejected":
self.write_entry(
host=details['client'],
date=datetime.datetime.now(),
request="WSREJECT %(path)s" % details,
)
elif protocol == "websocket" and action == "connected":
self.write_entry(
host=details['client'],

View File

@ -106,6 +106,7 @@ class CommandLineInterface(object):
help='The setting for the ASGI root_path variable',
default="",
)
self.server = None
@classmethod
def entrypoint(cls):
@ -123,12 +124,12 @@ class CommandLineInterface(object):
args = self.parser.parse_args(args)
# Set up logging
logging.basicConfig(
level = {
level={
0: logging.WARN,
1: logging.INFO,
2: logging.DEBUG,
}[args.verbosity],
format = "%(asctime)-15s %(levelname)-8s %(message)s" ,
format="%(asctime)-15s %(levelname)-8s %(message)s",
)
# If verbosity is 1 or greater, or they told us explicitly, set up access log
access_log_stream = None
@ -156,6 +157,11 @@ class CommandLineInterface(object):
args.host = DEFAULT_HOST
# Run server
logger.info(
"Starting server at %s, channel layer %s",
(args.unix_socket if args.unix_socket else "%s:%s" % (args.host, args.port)),
args.channel_layer,
)
self.server = Server(
channel_layer=channel_layer,
host=args.host,
@ -169,6 +175,7 @@ class CommandLineInterface(object):
action_logger=AccessLogGenerator(access_log_stream) if access_log_stream else None,
ws_protocols=args.ws_protocols,
root_path=args.root_path,
verbosity=args.verbosity,
)
self.server.run()

View File

@ -110,6 +110,10 @@ class WebRequest(http.Request):
logger.debug("Connection %s did not get successful WS handshake.", self.reply_channel)
del self.factory.reply_protocols[self.reply_channel]
self.reply_channel = None
# Resume the producer so we keep getting data, if it's available as a method
if hasattr(self.channel, "resumeProducing"):
self.channel.resumeProducing()
# Boring old HTTP.
else:
# Sanitize and decode headers, potentially extracting root path
@ -173,6 +177,7 @@ class WebRequest(http.Request):
try:
self.factory.channel_layer.send("http.disconnect", {
"reply_channel": self.reply_channel,
"path": self.unquote(self.path),
})
except self.factory.channel_layer.ChannelFull:
pass
@ -276,12 +281,13 @@ class HTTPFactory(http.HTTPFactory):
protocol = HTTPProtocol
def __init__(self, channel_layer, action_logger=None, timeout=120, websocket_timeout=86400, ping_interval=20, ping_timeout=30, ws_protocols=None, root_path=""):
def __init__(self, channel_layer, action_logger=None, timeout=120, websocket_timeout=86400, ping_interval=20, ping_timeout=30, ws_protocols=None, root_path="", websocket_connect_timeout=30):
http.HTTPFactory.__init__(self)
self.channel_layer = channel_layer
self.action_logger = action_logger
self.timeout = timeout
self.websocket_timeout = websocket_timeout
self.websocket_connect_timeout = websocket_connect_timeout
self.ping_interval = ping_interval
# We track all sub-protocols for response channel mapping
self.reply_protocols = {}
@ -299,21 +305,29 @@ class HTTPFactory(http.HTTPFactory):
if channel.startswith("http") and isinstance(self.reply_protocols[channel], WebRequest):
self.reply_protocols[channel].serverResponse(message)
elif channel.startswith("websocket") and isinstance(self.reply_protocols[channel], WebSocketProtocol):
# Ensure the message is a valid WebSocket one
unknown_message_keys = set(message.keys()) - {"bytes", "text", "close"}
if unknown_message_keys:
# Switch depending on current socket state
protocol = self.reply_protocols[channel]
# See if the message is valid
unknown_keys = set(message.keys()) - {"bytes", "text", "close", "accept"}
if unknown_keys:
raise ValueError(
"Got invalid WebSocket reply message on %s - contains unknown keys %s" % (
"Got invalid WebSocket reply message on %s - "
"contains unknown keys %s (looking for either {'accept', 'text', 'bytes', 'close'})" % (
channel,
unknown_message_keys,
)
)
if message.get("accept", None) and protocol.state == protocol.STATE_CONNECTING:
protocol.serverAccept()
if message.get("bytes", None):
self.reply_protocols[channel].serverSend(message["bytes"], True)
protocol.serverSend(message["bytes"], True)
if message.get("text", None):
self.reply_protocols[channel].serverSend(message["text"], False)
protocol.serverSend(message["text"], False)
if message.get("close", False):
self.reply_protocols[channel].serverClose()
if protocol.state == protocol.STATE_CONNECTING:
protocol.serverReject()
else:
protocol.serverClose()
else:
raise ValueError("Cannot dispatch message on channel %r" % channel)

View File

@ -1,12 +1,12 @@
import logging
import socket
from twisted.internet import reactor, defer
from twisted.logger import globalLogBeginner
from twisted.logger import globalLogBeginner, STDLibLogObserver
from twisted.internet.endpoints import serverFromString
from .http_protocol import HTTPFactory
logger = logging.getLogger(__name__)
@ -28,6 +28,7 @@ class Server(object):
ping_timeout=30,
ws_protocols=None,
root_path="",
verbosity=1
):
self.channel_layer = channel_layer
@ -46,12 +47,12 @@ class Server(object):
self.http_timeout = http_timeout
self.ping_interval = ping_interval
self.ping_timeout = ping_timeout
# If they did not provide a websocket timeout, default it to the
# channel layer's group_expiry value if present, or one day if not.
self.websocket_timeout = websocket_timeout or getattr(channel_layer, "group_expiry", 86400)
self.ws_protocols = ws_protocols
self.root_path = root_path
self.verbosity = verbosity
def run(self):
self.factory = HTTPFactory(
@ -64,9 +65,11 @@ class Server(object):
ws_protocols=self.ws_protocols,
root_path=self.root_path,
)
# Redirect the Twisted log to nowhere
globalLogBeginner.beginLoggingTo([lambda _: None], redirectStandardIO=False, discardBuffer=True)
if self.verbosity <= 1:
# Redirect the Twisted log to nowhere
globalLogBeginner.beginLoggingTo([lambda _: None], redirectStandardIO=False, discardBuffer=True)
else:
globalLogBeginner.beginLoggingTo([STDLibLogObserver(__name__)])
if "twisted" in self.channel_layer.extensions and False:
logger.info("Using native Twisted mode on channel layer")
@ -97,14 +100,19 @@ class Server(object):
# Don't do anything if there's no channels to listen on
if channels:
delay = 0.01
channel, message = self.channel_layer.receive_many(channels, block=False)
if channel:
delay = 0.00
# Deal with the message
try:
self.factory.dispatch_reply(channel, message)
except Exception as e:
logger.error("HTTP/WS send decode error: %s" % e)
try:
channel, message = self.channel_layer.receive_many(channels, block=False)
except Exception as e:
logger.error('Error at trying to receive messages: %s' % e)
delay = 5.00
else:
if channel:
delay = 0.00
# Deal with the message
try:
self.factory.dispatch_reply(channel, message)
except Exception as e:
logger.error("HTTP/WS send decode error: %s" % e)
reactor.callLater(delay, self.backend_reader_sync)
@defer.inlineCallbacks
@ -119,15 +127,20 @@ class Server(object):
return
channels = self.factory.reply_channels()
if channels:
channel, message = yield self.channel_layer.receive_many_twisted(channels)
# Deal with the message
if channel:
try:
self.factory.dispatch_reply(channel, message)
except Exception as e:
logger.error("HTTP/WS send decode error: %s" % e)
try:
channel, message = yield self.channel_layer.receive_many_twisted(channels)
except Exception as e:
logger.error('Error at trying to receive messages: %s' % e)
yield self.sleep(5.00)
else:
yield self.sleep(0.01)
# Deal with the message
if channel:
try:
self.factory.dispatch_reply(channel, message)
except Exception as e:
logger.error("HTTP/WS send decode error: %s" % e)
else:
yield self.sleep(0.01)
else:
yield self.sleep(0.05)

View File

@ -66,3 +66,30 @@ class TestHTTPProtocol(TestCase):
# Get the resulting message off of the channel layer, check root_path
_, message = self.channel_layer.receive_many(["http.request"])
self.assertEqual(message['root_path'], "/foobar /bar")
def test_http_disconnect_sets_path_key(self):
"""
Tests http disconnect has the path key set, see https://channels.readthedocs.io/en/latest/asgi.html#disconnect
"""
# Send a simple request to the protocol
self.proto.dataReceived(
b"GET /te%20st-%C3%A0/?foo=bar HTTP/1.1\r\n" +
b"Host: anywhere.com\r\n" +
b"\r\n"
)
# Get the request message
_, message = self.channel_layer.receive_many(["http.request"])
# Send back an example response
self.factory.dispatch_reply(
message['reply_channel'],
{
"status": 200,
"status_text": b"OK",
"content": b"DISCO",
}
)
# Get the disconnection notification
_, disconnect_message = self.channel_layer.receive_many(["http.disconnect"])
self.assertEqual(disconnect_message['path'], "/te st-à/")

View File

@ -5,8 +5,9 @@ import six
import time
import traceback
from six.moves.urllib_parse import unquote, urlencode
from twisted.internet import defer
from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory
from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory, ConnectionDeny
logger = logging.getLogger(__name__)
@ -27,6 +28,7 @@ class WebSocketProtocol(WebSocketServerProtocol):
def onConnect(self, request):
self.request = request
self.packets_received = 0
self.protocol_to_accept = None
self.socket_opened = time.time()
self.last_data = time.time()
try:
@ -78,8 +80,31 @@ class WebSocketProtocol(WebSocketServerProtocol):
ws_protocol = protocol
break
# Work out what subprotocol we will accept, if any
if ws_protocol and ws_protocol in self.factory.protocols:
return ws_protocol
self.protocol_to_accept = ws_protocol
else:
self.protocol_to_accept = None
# Send over the connect message
try:
self.channel_layer.send("websocket.connect", self.request_info)
except self.channel_layer.ChannelFull:
# You have to consume websocket.connect according to the spec,
# so drop the connection.
self.muted = True
logger.warn("WebSocket force closed for %s due to connect backpressure", self.reply_channel)
# Send code 1013 "try again later" with close.
raise ConnectionDeny(code=503, reason="Connection queue at capacity")
else:
self.factory.log_action("websocket", "connecting", {
"path": self.request.path,
"client": "%s:%s" % tuple(self.client_addr) if self.client_addr else None,
})
# Make a deferred and return it - we'll either call it or err it later on
self.handshake_deferred = defer.Deferred()
return self.handshake_deferred
@classmethod
def unquote(cls, value):
@ -93,21 +118,11 @@ class WebSocketProtocol(WebSocketServerProtocol):
def onOpen(self):
# Send news that this channel is open
logger.debug("WebSocket open for %s", self.reply_channel)
try:
self.channel_layer.send("websocket.connect", self.request_info)
except self.channel_layer.ChannelFull:
# You have to consume websocket.connect according to the spec,
# so drop the connection.
self.muted = True
logger.warn("WebSocket force closed for %s due to connect backpressure", self.reply_channel)
# Send code 1013 "try again later" with close.
self.sendCloseFrame(code=1013, isReply=False)
else:
self.factory.log_action("websocket", "connected", {
"path": self.request.path,
"client": "%s:%s" % tuple(self.client_addr) if self.client_addr else None,
})
logger.debug("WebSocket %s open and established", self.reply_channel)
self.factory.log_action("websocket", "connected", {
"path": self.request.path,
"client": "%s:%s" % tuple(self.client_addr) if self.client_addr else None,
})
def onMessage(self, payload, isBinary):
# If we're muted, do nothing.
@ -140,10 +155,31 @@ class WebSocketProtocol(WebSocketServerProtocol):
# Send code 1013 "try again later" with close.
self.sendCloseFrame(code=1013, isReply=False)
def serverAccept(self):
"""
Called when we get a message saying to accept the connection.
"""
self.handshake_deferred.callback(self.protocol_to_accept)
logger.debug("WebSocket %s accepted by application", self.reply_channel)
def serverReject(self):
"""
Called when we get a message saying to accept the connection.
"""
self.handshake_deferred.errback(ConnectionDeny(code=403, reason="Access denied"))
self.cleanup()
logger.debug("WebSocket %s rejected by application", self.reply_channel)
self.factory.log_action("websocket", "rejected", {
"path": self.request.path,
"client": "%s:%s" % tuple(self.client_addr) if self.client_addr else None,
})
def serverSend(self, content, binary=False):
"""
Server-side channel message to send a message.
"""
if self.state == self.STATE_CONNECTING:
self.serverAccept()
self.last_data = time.time()
logger.debug("Sent WebSocket packet to client for %s", self.reply_channel)
if binary:
@ -158,9 +194,9 @@ class WebSocketProtocol(WebSocketServerProtocol):
self.sendClose()
def onClose(self, wasClean, code, reason):
self.cleanup()
if hasattr(self, "reply_channel"):
logger.debug("WebSocket closed for %s", self.reply_channel)
del self.factory.reply_protocols[self.reply_channel]
try:
if not self.muted:
self.channel_layer.send("websocket.disconnect", {
@ -178,6 +214,13 @@ class WebSocketProtocol(WebSocketServerProtocol):
else:
logger.debug("WebSocket closed before handshake established")
def cleanup(self):
"""
Call to clean up this socket after it's closed.
"""
if hasattr(self, "reply_channel"):
del self.factory.reply_protocols[self.reply_channel]
def duration(self):
"""
Returns the time since the socket was opened
@ -186,11 +229,16 @@ class WebSocketProtocol(WebSocketServerProtocol):
def check_ping(self):
"""
Checks to see if we should send a keepalive ping.
Checks to see if we should send a keepalive ping/deny socket connection
"""
if (time.time() - self.last_data) > self.main_factory.ping_interval:
self._sendAutoPing()
self.last_data = time.time()
# If we're still connecting, deny the connection
if self.state == self.STATE_CONNECTING:
if self.duration() > self.main_factory.websocket_connect_timeout:
self.serverReject()
elif self.state == self.STATE_OPEN:
if (time.time() - self.last_data) > self.main_factory.ping_interval:
self._sendAutoPing()
self.last_data = time.time()
class WebSocketFactory(WebSocketServerFactory):

View File

@ -1,20 +1,22 @@
import os
import sys
from setuptools import find_packages, setup
from daphne import __version__
# We use the README as the long_description
readme_path = os.path.join(os.path.dirname(__file__), "README.rst")
with open(readme_path) as fp:
long_description = fp.read()
setup(
name='daphne',
version=__version__,
url='http://www.djangoproject.com/',
url='https://github.com/django/daphne',
author='Django Software Foundation',
author_email='foundation@djangoproject.com',
description='Django ASGI (HTTP/WebSocket) server',
long_description=open(readme_path).read(),
long_description=long_description,
license='BSD',
zip_safe=False,
package_dir={'twisted': 'daphne/twisted'},
@ -22,10 +24,23 @@ setup(
include_package_data=True,
install_requires=[
'asgiref>=0.13',
'twisted>=15.5,<16.3',
'twisted>=16.0',
'autobahn>=0.12',
],
entry_points={'console_scripts': [
'daphne = daphne.cli:CommandLineInterface.entrypoint',
]},
classifiers=[
'Development Status :: 4 - Beta',
'Environment :: Web Environment',
'Intended Audience :: Developers',
'License :: OSI Approved :: BSD License',
'Operating System :: OS Independent',
'Programming Language :: Python',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.5',
'Topic :: Internet :: WWW/HTTP',
],
)