From 88d47df2769e715b0ccc1fdcd5a2ae6c082b9d9d Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Wed, 23 Dec 2015 17:05:15 +0000 Subject: [PATCH] Initial commit --- .gitignore | 1 + README.rst | 4 + daphne/__init__.py | 1 + daphne/cli.py | 87 +++++++++++++++++ daphne/http_protocol.py | 205 ++++++++++++++++++++++++++++++++++++++++ daphne/server.py | 52 ++++++++++ daphne/ws_protocol.py | 90 ++++++++++++++++++ setup.py | 26 +++++ 8 files changed, 466 insertions(+) create mode 100644 .gitignore create mode 100644 README.rst create mode 100755 daphne/__init__.py create mode 100755 daphne/cli.py create mode 100755 daphne/http_protocol.py create mode 100755 daphne/server.py create mode 100755 daphne/ws_protocol.py create mode 100755 setup.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..11041c7 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.egg-info diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..8082920 --- /dev/null +++ b/README.rst @@ -0,0 +1,4 @@ +daphne +====== + +Daphne is a HTTP, HTTP2 and WebSocket interface server for Django. diff --git a/daphne/__init__.py b/daphne/__init__.py new file mode 100755 index 0000000..1ea903c --- /dev/null +++ b/daphne/__init__.py @@ -0,0 +1 @@ +__version__ = "0.7" diff --git a/daphne/cli.py b/daphne/cli.py new file mode 100755 index 0000000..9d368c2 --- /dev/null +++ b/daphne/cli.py @@ -0,0 +1,87 @@ +import sys +import argparse +import logging +import importlib +from .server import Server + + +logger = logging.getLogger(__name__) + + +class CommandLineInterface(object): + """ + Acts as the main CLI entry point for running the server. + """ + + description = "Django HTTP/WebSocket server" + + def __init__(self): + self.parser = argparse.ArgumentParser( + description=self.description, + ) + self.parser.add_argument( + '-p', + '--port', + type=int, + help='Port number to listen on', + default=8000, + ) + self.parser.add_argument( + '-b', + '--bind', + dest='host', + help='The host/address to bind to', + default="127.0.0.1", + ) + self.parser.add_argument( + '-v', + '--verbosity', + type=int, + help='How verbose to make the output', + default=1, + ) + self.parser.add_argument( + 'channel_layer', + help='The ASGI channel layer instance to use as path.to.module:instance.path', + ) + + @classmethod + def entrypoint(cls): + """ + Main entrypoint for external starts. + """ + cls().run(sys.argv[1:]) + + def run(self, args): + """ + Pass in raw argument list and it will decode them + and run the server. + """ + # Decode args + args = self.parser.parse_args(args) + # Set up logging + logging.basicConfig( + level = { + 0: logging.WARN, + 1: logging.INFO, + 2: logging.DEBUG, + }[args.verbosity], + format = "%(asctime)-15s %(levelname)-8s %(message)s" , + ) + # Import channel layer + module_path, object_path = args.channel_layer.split(":", 1) + channel_layer = importlib.import_module(module_path) + for bit in object_path.split("."): + channel_layer = getattr(channel_layer, bit) + # Run server + logger.info( + "Starting server on %s:%s, channel layer %s", + args.host, + args.port, + args.channel_layer, + ) + Server( + channel_layer=channel_layer, + host=args.host, + port=args.port, + ).run() diff --git a/daphne/http_protocol.py b/daphne/http_protocol.py new file mode 100755 index 0000000..be3cf26 --- /dev/null +++ b/daphne/http_protocol.py @@ -0,0 +1,205 @@ +from __future__ import unicode_literals + +import time +import logging +from twisted.python.compat import _PY3 +from twisted.web import http +from twisted.protocols.policies import ProtocolWrapper + +from .ws_protocol import WebSocketProtocol, WebSocketFactory + +logger = logging.getLogger(__name__) + + +class WebRequest(http.Request): + """ + Request that either hands off information to channels, or offloads + to a WebSocket class. + + Does some extra processing over the normal Twisted Web request to separate + GET and POST out. + """ + + def __init__(self, *args, **kwargs): + http.Request.__init__(self, *args, **kwargs) + # Easy factory link + self.factory = self.channel.factory + # Make a name for our reply channel + self.reply_channel = self.factory.channel_layer.new_channel(b"!http.response.?") + # Tell factory we're that channel's client + self.last_keepalive = time.time() + self.factory.reply_protocols[self.reply_channel] = self + + def process(self): + # Get upgrade header + upgrade_header = None + if self.requestHeaders.hasHeader("Upgrade"): + upgrade_header = self.requestHeaders.getRawHeaders("Upgrade")[0] + # Is it WebSocket? IS IT?! + if upgrade_header == "websocket": + # Make WebSocket protocol to hand off to + protocol = self.factory.ws_factory.buildProtocol(self.transport.getPeer()) + if not protocol: + # If protocol creation fails, we signal "internal server error" + self.setResponseCode(500) + self.finish() + # Port across transport + transport, self.transport = self.transport, None + if isinstance(transport, ProtocolWrapper): + # i.e. TLS is a wrapping protocol + transport.wrappedProtocol = protocol + else: + transport.protocol = protocol + protocol.makeConnection(transport) + # Re-inject request + data = self.method + b' ' + self.uri + b' HTTP/1.1\x0d\x0a' + for h in self.requestHeaders.getAllRawHeaders(): + data += h[0] + b': ' + b",".join(h[1]) + b'\x0d\x0a' + data += b"\x0d\x0a" + data += self.content.read() + protocol.dataReceived(data) + # Remove our HTTP reply channel association + logging.debug("Upgraded connection %s to WebSocket", self.reply_channel) + self.factory.reply_protocols[self.reply_channel] = None + self.reply_channel = None + # Boring old HTTP. + else: + # Send request message + logging.debug("HTTP %s request for %s", self.method, self.reply_channel) + self.factory.channel_layer.send(b"http.request", { + "reply_channel": self.reply_channel, + "method": self.method, + "get": self.get, + "post": self.post, + "cookies": self.received_cookies, + "headers": {k: v[0] for k, v in self.requestHeaders.getAllRawHeaders()}, + "client": [self.client.host, self.client.port], + "server": [self.host.host, self.host.port], + "path": self.path, + }) + + def connectionLost(self, reason): + """ + Cleans up reply channel on close. + """ + if self.reply_channel: + del self.channel.factory.reply_protocols[self.reply_channel] + logging.debug("HTTP disconnect for %s", self.reply_channel) + http.Request.connectionLost(self, reason) + + def serverResponse(self, message): + """ + Writes a received HTTP response back out to the transport. + """ + # Write code + self.setResponseCode(message['status']) + # Write headers + for header, value in message.get("headers", {}): + self.setHeader(header.encode("utf8"), value.encode("utf8")) + # Write cookies + for cookie in message.get("cookies"): + self.cookies.append(cookie.encode("utf8")) + # Write out body + if "content" in message: + http.Request.write(self, message['content'].encode("utf8")) + self.finish() + logging.debug("HTTP %s response for %s", message['status'], self.reply_channel) + + def requestReceived(self, command, path, version): + """ + Called by channel when all data has been received. + Overridden because Twisted merges GET and POST into one thing by default. + """ + self.content.seek(0,0) + self.get = {} + self.post = {} + + self.method, self.uri = command, path + self.clientproto = version + x = self.uri.split(b'?', 1) + + # URI and GET args assignment + if len(x) == 1: + self.path = self.uri + else: + self.path, argstring = x + self.get = http.parse_qs(argstring, 1) + + # cache the client and server information, we'll need this later to be + # serialized and sent with the request so CGIs will work remotely + self.client = self.channel.transport.getPeer() + self.host = self.channel.transport.getHost() + + # Argument processing + ctype = self.requestHeaders.getRawHeaders(b'content-type') + if ctype is not None: + ctype = ctype[0] + + # Process POST data if present + if self.method == b"POST" and ctype: + mfd = b'multipart/form-data' + key, pdict = http._parseHeader(ctype) + if key == b'application/x-www-form-urlencoded': + self.post.update(http.parse_qs(self.content.read(), 1)) + elif key == mfd: + try: + cgiArgs = cgi.parse_multipart(self.content, pdict) + + if _PY3: + # parse_multipart on Python 3 decodes the header bytes + # as iso-8859-1 and returns a str key -- we want bytes + # so encode it back + self.post.update({x.encode('iso-8859-1'): y + for x, y in cgiArgs.items()}) + else: + self.post.update(cgiArgs) + except: + # It was a bad request. + http._respondToBadRequestAndDisconnect(self.channel.transport) + return + self.content.seek(0, 0) + + # Continue with rest of request handling + self.process() + + +class HTTPProtocol(http.HTTPChannel): + + requestFactory = WebRequest + + +class HTTPFactory(http.HTTPFactory): + """ + Factory which takes care of tracking which protocol + instances or request instances are responsible for which + named response channels, so incoming messages can be + routed appropriately. + """ + + protocol = HTTPProtocol + + def __init__(self, channel_layer): + http.HTTPFactory.__init__(self) + self.channel_layer = channel_layer + # We track all sub-protocols for response channel mapping + self.reply_protocols = {} + # Make a factory for WebSocket protocols + self.ws_factory = WebSocketFactory(self) + self.ws_factory.protocol = WebSocketProtocol + self.ws_factory.reply_protocols = self.reply_protocols + + def reply_channels(self): + return self.reply_protocols.keys() + + def dispatch_reply(self, channel, message): + if channel.startswith("!http") and isinstance(self.reply_protocols[channel], WebRequest): + self.reply_protocols[channel].serverResponse(message) + elif channel.startswith("!websocket") and isinstance(self.reply_protocols[channel], WebSocketProtocol): + if message.get("bytes", None): + self.reply_protocols[channel].serverSend(message["bytes"], True) + if message.get("text", None): + self.reply_protocols[channel].serverSend(message["text"], False) + if message.get("close", False): + self.reply_protocols[channel].serverClose() + else: + raise ValueError("Cannot dispatch message on channel %r" % channel) diff --git a/daphne/server.py b/daphne/server.py new file mode 100755 index 0000000..e491dd8 --- /dev/null +++ b/daphne/server.py @@ -0,0 +1,52 @@ +import time +from twisted.internet import reactor + +from .http_protocol import HTTPFactory + + +class Server(object): + + def __init__(self, channel_layer, host="127.0.0.1", port=8000): + self.channel_layer = channel_layer + self.host = host + self.port = port + + def run(self): + self.factory = HTTPFactory(self.channel_layer) + reactor.listenTCP(self.port, self.factory, interface=self.host) + reactor.callInThread(self.backend_reader) + reactor.callLater(1, self.keepalive_sender) + reactor.run() + + def backend_reader(self): + """ + Run in a separate thread; reads messages from the backend. + """ + while True: + channels = self.factory.reply_channels() + # Quit if reactor is stopping + if not reactor.running: + return + # Don't do anything if there's no channels to listen on + if channels: + channel, message = self.channel_layer.receive_many(channels) + else: + time.sleep(0.1) + continue + # Wait around if there's nothing received + if channel is None: + time.sleep(0.05) + continue + # Deal with the message + self.factory.dispatch_reply(channel, message) + + def keepalive_sender(self): + """ + Sends keepalive messages for open WebSockets every + (channel_backend expiry / 2) seconds. + """ + expiry_window = int(self.channel_layer.group_expiry / 2) + for protocol in self.factory.reply_protocols.values(): + if time.time() - protocol.last_keepalive > expiry_window: + protocol.sendKeepalive() + reactor.callLater(1, self.keepalive_sender) diff --git a/daphne/ws_protocol.py b/daphne/ws_protocol.py new file mode 100755 index 0000000..0b6f9cb --- /dev/null +++ b/daphne/ws_protocol.py @@ -0,0 +1,90 @@ +import time + +from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory + +from django.http import parse_cookie + + +class WebSocketProtocol(WebSocketServerProtocol): + """ + Protocol which supports WebSockets and forwards incoming messages to + the websocket channels. + """ + + def __init__(self, *args, **kwargs): + WebSocketServerProtocol.__init__(self, *args, **kwargs) + # Easy parent factory/channel layer link + self.main_factory = self.factory.main_factory + self.channel_layer = self.main_factory.channel_layer + + def onConnect(self, request): + self.request_info = { + "path": request.path, + "get": request.params, + "cookies": parse_cookie(request.headers.get('cookie', '')) + } + + def onOpen(self): + # Make sending channel + self.reply_channel = self.channel_layer.new_channel("!websocket.send.?") + self.request_info["reply_channel"] = self.reply_channel + self.last_keepalive = time.time() + # Tell main factory about it + self.main_factory.reply_protocols[self.reply_channel] = self + # Send news that this channel is open + self.channel_layer.send("websocket.connect", self.request_info) + + def onMessage(self, payload, isBinary): + if isBinary: + self.channel_layer.send("websocket.receive", { + "reply_channel": self.reply_channel, + "bytes": payload, + }) + else: + self.channel_layer.send("websocket.receive", { + "reply_channel": self.reply_channel, + "text": payload.decode("utf8"), + }) + + def serverSend(self, content, binary=False): + """ + Server-side channel message to send a message. + """ + if binary: + self.sendMessage(content, binary) + else: + self.sendMessage(content.encode("utf8"), binary) + + def serverClose(self): + """ + Server-side channel message to close the socket + """ + self.sendClose() + + def onClose(self, wasClean, code, reason): + if hasattr(self, "reply_channel"): + del self.factory.reply_protocols[self.reply_channel] + self.channel_layer.send("websocket.disconnect", { + "reply_channel": self.reply_channel, + }) + + def sendKeepalive(self): + """ + Sends a keepalive packet on the keepalive channel. + """ + self.channel_layer.send("websocket.keepalive", { + "reply_channel": self.reply_channel, + }) + self.last_keepalive = time.time() + + +class WebSocketFactory(WebSocketServerFactory): + """ + Factory subclass that remembers what the "main" + factory is, so WebSocket protocols can access it + to get reply ID info. + """ + + def __init__(self, main_factory, *args, **kwargs): + self.main_factory = main_factory + WebSocketServerFactory.__init__(self, *args, **kwargs) diff --git a/setup.py b/setup.py new file mode 100755 index 0000000..43b1eaf --- /dev/null +++ b/setup.py @@ -0,0 +1,26 @@ +import os +import sys +from setuptools import find_packages, setup +from daphne import __version__ + + +# We use the README as the long_description +readme_path = os.path.join(os.path.dirname(__file__), "README.rst") + + +setup( + name='daphne', + version=__version__, + url='http://www.djangoproject.com/', + author='Django Software Foundation', + author_email='foundation@djangoproject.com', + description='Django HTTP/WebSocket server', + long_description=open(readme_path).read(), + license='BSD', + zip_safe=False, + packages=find_packages(), + include_package_data=True, + entry_points={'console_scripts': [ + 'daphne = daphne.cli:CommandLineInterface.entrypoint', + ]}, +)