Add ChannelLiveServerTestCase (#497)

Adds a new test case base class that fires up a live Daphne server and workers for the test case to use, like Django's LiveServerTestCase.
This commit is contained in:
Artem Malyshev 2017-04-11 12:17:47 +03:00 committed by Andrew Godwin
parent 7383280776
commit db1b3ba951
16 changed files with 291 additions and 15 deletions

View File

@ -1,2 +1,3 @@
from .base import TransactionChannelTestCase, ChannelTestCase, Client, apply_routes # NOQA isort:skip
from .http import HttpClient # NOQA isort:skip
from .liveserver import ChannelLiveServerTestCase # NOQA isort:skip

228
channels/test/liveserver.py Normal file
View File

@ -0,0 +1,228 @@
import multiprocessing
import django
from daphne.server import Server
from django.core.exceptions import ImproperlyConfigured
from django.db import connections
from django.db.utils import load_backend
from django.test.testcases import TransactionTestCase
from django.test.utils import modify_settings, override_settings
from twisted.internet import reactor
from .. import DEFAULT_CHANNEL_LAYER
from ..asgi import ChannelLayerManager
from ..staticfiles import StaticFilesConsumer
from ..worker import Worker, WorkerGroup
# NOTE: We use ChannelLayerManager to prevent layer instance sharing
# between forked process. Some layers implementations create
# connections inside the __init__ method. After forking child
# processes can lose the ability to use this connection and typically
# stuck on some network operation. To prevent this we use new
# ChannelLayerManager each time we want to initiate default layer.
# This gives us guaranty that new layer instance will be created and
# new connection will be established.
class ProcessSetup(multiprocessing.Process):
"""Common initialization steps for test subprocess."""
def common_setup(self):
self.setup_django()
self.setup_databases()
self.override_settings()
def setup_django(self):
if django.VERSION >= (1, 10):
django.setup(set_prefix=False)
else:
django.setup()
def setup_databases(self):
for alias, db in self.databases.items():
backend = load_backend(db['ENGINE'])
conn = backend.DatabaseWrapper(db, alias)
if django.VERSION >= (1, 9):
connections[alias].creation.set_as_test_mirror(
conn.settings_dict,
)
else:
test_db_name = conn.settings_dict['NAME']
connections[alias].settings_dict['NAME'] = test_db_name
def override_settings(self):
if self.overridden_settings:
overridden = override_settings(**self.overridden_settings)
overridden.enable()
if self.modified_settings:
modified = modify_settings(self.modified_settings)
modified.enable()
class WorkerProcess(ProcessSetup):
def __init__(self, is_ready, n_threads, overridden_settings,
modified_settings, databases):
self.is_ready = is_ready
self.n_threads = n_threads
self.overridden_settings = overridden_settings
self.modified_settings = modified_settings
self.databases = databases
super(WorkerProcess, self).__init__()
self.daemon = True
def run(self):
try:
self.common_setup()
channel_layers = ChannelLayerManager()
channel_layers[DEFAULT_CHANNEL_LAYER].router.check_default(
http_consumer=StaticFilesConsumer(),
)
if self.n_threads == 1:
self.worker = Worker(
channel_layer=channel_layers[DEFAULT_CHANNEL_LAYER],
signal_handlers=False,
)
else:
self.worker = WorkerGroup(
channel_layer=channel_layers[DEFAULT_CHANNEL_LAYER],
signal_handlers=False,
n_threads=self.n_threads,
)
self.worker.ready()
self.is_ready.set()
self.worker.run()
except Exception:
self.is_ready.set()
raise
class DaphneProcess(ProcessSetup):
def __init__(self, host, port_storage, is_ready, overridden_settings,
modified_settings, databases):
self.host = host
self.port_storage = port_storage
self.is_ready = is_ready
self.overridden_settings = overridden_settings
self.modified_settings = modified_settings
self.databases = databases
super(DaphneProcess, self).__init__()
self.daemon = True
def run(self):
try:
self.common_setup()
channel_layers = ChannelLayerManager()
self.server = Server(
channel_layer=channel_layers[DEFAULT_CHANNEL_LAYER],
endpoints=['tcp:interface=%s:port=0' % (self.host)],
signal_handlers=False,
)
reactor.callLater(0.5, self.resolve_port)
self.server.run()
except Exception:
self.is_ready.set()
raise
def resolve_port(self):
port = self.server.listeners[0].result.getHost().port
self.port_storage.value = port
self.is_ready.set()
class ChannelLiveServerTestCase(TransactionTestCase):
"""
Does basically the same as TransactionTestCase but also launches a
live Daphne server and Channels worker in a separate process, so
that the tests may use another test framework, such as Selenium,
instead of the built-in dummy client.
"""
host = 'localhost'
ProtocolServerProcess = DaphneProcess
WorkerProcess = WorkerProcess
worker_threads = 1
@property
def live_server_url(self):
return 'http://%s:%s' % (self.host, self._port_storage.value)
@property
def live_server_ws_url(self):
return 'ws://%s:%s' % (self.host, self._port_storage.value)
def _pre_setup(self):
for connection in connections.all():
if self._is_in_memory_db(connection):
raise ImproperlyConfigured(
'ChannelLiveServerTestCase can not be used with in memory databases'
)
channel_layers = ChannelLayerManager()
if len(channel_layers.configs) > 1:
raise ImproperlyConfigured(
'ChannelLiveServerTestCase does not support multiple CHANNEL_LAYERS at this time'
)
channel_layer = channel_layers[DEFAULT_CHANNEL_LAYER]
if 'flush' in channel_layer.extensions:
channel_layer.flush()
super(ChannelLiveServerTestCase, self)._pre_setup()
server_ready = multiprocessing.Event()
self._port_storage = multiprocessing.Value('i')
self._server_process = self.ProtocolServerProcess(
self.host,
self._port_storage,
server_ready,
self._overridden_settings,
self._modified_settings,
connections.databases,
)
self._server_process.start()
server_ready.wait()
worker_ready = multiprocessing.Event()
self._worker_process = self.WorkerProcess(
worker_ready,
self.worker_threads,
self._overridden_settings,
self._modified_settings,
connections.databases,
)
self._worker_process.start()
worker_ready.wait()
def _post_teardown(self):
self._server_process.terminate()
self._server_process.join()
self._worker_process.terminate()
self._worker_process.join()
super(ChannelLiveServerTestCase, self)._post_teardown()
def _is_in_memory_db(self, connection):
"""Check if DatabaseWrapper holds in memory database."""
if connection.vendor == 'sqlite':
if django.VERSION >= (1, 11):
return connection.is_in_memory_db()
else:
return connection.is_in_memory_db(
connection.settings_dict['NAME'],
)

View File

@ -5,5 +5,4 @@ warnings.warn(
DeprecationWarning,
)
from channels.test.base import TransactionChannelTestCase, ChannelTestCase, Client, apply_routes # NOQA isort:skip
from channels.test.http import HttpClient # NOQA isort:skip
from channels.test import * # NOQA isort:skip

View File

@ -18,7 +18,6 @@ Major Changes
work but will trigger a deprecation warning, and ``channels.tests`` will be
removed completely in version 1.3.
Minor Changes & Bugfixes
------------------------

View File

@ -294,3 +294,34 @@ mocked.
You can pass an ``alias`` argument to ``get_next_message``, ``Client`` and ``Channel``
to use a different layer too.
Live Server Test Case
---------------------
You can use browser automation libraries like Selenium or Splinter to
check your application against real layer installation. Use
``ChannelLiveServerTestCase`` for your acceptance tests.
.. code:: python
from channels.test import ChannelLiveServerTestCase
from splinter import Browser
class IntegrationTest(ChannelLiveServerTestCase):
def test_browse_site_index(self):
with Browser() as browser:
browser.visit(self.live_server_url)
# the rest of your integration test...
In the test above Daphne and Channels worker processes were fired up.
These processes run your project against the test database and the
default channel layer you spacify in the settings. If channel layer
support ``flush`` extension, initial cleanup will be done. So do not
run this code against your production environment. When channels
infrastructure is ready default web browser will be also started. You
can open your website in the real browser which can execute JavaScript
and operate on WebSockets. ``live_server_ws_url`` property is also
provided if you decide to run messaging directly from Python.

View File

@ -10,6 +10,7 @@ if __name__ == "__main__":
os.environ['DJANGO_SETTINGS_MODULE'] = "tests.settings"
django.setup()
TestRunner = get_runner(settings)
tests = sys.argv[1:] or ["tests"]
test_runner = TestRunner()
failures = test_runner.run_tests(["tests"])
failures = test_runner.run_tests(tests)
sys.exit(bool(failures))

View File

@ -22,7 +22,7 @@ setup(
'mock ; python_version < "3.0"',
'flake8>=2.0,<3.0',
'isort',
]
],
},
classifiers=[
'Development Status :: 5 - Production/Stable',

View File

@ -2,5 +2,6 @@ from channels.sessions import enforce_ordering
def ws_message(message):
"Echoes messages back to the client"
"Echoes messages back to the client."
message.reply_channel.send({'text': message['text']})

View File

@ -2,4 +2,5 @@ from django.http import HttpResponse
def index(request):
return HttpResponse("OK")

View File

@ -1,7 +1,12 @@
from setuptools import setup
from setuptools import find_packages, setup
setup(
name='channels-benchmark',
packages=find_packages(),
py_modules=['benchmark'],
install_requires=['autobahn', 'Twisted'],
install_requires=[
'autobahn',
'Twisted',
'statistics ; python_version < "3.0"',
],
)

View File

@ -1 +1 @@
#Blank on purpose
# Blank on purpose

View File

@ -1,4 +1,5 @@
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
import os
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
@ -23,5 +24,10 @@ DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
}
'TEST': {
'NAME': os.path.join(BASE_DIR, 'test_db.sqlite3'),
},
},
}
ALLOWED_HOSTS = ['*']

View File

@ -1,7 +1,6 @@
# Settings for channels specifically
from testproject.settings.base import *
INSTALLED_APPS += (
'channels',
)

View File

@ -1,14 +1,19 @@
# Settings for channels specifically
from testproject.settings.base import *
INSTALLED_APPS += ('channels',)
INSTALLED_APPS += (
'channels',
)
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'asgi_rabbitmq.RabbitmqChannelLayer',
'ROUTING': 'testproject.urls.channel_routing',
'CONFIG': {
'url': os.environ['RABBITMQ_URL'],
'url': os.environ.get(
'RABBITMQ_URL',
'amqp://guest:guest@localhost:5672/%2F',
),
},
},
}

View File

@ -24,7 +24,7 @@ CHANNEL_LAYERS = {
'fake_channel': {
'BACKEND': 'tests.test_management.FakeChannelLayer',
'ROUTING': [],
}
},
}
MIDDLEWARE_CLASSES = []

View File

@ -14,4 +14,4 @@ deps =
commands =
flake8: flake8
isort: isort --check-only --recursive channels
django: coverage run --parallel-mode {toxinidir}/runtests.py
django: coverage run --parallel-mode {toxinidir}/runtests.py {posargs}