Merge branch 'main' into master

This commit is contained in:
Albertas Gimbutas 2021-06-23 13:03:49 +03:00 committed by GitHub
commit a9a2cb1c09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 323 additions and 156 deletions

24
.github/workflows/pre-commit.yml vendored Normal file
View File

@ -0,0 +1,24 @@
name: pre-commit
on:
push:
branches:
- main
pull_request:
jobs:
pre-commit:
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 0
- uses: actions/setup-python@v2
with:
python-version: 3.9
- uses: pre-commit/action@v2.0.0
with:
token: ${{ secrets.GITHUB_TOKEN }}

35
.github/workflows/tests.yml vendored Normal file
View File

@ -0,0 +1,35 @@
name: Tests
on:
push:
branches:
- master
pull_request:
jobs:
tests:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version:
- 3.6
- 3.7
- 3.8
- 3.9
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip setuptools wheel
python -m pip install --upgrade tox tox-py
- name: Run tox targets for ${{ matrix.python-version }}
run: tox --py current

21
.pre-commit-config.yaml Normal file
View File

@ -0,0 +1,21 @@
repos:
- repo: https://github.com/asottile/pyupgrade
rev: v2.11.0
hooks:
- id: pyupgrade
args: [--py36-plus]
- repo: https://github.com/psf/black
rev: 20.8b1
hooks:
- id: black
language_version: python3
- repo: https://github.com/pycqa/isort
rev: 5.8.0
hooks:
- id: isort
- repo: https://github.com/PyCQA/flake8
rev: 3.9.0
hooks:
- id: flake8
additional_dependencies:
- flake8-bugbear

View File

@ -1,48 +0,0 @@
sudo: false
language: python
python:
- '3.8'
- '3.7'
- '3.6'
- '3.5'
env:
- TWISTED="twisted"
- TWISTED="twisted==18.7.0"
install:
- pip install $TWISTED -e .[tests]
- pip freeze
script:
- pytest
stages:
- lint
- test
- name: release
if: tag IS present
jobs:
include:
- stage: lint
python: 3.6
install: pip install -U -e .[tests] black pyflakes isort
script:
- pyflakes daphne tests
- black --check daphne tests
- isort --check-only --diff --recursive daphne tests
- stage: release
python: 3.6
script: skip
deploy:
provider: pypi
user: andrewgodwin_bot
on:
tags: true
distributions: sdist bdist_wheel
password:
secure: IA+dvSmMKN+fT47rgRb6zdmrExhK5QCVEDH8kheC6kAacw80ORBZKo6sMX9GQBJ3BlfhTqrzAhItHkDUxonb579rJDvmlJ7FPg7axZpsY9Fmls6q1rJC/La8iGWx20+ctberejKSH3wSwa0LH0imJXGDoKKzf1DLmk5pEEWjG2QqhKdEtyAcnzOPnDWcRCs+DKfQcMzETH7lMFN8oe3aBhHLLtcg4yA78cN5CeyyH92lmbaVp7k/b1FqXXFgf16bi5tlgLrb6DhmcnNjwLMSHRafNoPCXkWQOwh6gEHeHRR3OsHsBueyJHIikuHNrpmgpAqjYlVQ5WqmfgMlhCfRm9xL+G4G+KK9n8AJNGAszUfxVlPvMTw+nkOSd/bmxKrdCqqYnDIvDLucXJ86TstNzklfAwr3FL+wBlucRtOMLhQlHIaPTXYcNpOuh6B4ELjC+WjDGh8EdRKvcsZz7+5AS5ZaDDccuviMzQFsXVcE2d4HiosbARVrkxJ7j3MWp0OGgWVxXgRO2EQIksbgGSIjI8PqFjBqht2WT6MhVZPCc9XHUlP2CiAR5+QY8JgTIztbEDuhpgr0cRAtiHwJEAxDR9tJR/j/v4X/Pau2ZdR0C0yW77lVgD75spLL0khAnU7q+qgiF0hyQ7gRRVy0tElT0HBenVbzjzHowdJX8lSPjRg=

View File

@ -1,3 +1,33 @@
3.0.2 (2021-04-07)
------------------
* Fixed a bug where ``send`` passed to applications wasn't a true async
function but a lambda wrapper, preventing it from being used with
``asgiref.sync.async_to_sync()``.
3.0.1 (2020-11-12)
------------------
* Fixed a bug where ``asyncio.CancelledError`` was not correctly handled on
Python 3.8+, resulting in incorrect protocol application cleanup.
3.0.0 (2020-10-28)
------------------
* Updates internals to use ASGI v3 throughout. ``asgiref.compatibility`` is
used for older applications.
* Consequently, the `--asgi-protocol` command-line option is removed.
* HTTP request bodies are now read, and passed to the application, in chunks.
* Added support for Python 3.9.
* Dropped support for Python 3.5.
2.5.0 (2020-04-15)
------------------
@ -9,6 +39,7 @@
``lock`` argument to ``__init__()``. This is expected to be an instance of
``multiprocessing.Lock``.
2.4.1 (2019-12-18)
------------------
@ -16,6 +47,7 @@
3.0's ``async_unsafe()`` decorator in threaded contexts, such as using the
auto-reloader.
2.4.0 (2019-11-20)
------------------
@ -33,11 +65,13 @@
* Adds missing LICENSE to distribution.
2.3.0 (2019-04-09)
------------------
* Added support for ASGI v3.
2.2.5 (2019-01-31)
------------------

View File

@ -1,24 +1,17 @@
daphne
======
.. image:: https://api.travis-ci.org/django/daphne.svg
:target: https://travis-ci.org/django/daphne
.. image:: https://img.shields.io/pypi/v/daphne.svg
:target: https://pypi.python.org/pypi/daphne
Daphne is a HTTP, HTTP2 and WebSocket protocol server for
`ASGI <https://github.com/django/asgiref/blob/master/specs/asgi.rst>`_ and
`ASGI-HTTP <https://github.com/django/asgiref/blob/master/specs/www.rst>`_,
`ASGI <https://github.com/django/asgiref/blob/main/specs/asgi.rst>`_ and
`ASGI-HTTP <https://github.com/django/asgiref/blob/main/specs/www.rst>`_,
developed to power Django Channels.
It supports automatic negotiation of protocols; there's no need for URL
prefixing to determine WebSocket endpoints versus HTTP endpoints.
*Note:* Daphne 2 is not compatible with Channels 1.x applications, only with
Channels 2.x and other ASGI applications. Install a 1.x version of Daphne
for Channels 1.x support.
Running
-------
@ -61,7 +54,7 @@ Daphne supports terminating HTTP/2 connections natively. You'll
need to do a couple of things to get it working, though. First, you need to
make sure you install the Twisted ``http2`` and ``tls`` extras::
pip install -U Twisted[tls,http2]
pip install -U 'Twisted[tls,http2]'
Next, because all current browsers only support HTTP/2 when using TLS, you will
need to start Daphne with TLS turned on, which can be done using the Twisted endpoint syntax::
@ -115,19 +108,19 @@ should start with a slash, but not end with one; for example::
Python Support
--------------
Daphne requires Python 3.5 or later.
Daphne requires Python 3.6 or later.
Contributing
------------
Please refer to the
`main Channels contributing docs <https://github.com/django/channels/blob/master/CONTRIBUTING.rst>`_.
`main Channels contributing docs <https://github.com/django/channels/blob/main/CONTRIBUTING.rst>`_.
To run tests, make sure you have installed the ``tests`` extra with the package::
cd daphne/
pip install -e .[tests]
pip install -e '.[tests]'
pytest
@ -141,4 +134,4 @@ https://docs.djangoproject.com/en/dev/internals/security/.
To report bugs or request new features, please open a new GitHub issue.
This repository is part of the Channels project. For the shepherd and maintenance team, please see the
`main Channels readme <https://github.com/django/channels/blob/master/README.rst>`_.
`main Channels readme <https://github.com/django/channels/blob/main/README.rst>`_.

View File

@ -1,6 +1,6 @@
import sys
__version__ = "2.5.0"
__version__ = "3.0.2"
# Windows on Python 3.8+ uses ProactorEventLoop, which is not compatible with

3
daphne/__main__.py Normal file
View File

@ -0,0 +1,3 @@
from daphne.cli import CommandLineInterface
CommandLineInterface.entrypoint()

View File

@ -1,7 +1,7 @@
import datetime
class AccessLogGenerator(object):
class AccessLogGenerator:
"""
Object that implements the Daphne "action logger" internal interface in
order to provide an access log in something resembling NCSA format.

View File

@ -1,10 +1,9 @@
import argparse
import functools
import logging
import sys
from argparse import ArgumentError, Namespace
from asgiref.compatibility import is_double_callable
from asgiref.compatibility import guarantee_single_callable
from .access import AccessLogGenerator
from .endpoints import build_endpoint_description_strings
@ -17,20 +16,7 @@ DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 8000
class ASGI3Middleware:
def __init__(self, app):
self.app = app
def __call__(self, scope):
scope.setdefault("asgi", {})
scope["asgi"]["version"] = "3.0"
return functools.partial(self.asgi, scope=scope)
async def asgi(self, receive, send, scope):
await self.app(scope, receive, send)
class CommandLineInterface(object):
class CommandLineInterface:
"""
Acts as the main CLI entry point for running the server.
"""
@ -129,13 +115,6 @@ class CommandLineInterface(object):
help="The WebSocket protocols you wish to support",
default=None,
)
self.parser.add_argument(
"--asgi-protocol",
dest="asgi_protocol",
help="The version of the ASGI protocol to use",
default="auto",
choices=["asgi2", "asgi3", "auto"],
)
self.parser.add_argument(
"--root-path",
dest="root_path",
@ -247,16 +226,11 @@ class CommandLineInterface(object):
access_log_stream = open(args.access_log, "a", 1)
elif args.verbosity >= 1:
access_log_stream = sys.stdout
# Import application
sys.path.insert(0, ".")
application = import_by_path(args.application)
asgi_protocol = args.asgi_protocol
if asgi_protocol == "auto":
asgi_protocol = "asgi2" if is_double_callable(application) else "asgi3"
if asgi_protocol == "asgi3":
application = ASGI3Middleware(application)
application = guarantee_single_callable(application)
# Set up port/host bindings
if not any(
@ -284,7 +258,7 @@ class CommandLineInterface(object):
)
endpoints = sorted(args.socket_strings + endpoints)
# Start the server
logger.info("Starting server at %s" % (", ".join(endpoints),))
logger.info("Starting server at {}".format(", ".join(endpoints)))
self.server = self.server_class(
application=application,
endpoints=endpoints,

View File

@ -185,9 +185,19 @@ class WebRequest(http.Request):
# Not much we can do, the request is prematurely abandoned.
return
# Run application against request
self.application_queue.put_nowait(
{"type": "http.request", "body": self.content.read()}
)
buffer_size = self.server.request_buffer_size
while True:
chunk = self.content.read(buffer_size)
more_body = not (len(chunk) < buffer_size)
payload = {
"type": "http.request",
"body": chunk,
"more_body": more_body,
}
self.application_queue.put_nowait(payload)
if not more_body:
break
except Exception:
logger.error(traceback.format_exc())
self.basic_error(

View File

@ -22,6 +22,7 @@ else:
import logging
import time
from concurrent.futures import CancelledError
from functools import partial
from autobahn.websocket.compress import PERMESSAGE_COMPRESSION_EXTENSION as EXTENSIONS
from twisted.internet import defer, reactor
@ -35,7 +36,7 @@ from .ws_protocol import WebSocketFactory
logger = logging.getLogger(__name__)
class Server(object):
class Server:
def __init__(
self,
application,
@ -43,6 +44,7 @@ class Server(object):
signal_handlers=True,
action_logger=None,
http_timeout=None,
request_buffer_size=8192,
websocket_timeout=86400,
websocket_connect_timeout=20,
websocket_permessage_compression_extensions=[
@ -73,6 +75,7 @@ class Server(object):
self.http_timeout = http_timeout
self.ping_interval = ping_interval
self.ping_timeout = ping_timeout
self.request_buffer_size = request_buffer_size
self.proxy_forwarded_address_header = proxy_forwarded_address_header
self.proxy_forwarded_port_header = proxy_forwarded_port_header
self.proxy_forwarded_proto_header = proxy_forwarded_proto_header
@ -207,15 +210,17 @@ class Server(object):
assert "application_instance" not in self.connections[protocol]
# Make an instance of the application
input_queue = asyncio.Queue()
application_instance = self.application(scope=scope)
scope.setdefault("asgi", {"version": "3.0"})
application_instance = self.application(
scope=scope,
receive=input_queue.get,
send=partial(self.handle_reply, protocol),
)
# Run it, and stash the future for later checking
if protocol not in self.connections:
return None
self.connections[protocol]["application_instance"] = asyncio.ensure_future(
application_instance(
receive=input_queue.get,
send=lambda message: self.handle_reply(protocol, message),
),
application_instance,
loop=asyncio.get_event_loop(),
)
return input_queue
@ -298,7 +303,7 @@ class Server(object):
if application_instance and application_instance.done():
try:
exception = application_instance.exception()
except CancelledError:
except (CancelledError, asyncio.CancelledError):
# Future cancellation. We can ignore this.
pass
else:

View File

@ -5,10 +5,9 @@ import pickle
import tempfile
import traceback
from concurrent.futures import CancelledError
from functools import partial
class DaphneTestingInstance:
class BaseDaphneTestingInstance:
"""
Launches an instance of Daphne in a subprocess, with a host and port
attribute allowing you to call it.
@ -18,18 +17,20 @@ class DaphneTestingInstance:
startup_timeout = 2
def __init__(self, xff=False, http_timeout=None):
def __init__(
self, xff=False, http_timeout=None, request_buffer_size=None, *, application
):
self.xff = xff
self.http_timeout = http_timeout
self.host = "127.0.0.1"
self.lock = multiprocessing.Lock()
self.request_buffer_size = request_buffer_size
self.application = application
def __enter__(self):
# Clear result storage
TestApplication.delete_setup()
TestApplication.delete_result()
# Option Daphne features
kwargs = {}
if self.request_buffer_size:
kwargs["request_buffer_size"] = self.request_buffer_size
# Optionally enable X-Forwarded-For support.
if self.xff:
kwargs["proxy_forwarded_address_header"] = "X-Forwarded-For"
@ -40,7 +41,7 @@ class DaphneTestingInstance:
# Start up process
self.process = DaphneProcess(
host=self.host,
application=partial(TestApplication, lock=self.lock),
application=self.application,
kwargs=kwargs,
setup=self.process_setup,
teardown=self.process_teardown,
@ -74,6 +75,21 @@ class DaphneTestingInstance:
"""
pass
def get_received(self):
pass
class DaphneTestingInstance(BaseDaphneTestingInstance):
def __init__(self, *args, **kwargs):
self.lock = multiprocessing.Lock()
super().__init__(*args, **kwargs, application=TestApplication(lock=self.lock))
def __enter__(self):
# Clear result storage
TestApplication.delete_setup()
TestApplication.delete_result()
return super().__enter__()
def get_received(self):
"""
Returns the scope and messages the test application has received
@ -147,7 +163,7 @@ class DaphneProcess(multiprocessing.Process):
self.server.run()
finally:
self.teardown()
except Exception as e:
except BaseException as e:
# Put the error on our queue so the parent gets it
self.errors.put((e, traceback.format_exc()))
@ -170,12 +186,12 @@ class TestApplication:
setup_storage = os.path.join(tempfile.gettempdir(), "setup.testio")
result_storage = os.path.join(tempfile.gettempdir(), "result.testio")
def __init__(self, scope, lock):
self.scope = scope
def __init__(self, lock):
self.lock = lock
self.messages = []
async def __call__(self, send, receive):
async def __call__(self, scope, receive, send):
self.scope = scope
# Receive input and send output
logging.debug("test app coroutine alive")
try:

View File

@ -7,7 +7,7 @@ from zope.interface import implementer
@implementer(IPlugin, IStreamServerEndpointStringParser)
class _FDParser(object):
class _FDParser:
prefix = "fd"
def _parseServer(self, reactor, fileno, domain=socket.AF_INET):

View File

@ -304,7 +304,7 @@ class WebSocketProtocol(WebSocketServerProtocol):
return id(self) == id(other)
def __repr__(self):
return "<WebSocketProtocol client=%r path=%r>" % (self.client_addr, self.path)
return f"<WebSocketProtocol client={self.client_addr!r} path={self.path!r}>"
class WebSocketFactory(WebSocketServerFactory):
@ -325,7 +325,7 @@ class WebSocketFactory(WebSocketServerFactory):
Builds protocol instances. We use this to inject the factory object into the protocol.
"""
try:
protocol = super(WebSocketFactory, self).buildProtocol(addr)
protocol = super().buildProtocol(addr)
protocol.factory = self
return protocol
except Exception:

View File

@ -1,16 +1,10 @@
[bdist_wheel]
universal=1
[tool:pytest]
addopts = tests/
[isort]
include_trailing_comma = True
multi_line_output = 3
known_first_party = channels,daphne,asgiref,channels_redis
line_length = 88
[flake8]
exclude = venv/*,tox/*,docs/*,testproject/*,js_client/*,.eggs/*
ignore = E123,E128,E266,E402,W503,E731,W601
extend-ignore = E123, E128, E266, E402, W503, E731, W601
max-line-length = 120
[isort]
profile = black
[tool:pytest]
testpaths = tests

View File

@ -22,7 +22,8 @@ setup(
package_dir={"twisted": "daphne/twisted"},
packages=find_packages() + ["twisted.plugins"],
include_package_data=True,
install_requires=["twisted[tls]>=18.7", "autobahn>=0.18", "asgiref~=3.2"],
install_requires=["twisted[tls]>=18.7", "autobahn>=0.18", "asgiref>=3.2.10,<4"],
python_requires=">=3.6",
setup_requires=["pytest-runner"],
extras_require={
"tests": ["hypothesis==4.23", "pytest~=3.10", "pytest-asyncio~=0.8"]
@ -38,10 +39,10 @@ setup(
"Operating System :: OS Independent",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Topic :: Internet :: WWW/HTTP",
],
)

View File

@ -20,13 +20,24 @@ class DaphneTestCase(unittest.TestCase):
### Plain HTTP helpers
def run_daphne_http(
self, method, path, params, body, responses, headers=None, timeout=1, xff=False
self,
method,
path,
params,
body,
responses,
headers=None,
timeout=1,
xff=False,
request_buffer_size=None,
):
"""
Runs Daphne with the given request callback (given the base URL)
and response messages.
"""
with DaphneTestingInstance(xff=xff) as test_app:
with DaphneTestingInstance(
xff=xff, request_buffer_size=request_buffer_size
) as test_app:
# Add the response messages
test_app.add_send_messages(responses)
# Send it the request. We have to do this the long way to allow
@ -79,7 +90,14 @@ class DaphneTestCase(unittest.TestCase):
)
def run_daphne_request(
self, method, path, params=None, body=None, headers=None, xff=False
self,
method,
path,
params=None,
body=None,
headers=None,
xff=False,
request_buffer_size=None,
):
"""
Convenience method for just testing request handling.
@ -92,6 +110,7 @@ class DaphneTestCase(unittest.TestCase):
body=body,
headers=headers,
xff=xff,
request_buffer_size=request_buffer_size,
responses=[
{"type": "http.response.start", "status": 200},
{"type": "http.response.body", "body": b"OK"},
@ -163,7 +182,7 @@ class DaphneTestCase(unittest.TestCase):
if response.status != 101:
raise RuntimeError("WebSocket upgrade did not result in status code 101")
# Prepare headers for subprotocol searching
response_headers = dict((n.lower(), v) for n, v in response.getheaders())
response_headers = {n.lower(): v for n, v in response.getheaders()}
response.read()
assert not response.closed
# Return the raw socket and any subprotocol
@ -233,7 +252,7 @@ class DaphneTestCase(unittest.TestCase):
"""
try:
socket.inet_aton(address)
except socket.error:
except OSError:
self.fail("'%s' is not a valid IP address." % address)
def assert_key_sets(self, required_keys, optional_keys, actual_keys):

View File

@ -1,5 +1,3 @@
# coding: utf8
import logging
from argparse import ArgumentError
from unittest import TestCase

View File

@ -1,11 +1,10 @@
# coding: utf8
import collections
from urllib import parse
import http_strategies
from http_base import DaphneTestCase
from hypothesis import assume, given, settings
from hypothesis.strategies import integers
class TestHTTPRequest(DaphneTestCase):
@ -23,6 +22,7 @@ class TestHTTPRequest(DaphneTestCase):
# Check overall keys
self.assert_key_sets(
required_keys={
"asgi",
"type",
"http_version",
"method",
@ -34,6 +34,7 @@ class TestHTTPRequest(DaphneTestCase):
optional_keys={"scheme", "root_path", "client", "server"},
actual_keys=scope.keys(),
)
self.assertEqual(scope["asgi"]["version"], "3.0")
# Check that it is the right type
self.assertEqual(scope["type"], "http")
# Method (uppercased unicode string)
@ -119,6 +120,26 @@ class TestHTTPRequest(DaphneTestCase):
self.assert_valid_http_scope(scope, "GET", request_path, params=request_params)
self.assert_valid_http_request_message(messages[0], body=b"")
@given(request_path=http_strategies.http_path(), chunk_size=integers(min_value=1))
@settings(max_examples=5, deadline=5000)
def test_request_body_chunking(self, request_path, chunk_size):
"""
Tests request body chunking logic.
"""
body = b"The quick brown fox jumps over the lazy dog"
_, messages = self.run_daphne_request(
"POST",
request_path,
body=body,
request_buffer_size=chunk_size,
)
# Avoid running those asserts when there's a single "http.disconnect"
if len(messages) > 1:
assert messages[0]["body"].decode() == body.decode()[:chunk_size]
assert not messages[-2]["more_body"]
assert messages[-1] == {"type": "http.disconnect"}
@given(
request_path=http_strategies.http_path(),
request_body=http_strategies.http_body(),

View File

@ -1,5 +1,3 @@
# coding: utf8
import http_strategies
from http_base import DaphneTestCase
from hypothesis import given, settings

View File

@ -1,5 +1,3 @@
# coding: utf8
from unittest import TestCase
from twisted.web.http_headers import Headers

View File

@ -1,5 +1,3 @@
# coding: utf8
import collections
import time
from urllib import parse
@ -8,6 +6,8 @@ import http_strategies
from http_base import DaphneTestCase, DaphneTestingInstance
from hypothesis import given, settings
from daphne.testing import BaseDaphneTestingInstance
class TestWebsocket(DaphneTestCase):
"""
@ -23,10 +23,18 @@ class TestWebsocket(DaphneTestCase):
"""
# Check overall keys
self.assert_key_sets(
required_keys={"type", "path", "raw_path", "query_string", "headers"},
required_keys={
"asgi",
"type",
"path",
"raw_path",
"query_string",
"headers",
},
optional_keys={"scheme", "root_path", "client", "server", "subprotocols"},
actual_keys=scope.keys(),
)
self.assertEqual(scope["asgi"]["version"], "3.0")
# Check that it is the right type
self.assertEqual(scope["type"], "websocket")
# Path
@ -307,3 +315,54 @@ class TestWebsocket(DaphneTestCase):
self.websocket_send_frame(sock, "still alive?")
# Receive a frame and make sure it's correct
assert self.websocket_receive_frame(sock) == "cake"
def test_application_checker_handles_asyncio_cancellederror(self):
with CancellingTestingInstance() as app:
# Connect to the websocket app, it will immediately raise
# asyncio.CancelledError
sock, _ = self.websocket_handshake(app)
# Disconnect from the socket
sock.close()
# Wait for application_checker to clean up the applications for
# disconnected clients, and for the server to be stopped.
time.sleep(3)
# Make sure we received either no error, or a ConnectionsNotEmpty
while not app.process.errors.empty():
err, _tb = app.process.errors.get()
if not isinstance(err, ConnectionsNotEmpty):
raise err
self.fail(
"Server connections were not cleaned up after an asyncio.CancelledError was raised"
)
async def cancelling_application(scope, receive, send):
import asyncio
from twisted.internet import reactor
# Stop the server after a short delay so that the teardown is run.
reactor.callLater(2, lambda: reactor.stop())
await send({"type": "websocket.accept"})
raise asyncio.CancelledError()
class ConnectionsNotEmpty(Exception):
pass
class CancellingTestingInstance(BaseDaphneTestingInstance):
def __init__(self):
super().__init__(application=cancelling_application)
def process_teardown(self):
import multiprocessing
# Get a hold of the enclosing DaphneProcess (we're currently running in
# the same process as the application).
proc = multiprocessing.current_process()
# By now the (only) socket should have disconnected, and the
# application_checker should have run. If there are any connections
# still, it means that the application_checker did not clean them up.
if proc.server.connections:
raise ConnectionsNotEmpty()

12
tox.ini Normal file
View File

@ -0,0 +1,12 @@
[tox]
envlist =
py{36,37,38,39}-twisted{187,latest}
[testenv]
usedevelop = true
extras = tests
commands =
pytest -v {posargs}
deps =
twisted187: twisted==18.7.0
twistedlatest: twisted>=20.3.0