test for regression in handling asyncio.CancelledError

This commit is contained in:
Patrick Gingras 2020-11-02 22:25:54 -05:00
parent 7225fa7245
commit 49c00da715
No known key found for this signature in database
GPG Key ID: F56B4E7C84CA0B49
2 changed files with 67 additions and 8 deletions

View File

@ -7,7 +7,7 @@ import traceback
from concurrent.futures import CancelledError
class DaphneTestingInstance:
class BaseDaphneTestingInstance:
"""
Launches an instance of Daphne in a subprocess, with a host and port
attribute allowing you to call it.
@ -17,17 +17,16 @@ class DaphneTestingInstance:
startup_timeout = 2
def __init__(self, xff=False, http_timeout=None, request_buffer_size=None):
def __init__(
self, xff=False, http_timeout=None, request_buffer_size=None, *, application
):
self.xff = xff
self.http_timeout = http_timeout
self.host = "127.0.0.1"
self.lock = multiprocessing.Lock()
self.request_buffer_size = request_buffer_size
self.application = application
def __enter__(self):
# Clear result storage
TestApplication.delete_setup()
TestApplication.delete_result()
# Option Daphne features
kwargs = {}
if self.request_buffer_size:
@ -42,7 +41,7 @@ class DaphneTestingInstance:
# Start up process
self.process = DaphneProcess(
host=self.host,
application=TestApplication(lock=self.lock),
application=self.application,
kwargs=kwargs,
setup=self.process_setup,
teardown=self.process_teardown,
@ -76,6 +75,21 @@ class DaphneTestingInstance:
"""
pass
def get_received(self):
pass
class DaphneTestingInstance(BaseDaphneTestingInstance):
def __init__(self, *args, **kwargs):
self.lock = multiprocessing.Lock()
super().__init__(*args, **kwargs, application=TestApplication(lock=self.lock))
def __enter__(self):
# Clear result storage
TestApplication.delete_setup()
TestApplication.delete_result()
return super().__enter__()
def get_received(self):
"""
Returns the scope and messages the test application has received
@ -149,7 +163,7 @@ class DaphneProcess(multiprocessing.Process):
self.server.run()
finally:
self.teardown()
except Exception as e:
except BaseException as e:
# Put the error on our queue so the parent gets it
self.errors.put((e, traceback.format_exc()))

View File

@ -6,6 +6,7 @@ from urllib import parse
import http_strategies
from http_base import DaphneTestCase, DaphneTestingInstance
from daphne.testing import BaseDaphneTestingInstance
from hypothesis import given, settings
@ -261,3 +262,47 @@ class TestWebsocket(DaphneTestCase):
self.websocket_send_frame(sock, "still alive?")
# Receive a frame and make sure it's correct
assert self.websocket_receive_frame(sock) == "cake"
def test_application_checker_handles_asyncio_cancellederror(self):
with CancellingTestingInstance() as app:
# Connect to the websocket app, it will immediately raise
# asyncio.CancelledError
sock, _ = self.websocket_handshake(app)
# Disconnect from the socket
sock.close()
# Wait for application_checker to clean up the applications for
# disconnected clients, and for the server to be stopped.
time.sleep(3)
# Make sure we received either no error, or a ConnectionsNotEmpty
while not app.process.errors.empty():
err, _tb = app.process.errors.get()
if not isinstance(err, ConnectionsNotEmpty):
raise err
self.fail(
"Server connections were not cleaned up after an asyncio.CancelledError was raised"
)
async def cancelling_application(scope, receive, send):
import asyncio
from twisted.internet import reactor
reactor.callLater(2, lambda: reactor.stop())
await send({"type": "websocket.accept"})
raise asyncio.CancelledError()
class ConnectionsNotEmpty(Exception):
pass
class CancellingTestingInstance(BaseDaphneTestingInstance):
def __init__(self):
super().__init__(application=cancelling_application)
def process_teardown(self):
import multiprocessing
proc = multiprocessing.current_process()
if proc.server.connections:
raise ConnectionsNotEmpty()