Initial commit

This commit is contained in:
Andrew Godwin 2015-12-23 17:05:15 +00:00
commit 88d47df276
8 changed files with 466 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
*.egg-info

4
README.rst Normal file
View File

@ -0,0 +1,4 @@
daphne
======
Daphne is a HTTP, HTTP2 and WebSocket interface server for Django.

1
daphne/__init__.py Executable file
View File

@ -0,0 +1 @@
__version__ = "0.7"

87
daphne/cli.py Executable file
View File

@ -0,0 +1,87 @@
import sys
import argparse
import logging
import importlib
from .server import Server
logger = logging.getLogger(__name__)
class CommandLineInterface(object):
"""
Acts as the main CLI entry point for running the server.
"""
description = "Django HTTP/WebSocket server"
def __init__(self):
self.parser = argparse.ArgumentParser(
description=self.description,
)
self.parser.add_argument(
'-p',
'--port',
type=int,
help='Port number to listen on',
default=8000,
)
self.parser.add_argument(
'-b',
'--bind',
dest='host',
help='The host/address to bind to',
default="127.0.0.1",
)
self.parser.add_argument(
'-v',
'--verbosity',
type=int,
help='How verbose to make the output',
default=1,
)
self.parser.add_argument(
'channel_layer',
help='The ASGI channel layer instance to use as path.to.module:instance.path',
)
@classmethod
def entrypoint(cls):
"""
Main entrypoint for external starts.
"""
cls().run(sys.argv[1:])
def run(self, args):
"""
Pass in raw argument list and it will decode them
and run the server.
"""
# Decode args
args = self.parser.parse_args(args)
# Set up logging
logging.basicConfig(
level = {
0: logging.WARN,
1: logging.INFO,
2: logging.DEBUG,
}[args.verbosity],
format = "%(asctime)-15s %(levelname)-8s %(message)s" ,
)
# Import channel layer
module_path, object_path = args.channel_layer.split(":", 1)
channel_layer = importlib.import_module(module_path)
for bit in object_path.split("."):
channel_layer = getattr(channel_layer, bit)
# Run server
logger.info(
"Starting server on %s:%s, channel layer %s",
args.host,
args.port,
args.channel_layer,
)
Server(
channel_layer=channel_layer,
host=args.host,
port=args.port,
).run()

205
daphne/http_protocol.py Executable file
View File

@ -0,0 +1,205 @@
from __future__ import unicode_literals
import time
import logging
from twisted.python.compat import _PY3
from twisted.web import http
from twisted.protocols.policies import ProtocolWrapper
from .ws_protocol import WebSocketProtocol, WebSocketFactory
logger = logging.getLogger(__name__)
class WebRequest(http.Request):
"""
Request that either hands off information to channels, or offloads
to a WebSocket class.
Does some extra processing over the normal Twisted Web request to separate
GET and POST out.
"""
def __init__(self, *args, **kwargs):
http.Request.__init__(self, *args, **kwargs)
# Easy factory link
self.factory = self.channel.factory
# Make a name for our reply channel
self.reply_channel = self.factory.channel_layer.new_channel(b"!http.response.?")
# Tell factory we're that channel's client
self.last_keepalive = time.time()
self.factory.reply_protocols[self.reply_channel] = self
def process(self):
# Get upgrade header
upgrade_header = None
if self.requestHeaders.hasHeader("Upgrade"):
upgrade_header = self.requestHeaders.getRawHeaders("Upgrade")[0]
# Is it WebSocket? IS IT?!
if upgrade_header == "websocket":
# Make WebSocket protocol to hand off to
protocol = self.factory.ws_factory.buildProtocol(self.transport.getPeer())
if not protocol:
# If protocol creation fails, we signal "internal server error"
self.setResponseCode(500)
self.finish()
# Port across transport
transport, self.transport = self.transport, None
if isinstance(transport, ProtocolWrapper):
# i.e. TLS is a wrapping protocol
transport.wrappedProtocol = protocol
else:
transport.protocol = protocol
protocol.makeConnection(transport)
# Re-inject request
data = self.method + b' ' + self.uri + b' HTTP/1.1\x0d\x0a'
for h in self.requestHeaders.getAllRawHeaders():
data += h[0] + b': ' + b",".join(h[1]) + b'\x0d\x0a'
data += b"\x0d\x0a"
data += self.content.read()
protocol.dataReceived(data)
# Remove our HTTP reply channel association
logging.debug("Upgraded connection %s to WebSocket", self.reply_channel)
self.factory.reply_protocols[self.reply_channel] = None
self.reply_channel = None
# Boring old HTTP.
else:
# Send request message
logging.debug("HTTP %s request for %s", self.method, self.reply_channel)
self.factory.channel_layer.send(b"http.request", {
"reply_channel": self.reply_channel,
"method": self.method,
"get": self.get,
"post": self.post,
"cookies": self.received_cookies,
"headers": {k: v[0] for k, v in self.requestHeaders.getAllRawHeaders()},
"client": [self.client.host, self.client.port],
"server": [self.host.host, self.host.port],
"path": self.path,
})
def connectionLost(self, reason):
"""
Cleans up reply channel on close.
"""
if self.reply_channel:
del self.channel.factory.reply_protocols[self.reply_channel]
logging.debug("HTTP disconnect for %s", self.reply_channel)
http.Request.connectionLost(self, reason)
def serverResponse(self, message):
"""
Writes a received HTTP response back out to the transport.
"""
# Write code
self.setResponseCode(message['status'])
# Write headers
for header, value in message.get("headers", {}):
self.setHeader(header.encode("utf8"), value.encode("utf8"))
# Write cookies
for cookie in message.get("cookies"):
self.cookies.append(cookie.encode("utf8"))
# Write out body
if "content" in message:
http.Request.write(self, message['content'].encode("utf8"))
self.finish()
logging.debug("HTTP %s response for %s", message['status'], self.reply_channel)
def requestReceived(self, command, path, version):
"""
Called by channel when all data has been received.
Overridden because Twisted merges GET and POST into one thing by default.
"""
self.content.seek(0,0)
self.get = {}
self.post = {}
self.method, self.uri = command, path
self.clientproto = version
x = self.uri.split(b'?', 1)
# URI and GET args assignment
if len(x) == 1:
self.path = self.uri
else:
self.path, argstring = x
self.get = http.parse_qs(argstring, 1)
# cache the client and server information, we'll need this later to be
# serialized and sent with the request so CGIs will work remotely
self.client = self.channel.transport.getPeer()
self.host = self.channel.transport.getHost()
# Argument processing
ctype = self.requestHeaders.getRawHeaders(b'content-type')
if ctype is not None:
ctype = ctype[0]
# Process POST data if present
if self.method == b"POST" and ctype:
mfd = b'multipart/form-data'
key, pdict = http._parseHeader(ctype)
if key == b'application/x-www-form-urlencoded':
self.post.update(http.parse_qs(self.content.read(), 1))
elif key == mfd:
try:
cgiArgs = cgi.parse_multipart(self.content, pdict)
if _PY3:
# parse_multipart on Python 3 decodes the header bytes
# as iso-8859-1 and returns a str key -- we want bytes
# so encode it back
self.post.update({x.encode('iso-8859-1'): y
for x, y in cgiArgs.items()})
else:
self.post.update(cgiArgs)
except:
# It was a bad request.
http._respondToBadRequestAndDisconnect(self.channel.transport)
return
self.content.seek(0, 0)
# Continue with rest of request handling
self.process()
class HTTPProtocol(http.HTTPChannel):
requestFactory = WebRequest
class HTTPFactory(http.HTTPFactory):
"""
Factory which takes care of tracking which protocol
instances or request instances are responsible for which
named response channels, so incoming messages can be
routed appropriately.
"""
protocol = HTTPProtocol
def __init__(self, channel_layer):
http.HTTPFactory.__init__(self)
self.channel_layer = channel_layer
# We track all sub-protocols for response channel mapping
self.reply_protocols = {}
# Make a factory for WebSocket protocols
self.ws_factory = WebSocketFactory(self)
self.ws_factory.protocol = WebSocketProtocol
self.ws_factory.reply_protocols = self.reply_protocols
def reply_channels(self):
return self.reply_protocols.keys()
def dispatch_reply(self, channel, message):
if channel.startswith("!http") and isinstance(self.reply_protocols[channel], WebRequest):
self.reply_protocols[channel].serverResponse(message)
elif channel.startswith("!websocket") and isinstance(self.reply_protocols[channel], WebSocketProtocol):
if message.get("bytes", None):
self.reply_protocols[channel].serverSend(message["bytes"], True)
if message.get("text", None):
self.reply_protocols[channel].serverSend(message["text"], False)
if message.get("close", False):
self.reply_protocols[channel].serverClose()
else:
raise ValueError("Cannot dispatch message on channel %r" % channel)

52
daphne/server.py Executable file
View File

@ -0,0 +1,52 @@
import time
from twisted.internet import reactor
from .http_protocol import HTTPFactory
class Server(object):
def __init__(self, channel_layer, host="127.0.0.1", port=8000):
self.channel_layer = channel_layer
self.host = host
self.port = port
def run(self):
self.factory = HTTPFactory(self.channel_layer)
reactor.listenTCP(self.port, self.factory, interface=self.host)
reactor.callInThread(self.backend_reader)
reactor.callLater(1, self.keepalive_sender)
reactor.run()
def backend_reader(self):
"""
Run in a separate thread; reads messages from the backend.
"""
while True:
channels = self.factory.reply_channels()
# Quit if reactor is stopping
if not reactor.running:
return
# Don't do anything if there's no channels to listen on
if channels:
channel, message = self.channel_layer.receive_many(channels)
else:
time.sleep(0.1)
continue
# Wait around if there's nothing received
if channel is None:
time.sleep(0.05)
continue
# Deal with the message
self.factory.dispatch_reply(channel, message)
def keepalive_sender(self):
"""
Sends keepalive messages for open WebSockets every
(channel_backend expiry / 2) seconds.
"""
expiry_window = int(self.channel_layer.group_expiry / 2)
for protocol in self.factory.reply_protocols.values():
if time.time() - protocol.last_keepalive > expiry_window:
protocol.sendKeepalive()
reactor.callLater(1, self.keepalive_sender)

90
daphne/ws_protocol.py Executable file
View File

@ -0,0 +1,90 @@
import time
from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory
from django.http import parse_cookie
class WebSocketProtocol(WebSocketServerProtocol):
"""
Protocol which supports WebSockets and forwards incoming messages to
the websocket channels.
"""
def __init__(self, *args, **kwargs):
WebSocketServerProtocol.__init__(self, *args, **kwargs)
# Easy parent factory/channel layer link
self.main_factory = self.factory.main_factory
self.channel_layer = self.main_factory.channel_layer
def onConnect(self, request):
self.request_info = {
"path": request.path,
"get": request.params,
"cookies": parse_cookie(request.headers.get('cookie', ''))
}
def onOpen(self):
# Make sending channel
self.reply_channel = self.channel_layer.new_channel("!websocket.send.?")
self.request_info["reply_channel"] = self.reply_channel
self.last_keepalive = time.time()
# Tell main factory about it
self.main_factory.reply_protocols[self.reply_channel] = self
# Send news that this channel is open
self.channel_layer.send("websocket.connect", self.request_info)
def onMessage(self, payload, isBinary):
if isBinary:
self.channel_layer.send("websocket.receive", {
"reply_channel": self.reply_channel,
"bytes": payload,
})
else:
self.channel_layer.send("websocket.receive", {
"reply_channel": self.reply_channel,
"text": payload.decode("utf8"),
})
def serverSend(self, content, binary=False):
"""
Server-side channel message to send a message.
"""
if binary:
self.sendMessage(content, binary)
else:
self.sendMessage(content.encode("utf8"), binary)
def serverClose(self):
"""
Server-side channel message to close the socket
"""
self.sendClose()
def onClose(self, wasClean, code, reason):
if hasattr(self, "reply_channel"):
del self.factory.reply_protocols[self.reply_channel]
self.channel_layer.send("websocket.disconnect", {
"reply_channel": self.reply_channel,
})
def sendKeepalive(self):
"""
Sends a keepalive packet on the keepalive channel.
"""
self.channel_layer.send("websocket.keepalive", {
"reply_channel": self.reply_channel,
})
self.last_keepalive = time.time()
class WebSocketFactory(WebSocketServerFactory):
"""
Factory subclass that remembers what the "main"
factory is, so WebSocket protocols can access it
to get reply ID info.
"""
def __init__(self, main_factory, *args, **kwargs):
self.main_factory = main_factory
WebSocketServerFactory.__init__(self, *args, **kwargs)

26
setup.py Executable file
View File

@ -0,0 +1,26 @@
import os
import sys
from setuptools import find_packages, setup
from daphne import __version__
# We use the README as the long_description
readme_path = os.path.join(os.path.dirname(__file__), "README.rst")
setup(
name='daphne',
version=__version__,
url='http://www.djangoproject.com/',
author='Django Software Foundation',
author_email='foundation@djangoproject.com',
description='Django HTTP/WebSocket server',
long_description=open(readme_path).read(),
license='BSD',
zip_safe=False,
packages=find_packages(),
include_package_data=True,
entry_points={'console_scripts': [
'daphne = daphne.cli:CommandLineInterface.entrypoint',
]},
)