From 49c00da715745ccefc539d38b737bcf1439eed54 Mon Sep 17 00:00:00 2001 From: Patrick Gingras <775.pg.12@gmail.com> Date: Mon, 2 Nov 2020 22:25:54 -0500 Subject: [PATCH] test for regression in handling asyncio.CancelledError --- daphne/testing.py | 30 +++++++++++++++++++-------- tests/test_websocket.py | 45 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 8 deletions(-) diff --git a/daphne/testing.py b/daphne/testing.py index 1632516..e2c7200 100644 --- a/daphne/testing.py +++ b/daphne/testing.py @@ -7,7 +7,7 @@ import traceback from concurrent.futures import CancelledError -class DaphneTestingInstance: +class BaseDaphneTestingInstance: """ Launches an instance of Daphne in a subprocess, with a host and port attribute allowing you to call it. @@ -17,17 +17,16 @@ class DaphneTestingInstance: startup_timeout = 2 - def __init__(self, xff=False, http_timeout=None, request_buffer_size=None): + def __init__( + self, xff=False, http_timeout=None, request_buffer_size=None, *, application + ): self.xff = xff self.http_timeout = http_timeout self.host = "127.0.0.1" - self.lock = multiprocessing.Lock() self.request_buffer_size = request_buffer_size + self.application = application def __enter__(self): - # Clear result storage - TestApplication.delete_setup() - TestApplication.delete_result() # Option Daphne features kwargs = {} if self.request_buffer_size: @@ -42,7 +41,7 @@ class DaphneTestingInstance: # Start up process self.process = DaphneProcess( host=self.host, - application=TestApplication(lock=self.lock), + application=self.application, kwargs=kwargs, setup=self.process_setup, teardown=self.process_teardown, @@ -76,6 +75,21 @@ class DaphneTestingInstance: """ pass + def get_received(self): + pass + + +class DaphneTestingInstance(BaseDaphneTestingInstance): + def __init__(self, *args, **kwargs): + self.lock = multiprocessing.Lock() + super().__init__(*args, **kwargs, application=TestApplication(lock=self.lock)) + + def __enter__(self): + # Clear result storage + TestApplication.delete_setup() + TestApplication.delete_result() + return super().__enter__() + def get_received(self): """ Returns the scope and messages the test application has received @@ -149,7 +163,7 @@ class DaphneProcess(multiprocessing.Process): self.server.run() finally: self.teardown() - except Exception as e: + except BaseException as e: # Put the error on our queue so the parent gets it self.errors.put((e, traceback.format_exc())) diff --git a/tests/test_websocket.py b/tests/test_websocket.py index 862e71c..3230832 100644 --- a/tests/test_websocket.py +++ b/tests/test_websocket.py @@ -6,6 +6,7 @@ from urllib import parse import http_strategies from http_base import DaphneTestCase, DaphneTestingInstance +from daphne.testing import BaseDaphneTestingInstance from hypothesis import given, settings @@ -261,3 +262,47 @@ class TestWebsocket(DaphneTestCase): self.websocket_send_frame(sock, "still alive?") # Receive a frame and make sure it's correct assert self.websocket_receive_frame(sock) == "cake" + + def test_application_checker_handles_asyncio_cancellederror(self): + with CancellingTestingInstance() as app: + # Connect to the websocket app, it will immediately raise + # asyncio.CancelledError + sock, _ = self.websocket_handshake(app) + # Disconnect from the socket + sock.close() + # Wait for application_checker to clean up the applications for + # disconnected clients, and for the server to be stopped. + time.sleep(3) + # Make sure we received either no error, or a ConnectionsNotEmpty + while not app.process.errors.empty(): + err, _tb = app.process.errors.get() + if not isinstance(err, ConnectionsNotEmpty): + raise err + self.fail( + "Server connections were not cleaned up after an asyncio.CancelledError was raised" + ) + + +async def cancelling_application(scope, receive, send): + import asyncio + from twisted.internet import reactor + + reactor.callLater(2, lambda: reactor.stop()) + await send({"type": "websocket.accept"}) + raise asyncio.CancelledError() + + +class ConnectionsNotEmpty(Exception): + pass + + +class CancellingTestingInstance(BaseDaphneTestingInstance): + def __init__(self): + super().__init__(application=cancelling_application) + + def process_teardown(self): + import multiprocessing + + proc = multiprocessing.current_process() + if proc.server.connections: + raise ConnectionsNotEmpty()