diff --git a/daphne/ws_protocol.py b/daphne/ws_protocol.py index 975b1a9..184515d 100755 --- a/daphne/ws_protocol.py +++ b/daphne/ws_protocol.py @@ -31,17 +31,21 @@ class WebSocketProtocol(WebSocketServerProtocol): self.server.protocol_connected(self) self.request = request self.protocol_to_accept = None + self.root_path = self.server.root_path self.socket_opened = time.time() self.last_ping = time.time() try: - # Sanitize and decode headers + # Sanitize and decode headers, potentially extracting root path self.clean_headers = [] for name, value in request.headers.items(): name = name.encode("ascii") # Prevent CVE-2015-0219 if b"_" in name: continue - self.clean_headers.append((name.lower(), value.encode("latin1"))) + if name.lower() == b"daphne-root-path": + self.root_path = unquote(value) + else: + self.clean_headers.append((name.lower(), value.encode("latin1"))) # Get client address if possible peer = self.transport.getPeer() host = self.transport.getHost() @@ -76,6 +80,7 @@ class WebSocketProtocol(WebSocketServerProtocol): "type": "websocket", "path": unquote(self.path.decode("ascii")), "raw_path": self.path, + "root_path": self.root_path, "headers": self.clean_headers, "query_string": self._raw_query_string, # Passed by HTTP protocol "client": self.client_addr, diff --git a/tests/test_websocket.py b/tests/test_websocket.py index e954486..851143c 100644 --- a/tests/test_websocket.py +++ b/tests/test_websocket.py @@ -192,6 +192,30 @@ class TestWebsocket(DaphneTestCase): self.assertEqual(scope["path"], "/foo/bar") self.assertEqual(scope["raw_path"], b"/foo%2Fbar") + @given(daphne_path=http_strategies.http_path()) + @settings(max_examples=5, deadline=2000) + def test_root_path(self, *, daphne_path): + """ + Tests root_path handling. + """ + headers = [("Daphne-Root-Path", parse.quote(daphne_path))] + with DaphneTestingInstance() as test_app: + test_app.add_send_messages([{"type": "websocket.accept"}]) + self.websocket_handshake( + test_app, + path="/", + headers=headers, + ) + # Validate the scope and messages we got + scope, _ = test_app.get_received() + + # Daphne-Root-Path is not included in the returned 'headers' section. + self.assertNotIn( + "daphne-root-path", (header[0].lower() for header in scope["headers"]) + ) + # And what we're looking for, root_path being set. + self.assertEqual(scope["root_path"], daphne_path) + def test_text_frames(self): """ Tests we can send and receive text frames.