mirror of
https://github.com/django/daphne.git
synced 2024-11-21 15:36:33 +03:00
Unify all strings to double quotes
This commit is contained in:
parent
22aa56e196
commit
0626f39214
|
@ -17,34 +17,34 @@ class AccessLogGenerator(object):
|
|||
# HTTP requests
|
||||
if protocol == "http" and action == "complete":
|
||||
self.write_entry(
|
||||
host=details['client'],
|
||||
host=details["client"],
|
||||
date=datetime.datetime.now(),
|
||||
request="%(method)s %(path)s" % details,
|
||||
status=details['status'],
|
||||
length=details['size'],
|
||||
status=details["status"],
|
||||
length=details["size"],
|
||||
)
|
||||
# Websocket requests
|
||||
elif protocol == "websocket" and action == "connecting":
|
||||
self.write_entry(
|
||||
host=details['client'],
|
||||
host=details["client"],
|
||||
date=datetime.datetime.now(),
|
||||
request="WSCONNECTING %(path)s" % details,
|
||||
)
|
||||
elif protocol == "websocket" and action == "rejected":
|
||||
self.write_entry(
|
||||
host=details['client'],
|
||||
host=details["client"],
|
||||
date=datetime.datetime.now(),
|
||||
request="WSREJECT %(path)s" % details,
|
||||
)
|
||||
elif protocol == "websocket" and action == "connected":
|
||||
self.write_entry(
|
||||
host=details['client'],
|
||||
host=details["client"],
|
||||
date=datetime.datetime.now(),
|
||||
request="WSCONNECT %(path)s" % details,
|
||||
)
|
||||
elif protocol == "websocket" and action == "disconnected":
|
||||
self.write_entry(
|
||||
host=details['client'],
|
||||
host=details["client"],
|
||||
date=datetime.datetime.now(),
|
||||
request="WSDISCONNECT %(path)s" % details,
|
||||
)
|
||||
|
|
108
daphne/cli.py
108
daphne/cli.py
|
@ -10,7 +10,7 @@ from .utils import import_by_path
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DEFAULT_HOST = '127.0.0.1'
|
||||
DEFAULT_HOST = "127.0.0.1"
|
||||
DEFAULT_PORT = 8000
|
||||
|
||||
class CommandLineInterface(object):
|
||||
|
@ -27,108 +27,108 @@ class CommandLineInterface(object):
|
|||
description=self.description,
|
||||
)
|
||||
self.parser.add_argument(
|
||||
'-p',
|
||||
'--port',
|
||||
"-p",
|
||||
"--port",
|
||||
type=int,
|
||||
help='Port number to listen on',
|
||||
help="Port number to listen on",
|
||||
default=None,
|
||||
)
|
||||
self.parser.add_argument(
|
||||
'-b',
|
||||
'--bind',
|
||||
dest='host',
|
||||
help='The host/address to bind to',
|
||||
"-b",
|
||||
"--bind",
|
||||
dest="host",
|
||||
help="The host/address to bind to",
|
||||
default=None,
|
||||
)
|
||||
self.parser.add_argument(
|
||||
'--websocket_timeout',
|
||||
"--websocket_timeout",
|
||||
type=int,
|
||||
help='Maximum time to allow a websocket to be connected. -1 for infinite.',
|
||||
help="Maximum time to allow a websocket to be connected. -1 for infinite.",
|
||||
default=86400,
|
||||
)
|
||||
self.parser.add_argument(
|
||||
'--websocket_connect_timeout',
|
||||
"--websocket_connect_timeout",
|
||||
type=int,
|
||||
help='Maximum time to allow a connection to handshake. -1 for infinite',
|
||||
help="Maximum time to allow a connection to handshake. -1 for infinite",
|
||||
default=5,
|
||||
)
|
||||
self.parser.add_argument(
|
||||
'-u',
|
||||
'--unix-socket',
|
||||
dest='unix_socket',
|
||||
help='Bind to a UNIX socket rather than a TCP host/port',
|
||||
"-u",
|
||||
"--unix-socket",
|
||||
dest="unix_socket",
|
||||
help="Bind to a UNIX socket rather than a TCP host/port",
|
||||
default=None,
|
||||
)
|
||||
self.parser.add_argument(
|
||||
'--fd',
|
||||
"--fd",
|
||||
type=int,
|
||||
dest='file_descriptor',
|
||||
help='Bind to a file descriptor rather than a TCP host/port or named unix socket',
|
||||
dest="file_descriptor",
|
||||
help="Bind to a file descriptor rather than a TCP host/port or named unix socket",
|
||||
default=None,
|
||||
)
|
||||
self.parser.add_argument(
|
||||
'-e',
|
||||
'--endpoint',
|
||||
dest='socket_strings',
|
||||
action='append',
|
||||
help='Use raw server strings passed directly to twisted',
|
||||
"-e",
|
||||
"--endpoint",
|
||||
dest="socket_strings",
|
||||
action="append",
|
||||
help="Use raw server strings passed directly to twisted",
|
||||
default=[],
|
||||
)
|
||||
self.parser.add_argument(
|
||||
'-v',
|
||||
'--verbosity',
|
||||
"-v",
|
||||
"--verbosity",
|
||||
type=int,
|
||||
help='How verbose to make the output',
|
||||
help="How verbose to make the output",
|
||||
default=1,
|
||||
)
|
||||
self.parser.add_argument(
|
||||
'-t',
|
||||
'--http-timeout',
|
||||
"-t",
|
||||
"--http-timeout",
|
||||
type=int,
|
||||
help='How long to wait for worker before timing out HTTP connections',
|
||||
help="How long to wait for worker before timing out HTTP connections",
|
||||
default=120,
|
||||
)
|
||||
self.parser.add_argument(
|
||||
'--access-log',
|
||||
help='Where to write the access log (- for stdout, the default for verbosity=1)',
|
||||
"--access-log",
|
||||
help="Where to write the access log (- for stdout, the default for verbosity=1)",
|
||||
default=None,
|
||||
)
|
||||
self.parser.add_argument(
|
||||
'--ping-interval',
|
||||
"--ping-interval",
|
||||
type=int,
|
||||
help='The number of seconds a WebSocket must be idle before a keepalive ping is sent',
|
||||
help="The number of seconds a WebSocket must be idle before a keepalive ping is sent",
|
||||
default=20,
|
||||
)
|
||||
self.parser.add_argument(
|
||||
'--ping-timeout',
|
||||
"--ping-timeout",
|
||||
type=int,
|
||||
help='The number of seconds before a WebSocket is closed if no response to a keepalive ping',
|
||||
help="The number of seconds before a WebSocket is closed if no response to a keepalive ping",
|
||||
default=30,
|
||||
)
|
||||
self.parser.add_argument(
|
||||
'--ws-protocol',
|
||||
nargs='*',
|
||||
dest='ws_protocols',
|
||||
help='The WebSocket protocols you wish to support',
|
||||
"--ws-protocol",
|
||||
nargs="*",
|
||||
dest="ws_protocols",
|
||||
help="The WebSocket protocols you wish to support",
|
||||
default=None,
|
||||
)
|
||||
self.parser.add_argument(
|
||||
'--root-path',
|
||||
dest='root_path',
|
||||
help='The setting for the ASGI root_path variable',
|
||||
"--root-path",
|
||||
dest="root_path",
|
||||
help="The setting for the ASGI root_path variable",
|
||||
default="",
|
||||
)
|
||||
self.parser.add_argument(
|
||||
'--proxy-headers',
|
||||
dest='proxy_headers',
|
||||
help='Enable parsing and using of X-Forwarded-For and X-Forwarded-Port headers and using that as the '
|
||||
'client address',
|
||||
"--proxy-headers",
|
||||
dest="proxy_headers",
|
||||
help="Enable parsing and using of X-Forwarded-For and X-Forwarded-Port headers and using that as the "
|
||||
"client address",
|
||||
default=False,
|
||||
action='store_true',
|
||||
action="store_true",
|
||||
)
|
||||
self.parser.add_argument(
|
||||
'application',
|
||||
help='The application to dispatch to as path.to.module:instance.path',
|
||||
"application",
|
||||
help="The application to dispatch to as path.to.module:instance.path",
|
||||
)
|
||||
|
||||
self.server = None
|
||||
|
@ -190,8 +190,8 @@ class CommandLineInterface(object):
|
|||
)
|
||||
# Start the server
|
||||
logger.info(
|
||||
'Starting server at %s' %
|
||||
(', '.join(endpoints), )
|
||||
"Starting server at %s" %
|
||||
(", ".join(endpoints), )
|
||||
)
|
||||
self.server = self.server_class(
|
||||
application=application,
|
||||
|
@ -205,7 +205,7 @@ class CommandLineInterface(object):
|
|||
ws_protocols=args.ws_protocols,
|
||||
root_path=args.root_path,
|
||||
verbosity=args.verbosity,
|
||||
proxy_forwarded_address_header='X-Forwarded-For' if args.proxy_headers else None,
|
||||
proxy_forwarded_port_header='X-Forwarded-Port' if args.proxy_headers else None,
|
||||
proxy_forwarded_address_header="X-Forwarded-For" if args.proxy_headers else None,
|
||||
proxy_forwarded_port_header="X-Forwarded-Port" if args.proxy_headers else None,
|
||||
)
|
||||
self.server.run()
|
||||
|
|
|
@ -11,15 +11,15 @@ def build_endpoint_description_strings(
|
|||
"""
|
||||
socket_descriptions = []
|
||||
if host and port:
|
||||
host = host.strip('[]').replace(':', '\:')
|
||||
socket_descriptions.append('tcp:port=%d:interface=%s' % (int(port), host))
|
||||
host = host.strip("[]").replace(":", "\:")
|
||||
socket_descriptions.append("tcp:port=%d:interface=%s" % (int(port), host))
|
||||
elif any([host, port]):
|
||||
raise ValueError('TCP binding requires both port and host kwargs.')
|
||||
raise ValueError("TCP binding requires both port and host kwargs.")
|
||||
|
||||
if unix_socket:
|
||||
socket_descriptions.append('unix:%s' % unix_socket)
|
||||
socket_descriptions.append("unix:%s" % unix_socket)
|
||||
|
||||
if file_descriptor is not None:
|
||||
socket_descriptions.append('fd:fileno=%d' % int(file_descriptor))
|
||||
socket_descriptions.append("fd:fileno=%d" % int(file_descriptor))
|
||||
|
||||
return socket_descriptions
|
||||
|
|
|
@ -118,9 +118,9 @@ class WebRequest(http.Request):
|
|||
transport.protocol = protocol
|
||||
protocol.makeConnection(transport)
|
||||
# Re-inject request
|
||||
data = self.method + b' ' + self.uri + b' HTTP/1.1\x0d\x0a'
|
||||
data = self.method + b" " + self.uri + b" HTTP/1.1\x0d\x0a"
|
||||
for h in self.requestHeaders.getAllRawHeaders():
|
||||
data += h[0] + b': ' + b",".join(h[1]) + b'\x0d\x0a'
|
||||
data += h[0] + b": " + b",".join(h[1]) + b"\x0d\x0a"
|
||||
data += b"\x0d\x0a"
|
||||
data += self.content.read()
|
||||
protocol.dataReceived(data)
|
||||
|
@ -203,29 +203,29 @@ class WebRequest(http.Request):
|
|||
"""
|
||||
if "type" not in message:
|
||||
raise ValueError("Message has no type defined")
|
||||
if message['type'] == "http.response":
|
||||
if message["type"] == "http.response":
|
||||
if self._response_started:
|
||||
raise ValueError("HTTP response has already been started")
|
||||
self._response_started = True
|
||||
if 'status' not in message:
|
||||
if "status" not in message:
|
||||
raise ValueError("Specifying a status code is required for a Response message.")
|
||||
# Set HTTP status code
|
||||
self.setResponseCode(message['status'])
|
||||
self.setResponseCode(message["status"])
|
||||
# Write headers
|
||||
for header, value in message.get("headers", {}):
|
||||
# Shim code from old ASGI version, can be removed after a while
|
||||
if isinstance(header, six.text_type):
|
||||
header = header.encode("latin1")
|
||||
self.responseHeaders.addRawHeader(header, value)
|
||||
logger.debug("HTTP %s response started for %s", message['status'], self.client_addr)
|
||||
elif message['type'] == "http.response.hunk":
|
||||
logger.debug("HTTP %s response started for %s", message["status"], self.client_addr)
|
||||
elif message["type"] == "http.response.hunk":
|
||||
if not self._response_started:
|
||||
raise ValueError("HTTP response has not yet been started but got %s" % message['type'])
|
||||
raise ValueError("HTTP response has not yet been started but got %s" % message["type"])
|
||||
else:
|
||||
raise ValueError("Cannot handle message type %s!" % message['type'])
|
||||
raise ValueError("Cannot handle message type %s!" % message["type"])
|
||||
|
||||
# Write out body
|
||||
http.Request.write(self, message.get('content', b''))
|
||||
http.Request.write(self, message.get("content", b""))
|
||||
|
||||
# End if there's no more content
|
||||
if not message.get("more_content", False):
|
||||
|
@ -356,9 +356,9 @@ class HTTPFactory(http.HTTPFactory):
|
|||
using ALPN, so that doesn't go here: anyone wanting websockets will
|
||||
negotiate HTTP/1.1 and then do the upgrade dance.
|
||||
"""
|
||||
baseProtocols = [b'http/1.1']
|
||||
baseProtocols = [b"http/1.1"]
|
||||
|
||||
if http.H2_ENABLED:
|
||||
baseProtocols.insert(0, b'h2')
|
||||
baseProtocols.insert(0, b"h2")
|
||||
|
||||
return baseProtocols
|
||||
|
|
|
@ -68,7 +68,7 @@ class Server(object):
|
|||
self.application_instances = {}
|
||||
# Make the factory
|
||||
self.http_factory = HTTPFactory(self)
|
||||
self.ws_factory = WebSocketFactory(self, protocols=self.websocket_protocols, server='Daphne')
|
||||
self.ws_factory = WebSocketFactory(self, protocols=self.websocket_protocols, server="Daphne")
|
||||
self.ws_factory.setProtocolOptions(
|
||||
autoPingTimeout=self.ping_timeout,
|
||||
allowNullOrigin=True,
|
||||
|
|
|
@ -14,7 +14,7 @@ def message_for_request(method, path, params=None, headers=None, body=None):
|
|||
that through daphne and returns the emitted channel message.
|
||||
"""
|
||||
request = _build_request(method, path, params, headers, body)
|
||||
message, factory, transport = _run_through_daphne(request, 'http.request')
|
||||
message, factory, transport = _run_through_daphne(request, "http.request")
|
||||
return message
|
||||
|
||||
|
||||
|
@ -27,9 +27,9 @@ def response_for_message(message):
|
|||
message_for_request) because we need a valid reply channel. I'm sure
|
||||
this can be streamlined, but it works for now.
|
||||
"""
|
||||
request = _build_request('GET', '/')
|
||||
request_message, factory, transport = _run_through_daphne(request, 'http.request')
|
||||
factory.dispatch_reply(request_message['reply_channel'], message)
|
||||
request = _build_request("GET", "/")
|
||||
request_message, factory, transport = _run_through_daphne(request, "http.request")
|
||||
factory.dispatch_reply(request_message["reply_channel"], message)
|
||||
return transport.value()
|
||||
|
||||
|
||||
|
@ -65,43 +65,43 @@ def _build_request(method, path, params=None, headers=None, body=None):
|
|||
if six.PY3:
|
||||
quoted_path = parse.quote(path)
|
||||
if params:
|
||||
quoted_path += '?' + parse.urlencode(params)
|
||||
quoted_path = quoted_path.encode('ascii')
|
||||
quoted_path += "?" + parse.urlencode(params)
|
||||
quoted_path = quoted_path.encode("ascii")
|
||||
else:
|
||||
quoted_path = parse.quote(path.encode('utf8'))
|
||||
quoted_path = parse.quote(path.encode("utf8"))
|
||||
if params:
|
||||
quoted_path += b'?' + parse.urlencode(params)
|
||||
quoted_path += b"?" + parse.urlencode(params)
|
||||
|
||||
request = method.encode('ascii') + b' ' + quoted_path + b" HTTP/1.1\r\n"
|
||||
request = method.encode("ascii") + b" " + quoted_path + b" HTTP/1.1\r\n"
|
||||
for name, value in headers:
|
||||
request += header_line(name, value)
|
||||
|
||||
request += b'\r\n'
|
||||
request += b"\r\n"
|
||||
|
||||
if body:
|
||||
request += body.encode('ascii')
|
||||
request += body.encode("ascii")
|
||||
|
||||
return request
|
||||
|
||||
|
||||
def build_websocket_upgrade(path, params, headers):
|
||||
ws_headers = [
|
||||
('Host', 'somewhere.com'),
|
||||
('Upgrade', 'websocket'),
|
||||
('Connection', 'Upgrade'),
|
||||
('Sec-WebSocket-Key', 'x3JJHMbDL1EzLkh9GBhXDw=='),
|
||||
('Sec-WebSocket-Protocol', 'chat, superchat'),
|
||||
('Sec-WebSocket-Version', '13'),
|
||||
('Origin', 'http://example.com')
|
||||
("Host", "somewhere.com"),
|
||||
("Upgrade", "websocket"),
|
||||
("Connection", "Upgrade"),
|
||||
("Sec-WebSocket-Key", "x3JJHMbDL1EzLkh9GBhXDw=="),
|
||||
("Sec-WebSocket-Protocol", "chat, superchat"),
|
||||
("Sec-WebSocket-Version", "13"),
|
||||
("Origin", "http://example.com")
|
||||
]
|
||||
return _build_request('GET', path, params, headers=headers + ws_headers, body=None)
|
||||
return _build_request("GET", path, params, headers=headers + ws_headers, body=None)
|
||||
|
||||
|
||||
def header_line(name, value):
|
||||
"""
|
||||
Given a header name and value, returns the line to use in a HTTP request or response.
|
||||
"""
|
||||
return name.encode('ascii') + b': ' + value.encode('ascii') + b"\r\n"
|
||||
return name.encode("ascii") + b": " + value.encode("ascii") + b"\r\n"
|
||||
|
||||
|
||||
def _run_through_daphne(request, channel_name):
|
||||
|
@ -113,7 +113,7 @@ def _run_through_daphne(request, channel_name):
|
|||
"""
|
||||
channel_layer = ChannelLayer()
|
||||
factory = HTTPFactory(channel_layer, send_channel="test!")
|
||||
proto = factory.buildProtocol(('127.0.0.1', 0))
|
||||
proto = factory.buildProtocol(("127.0.0.1", 0))
|
||||
transport = proto_helpers.StringTransport()
|
||||
proto.makeConnection(transport)
|
||||
proto.dataReceived(request)
|
||||
|
@ -125,4 +125,4 @@ def content_length_header(body):
|
|||
"""
|
||||
Returns an appropriate Content-Length HTTP header for a given body.
|
||||
"""
|
||||
return 'Content-Length', six.text_type(len(body))
|
||||
return "Content-Length", six.text_type(len(body))
|
||||
|
|
|
@ -7,10 +7,10 @@ import string
|
|||
|
||||
from hypothesis import strategies
|
||||
|
||||
HTTP_METHODS = ['OPTIONS', 'GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'TRACE', 'CONNECT']
|
||||
HTTP_METHODS = ["OPTIONS", "GET", "HEAD", "POST", "PUT", "DELETE", "TRACE", "CONNECT"]
|
||||
|
||||
# Unicode characters of the "Letter" category
|
||||
letters = strategies.characters(whitelist_categories=('Lu', 'Ll', 'Lt', 'Lm', 'Lo', 'Nl'))
|
||||
letters = strategies.characters(whitelist_categories=("Lu", "Ll", "Lt", "Lm", "Lo", "Nl"))
|
||||
|
||||
|
||||
def http_method():
|
||||
|
@ -18,7 +18,7 @@ def http_method():
|
|||
|
||||
|
||||
def _http_path_portion():
|
||||
alphabet = string.ascii_letters + string.digits + '-._~'
|
||||
alphabet = string.ascii_letters + string.digits + "-._~"
|
||||
return strategies.text(min_size=1, average_size=10, max_size=128, alphabet=alphabet)
|
||||
|
||||
|
||||
|
@ -27,7 +27,7 @@ def http_path():
|
|||
Returns a URL path (not encoded).
|
||||
"""
|
||||
return strategies.lists(
|
||||
_http_path_portion(), min_size=0, max_size=10).map(lambda s: '/' + '/'.join(s))
|
||||
_http_path_portion(), min_size=0, max_size=10).map(lambda s: "/" + "/".join(s))
|
||||
|
||||
|
||||
def http_body():
|
||||
|
@ -50,7 +50,7 @@ def valid_bidi(value):
|
|||
direction of text point of view. This little helper just rejects those.
|
||||
"""
|
||||
try:
|
||||
value.encode('idna')
|
||||
value.encode("idna")
|
||||
except UnicodeError:
|
||||
return False
|
||||
else:
|
||||
|
@ -67,12 +67,12 @@ def international_domain_name():
|
|||
Returns a byte string of a domain name, IDNA-encoded.
|
||||
"""
|
||||
return strategies.lists(
|
||||
_domain_label(), min_size=2, average_size=2).map(lambda s: ('.'.join(s)).encode('idna'))
|
||||
_domain_label(), min_size=2, average_size=2).map(lambda s: (".".join(s)).encode("idna"))
|
||||
|
||||
|
||||
def _query_param():
|
||||
return strategies.text(alphabet=letters, min_size=1, average_size=10, max_size=255).\
|
||||
map(lambda s: s.encode('utf8'))
|
||||
map(lambda s: s.encode("utf8"))
|
||||
|
||||
|
||||
def query_params():
|
||||
|
@ -94,7 +94,7 @@ def header_name():
|
|||
and 20 characters long
|
||||
"""
|
||||
return strategies.text(
|
||||
alphabet=string.ascii_letters + string.digits + '-', min_size=1, max_size=30)
|
||||
alphabet=string.ascii_letters + string.digits + "-", min_size=1, max_size=30)
|
||||
|
||||
|
||||
def header_value():
|
||||
|
@ -105,8 +105,8 @@ def header_value():
|
|||
https://en.wikipedia.org/wiki/List_of_HTTP_header_fields
|
||||
"""
|
||||
return strategies.text(
|
||||
alphabet=string.ascii_letters + string.digits + string.punctuation + ' /t',
|
||||
min_size=1, average_size=40, max_size=8190).filter(lambda s: len(s.encode('utf8')) < 8190)
|
||||
alphabet=string.ascii_letters + string.digits + string.punctuation + " /t",
|
||||
min_size=1, average_size=40, max_size=8190).filter(lambda s: len(s.encode("utf8")) < 8190)
|
||||
|
||||
|
||||
def headers():
|
||||
|
|
|
@ -28,7 +28,7 @@ class TestHTTPRequestSpec(testcases.ASGIHTTPTestCase):
|
|||
"""
|
||||
Smallest viable example. Mostly verifies that our request building works.
|
||||
"""
|
||||
request_method, request_path = 'GET', '/'
|
||||
request_method, request_path = "GET", "/"
|
||||
message = message_for_request(request_method, request_path)
|
||||
|
||||
self.assert_valid_http_request_message(message, request_method, request_path)
|
||||
|
@ -41,7 +41,7 @@ class TestHTTPRequestSpec(testcases.ASGIHTTPTestCase):
|
|||
"""
|
||||
Tests a typical HTTP GET request, with a path and query parameters
|
||||
"""
|
||||
request_method = 'GET'
|
||||
request_method = "GET"
|
||||
message = message_for_request(request_method, request_path, request_params)
|
||||
|
||||
self.assert_valid_http_request_message(
|
||||
|
@ -55,7 +55,7 @@ class TestHTTPRequestSpec(testcases.ASGIHTTPTestCase):
|
|||
"""
|
||||
Tests a typical POST request, submitting some data in a body.
|
||||
"""
|
||||
request_method = 'POST'
|
||||
request_method = "POST"
|
||||
headers = [content_length_header(request_body)]
|
||||
message = message_for_request(
|
||||
request_method, request_path, headers=headers, body=request_body)
|
||||
|
@ -69,7 +69,7 @@ class TestHTTPRequestSpec(testcases.ASGIHTTPTestCase):
|
|||
"""
|
||||
Tests that HTTP header fields are handled as specified
|
||||
"""
|
||||
request_method, request_path = 'OPTIONS', '/te st-à/'
|
||||
request_method, request_path = "OPTIONS", "/te st-à/"
|
||||
message = message_for_request(request_method, request_path, headers=request_headers)
|
||||
|
||||
self.assert_valid_http_request_message(
|
||||
|
@ -85,7 +85,7 @@ class TestHTTPRequestSpec(testcases.ASGIHTTPTestCase):
|
|||
header_name = request_headers[0][0]
|
||||
duplicated_headers = [(header_name, header[1]) for header in request_headers]
|
||||
|
||||
request_method, request_path = 'OPTIONS', '/te st-à/'
|
||||
request_method, request_path = "OPTIONS", "/te st-à/"
|
||||
message = message_for_request(request_method, request_path, headers=duplicated_headers)
|
||||
|
||||
self.assert_valid_http_request_message(
|
||||
|
@ -114,24 +114,24 @@ class TestHTTPRequestSpec(testcases.ASGIHTTPTestCase):
|
|||
message, request_method, request_path, request_params, request_headers, request_body)
|
||||
|
||||
def test_headers_are_lowercased_and_stripped(self):
|
||||
request_method, request_path = 'GET', '/'
|
||||
headers = [('MYCUSTOMHEADER', ' foobar ')]
|
||||
request_method, request_path = "GET", "/"
|
||||
headers = [("MYCUSTOMHEADER", " foobar ")]
|
||||
message = message_for_request(request_method, request_path, headers=headers)
|
||||
|
||||
self.assert_valid_http_request_message(
|
||||
message, request_method, request_path, request_headers=headers)
|
||||
# Note that Daphne returns a list of tuples here, which is fine, because the spec
|
||||
# asks to treat them interchangeably.
|
||||
assert message['headers'] == [(b'mycustomheader', b'foobar')]
|
||||
assert message["headers"] == [(b"mycustomheader", b"foobar")]
|
||||
|
||||
@given(daphne_path=http_strategies.http_path())
|
||||
def test_root_path_header(self, daphne_path):
|
||||
"""
|
||||
Tests root_path handling.
|
||||
"""
|
||||
request_method, request_path = 'GET', '/'
|
||||
request_method, request_path = "GET", "/"
|
||||
# Daphne-Root-Path must be URL encoded when submitting as HTTP header field
|
||||
headers = [('Daphne-Root-Path', parse.quote(daphne_path.encode('utf8')))]
|
||||
headers = [("Daphne-Root-Path", parse.quote(daphne_path.encode("utf8")))]
|
||||
message = message_for_request(request_method, request_path, headers=headers)
|
||||
|
||||
# Daphne-Root-Path is not included in the returned 'headers' section. So we expect
|
||||
|
@ -140,7 +140,7 @@ class TestHTTPRequestSpec(testcases.ASGIHTTPTestCase):
|
|||
self.assert_valid_http_request_message(
|
||||
message, request_method, request_path, request_headers=expected_headers)
|
||||
# And what we're looking for, root_path being set.
|
||||
assert message['root_path'] == daphne_path
|
||||
assert message["root_path"] == daphne_path
|
||||
|
||||
|
||||
class TestProxyHandling(unittest.TestCase):
|
||||
|
@ -153,7 +153,7 @@ class TestProxyHandling(unittest.TestCase):
|
|||
def setUp(self):
|
||||
self.channel_layer = ChannelLayer()
|
||||
self.factory = HTTPFactory(self.channel_layer, send_channel="test!")
|
||||
self.proto = self.factory.buildProtocol(('127.0.0.1', 0))
|
||||
self.proto = self.factory.buildProtocol(("127.0.0.1", 0))
|
||||
self.tr = proto_helpers.StringTransport()
|
||||
self.proto.makeConnection(self.tr)
|
||||
|
||||
|
@ -167,11 +167,11 @@ class TestProxyHandling(unittest.TestCase):
|
|||
)
|
||||
# Get the resulting message off of the channel layer
|
||||
_, message = self.channel_layer.receive(["http.request"])
|
||||
self.assertEqual(message['client'], ['192.168.1.1', 54321])
|
||||
self.assertEqual(message["client"], ["192.168.1.1", 54321])
|
||||
|
||||
def test_x_forwarded_for_parsed(self):
|
||||
self.factory.proxy_forwarded_address_header = 'X-Forwarded-For'
|
||||
self.factory.proxy_forwarded_port_header = 'X-Forwarded-Port'
|
||||
self.factory.proxy_forwarded_address_header = "X-Forwarded-For"
|
||||
self.factory.proxy_forwarded_port_header = "X-Forwarded-Port"
|
||||
self.proto.dataReceived(
|
||||
b"GET /te%20st-%C3%A0/?foo=+bar HTTP/1.1\r\n" +
|
||||
b"Host: somewhere.com\r\n" +
|
||||
|
@ -181,11 +181,11 @@ class TestProxyHandling(unittest.TestCase):
|
|||
)
|
||||
# Get the resulting message off of the channel layer
|
||||
_, message = self.channel_layer.receive(["http.request"])
|
||||
self.assertEqual(message['client'], ['10.1.2.3', 80])
|
||||
self.assertEqual(message["client"], ["10.1.2.3", 80])
|
||||
|
||||
def test_x_forwarded_for_port_missing(self):
|
||||
self.factory.proxy_forwarded_address_header = 'X-Forwarded-For'
|
||||
self.factory.proxy_forwarded_port_header = 'X-Forwarded-Port'
|
||||
self.factory.proxy_forwarded_address_header = "X-Forwarded-For"
|
||||
self.factory.proxy_forwarded_port_header = "X-Forwarded-Port"
|
||||
self.proto.dataReceived(
|
||||
b"GET /te%20st-%C3%A0/?foo=+bar HTTP/1.1\r\n" +
|
||||
b"Host: somewhere.com\r\n" +
|
||||
|
@ -194,4 +194,4 @@ class TestProxyHandling(unittest.TestCase):
|
|||
)
|
||||
# Get the resulting message off of the channel layer
|
||||
_, message = self.channel_layer.receive(["http.request"])
|
||||
self.assertEqual(message['client'], ['10.1.2.3', 0])
|
||||
self.assertEqual(message["client"], ["10.1.2.3", 0])
|
||||
|
|
|
@ -20,15 +20,15 @@ class TestHTTPResponseSpec(testcases.ASGIHTTPTestCase):
|
|||
"""
|
||||
Smallest viable example. Mostly verifies that our response building works.
|
||||
"""
|
||||
message = {'status': 200}
|
||||
message = {"status": 200}
|
||||
response = factories.response_for_message(message)
|
||||
self.assert_valid_http_response_message(message, response)
|
||||
self.assertIn(b'200 OK', response)
|
||||
self.assertIn(b"200 OK", response)
|
||||
# Assert that the response is the last of the chunks.
|
||||
# N.b. at the time of writing, Daphne did not support multiple response chunks,
|
||||
# but still sends with Transfer-Encoding: chunked if no Content-Length header
|
||||
# is specified (and maybe even if specified).
|
||||
self.assertTrue(response.endswith(b'0\r\n\r\n'))
|
||||
self.assertTrue(response.endswith(b"0\r\n\r\n"))
|
||||
|
||||
def test_status_code_required(self):
|
||||
"""
|
||||
|
@ -48,21 +48,21 @@ class TestHTTPResponseSpec(testcases.ASGIHTTPTestCase):
|
|||
of them have meaning that is respected by Twisted. E.g. setting 204 (No Content)
|
||||
as a status code results in Twisted discarding the body.
|
||||
"""
|
||||
message = {'status': 201} # 'Created'
|
||||
message = {"status": 201} # 'Created'
|
||||
response = factories.response_for_message(message)
|
||||
self.assert_valid_http_response_message(message, response)
|
||||
self.assertIn(b'201 Created', response)
|
||||
self.assertIn(b"201 Created", response)
|
||||
|
||||
@given(body=http_strategies.http_body())
|
||||
def test_body_is_transmitted(self, body):
|
||||
message = {'status': 200, 'content': body.encode('ascii')}
|
||||
message = {"status": 200, "content": body.encode("ascii")}
|
||||
response = factories.response_for_message(message)
|
||||
self.assert_valid_http_response_message(message, response)
|
||||
|
||||
@given(headers=http_strategies.headers())
|
||||
def test_headers(self, headers):
|
||||
# The ASGI spec requires us to lowercase our header names
|
||||
message = {'status': 200, 'headers': [(name.lower(), value) for name, value in headers]}
|
||||
message = {"status": 200, "headers": [(name.lower(), value) for name, value in headers]}
|
||||
response = factories.response_for_message(message)
|
||||
# The assert_ method does the heavy lifting of checking that headers are
|
||||
# as expected.
|
||||
|
@ -80,9 +80,9 @@ class TestHTTPResponseSpec(testcases.ASGIHTTPTestCase):
|
|||
so there's not a lot going on here.
|
||||
"""
|
||||
message = {
|
||||
'status': 202, # 'Accepted'
|
||||
'headers': [(name.lower(), value) for name, value in headers],
|
||||
'content': body.encode('ascii')
|
||||
"status": 202, # 'Accepted'
|
||||
"headers": [(name.lower(), value) for name, value in headers],
|
||||
"content": body.encode("ascii")
|
||||
}
|
||||
response = factories.response_for_message(message)
|
||||
self.assert_valid_http_response_message(message, response)
|
||||
|
@ -96,7 +96,7 @@ class TestHTTPResponse(TestCase):
|
|||
def setUp(self):
|
||||
self.channel_layer = ChannelLayer()
|
||||
self.factory = HTTPFactory(self.channel_layer, send_channel="test!")
|
||||
self.proto = self.factory.buildProtocol(('127.0.0.1', 0))
|
||||
self.proto = self.factory.buildProtocol(("127.0.0.1", 0))
|
||||
self.tr = proto_helpers.StringTransport()
|
||||
self.proto.makeConnection(self.tr)
|
||||
|
||||
|
@ -115,7 +115,7 @@ class TestHTTPResponse(TestCase):
|
|||
|
||||
# Send back an example response
|
||||
self.factory.dispatch_reply(
|
||||
message['reply_channel'],
|
||||
message["reply_channel"],
|
||||
{
|
||||
"status": 200,
|
||||
"status_text": b"OK",
|
||||
|
@ -125,4 +125,4 @@ class TestHTTPResponse(TestCase):
|
|||
|
||||
# Get the disconnection notification
|
||||
_, disconnect_message = self.channel_layer.receive(["http.disconnect"])
|
||||
self.assertEqual(disconnect_message['path'], "/te st-à/")
|
||||
self.assertEqual(disconnect_message["path"], "/te st-à/")
|
||||
|
|
|
@ -15,45 +15,45 @@ class TestXForwardedForHttpParsing(TestCase):
|
|||
|
||||
def test_basic(self):
|
||||
headers = Headers({
|
||||
b'X-Forwarded-For': [b'10.1.2.3'],
|
||||
b'X-Forwarded-Port': [b'1234']
|
||||
b"X-Forwarded-For": [b"10.1.2.3"],
|
||||
b"X-Forwarded-Port": [b"1234"]
|
||||
})
|
||||
result = parse_x_forwarded_for(headers)
|
||||
self.assertEqual(result, ['10.1.2.3', 1234])
|
||||
self.assertEqual(result, ["10.1.2.3", 1234])
|
||||
self.assertIsInstance(result[0], six.text_type)
|
||||
|
||||
def test_address_only(self):
|
||||
headers = Headers({
|
||||
b'X-Forwarded-For': [b'10.1.2.3'],
|
||||
b"X-Forwarded-For": [b"10.1.2.3"],
|
||||
})
|
||||
self.assertEqual(
|
||||
parse_x_forwarded_for(headers),
|
||||
['10.1.2.3', 0]
|
||||
["10.1.2.3", 0]
|
||||
)
|
||||
|
||||
def test_v6_address(self):
|
||||
headers = Headers({
|
||||
b'X-Forwarded-For': [b'1043::a321:0001, 10.0.5.6'],
|
||||
b"X-Forwarded-For": [b"1043::a321:0001, 10.0.5.6"],
|
||||
})
|
||||
self.assertEqual(
|
||||
parse_x_forwarded_for(headers),
|
||||
['1043::a321:0001', 0]
|
||||
["1043::a321:0001", 0]
|
||||
)
|
||||
|
||||
def test_multiple_proxys(self):
|
||||
headers = Headers({
|
||||
b'X-Forwarded-For': [b'10.1.2.3, 10.1.2.4'],
|
||||
b"X-Forwarded-For": [b"10.1.2.3, 10.1.2.4"],
|
||||
})
|
||||
self.assertEqual(
|
||||
parse_x_forwarded_for(headers),
|
||||
['10.1.2.3', 0]
|
||||
["10.1.2.3", 0]
|
||||
)
|
||||
|
||||
def test_original(self):
|
||||
headers = Headers({})
|
||||
self.assertEqual(
|
||||
parse_x_forwarded_for(headers, original=['127.0.0.1', 80]),
|
||||
['127.0.0.1', 80]
|
||||
parse_x_forwarded_for(headers, original=["127.0.0.1", 80]),
|
||||
["127.0.0.1", 80]
|
||||
)
|
||||
|
||||
def test_no_original(self):
|
||||
|
@ -68,46 +68,46 @@ class TestXForwardedForWsParsing(TestCase):
|
|||
|
||||
def test_basic(self):
|
||||
headers = {
|
||||
b'X-Forwarded-For': b'10.1.2.3',
|
||||
b'X-Forwarded-Port': b'1234',
|
||||
b"X-Forwarded-For": b"10.1.2.3",
|
||||
b"X-Forwarded-Port": b"1234",
|
||||
}
|
||||
self.assertEqual(
|
||||
parse_x_forwarded_for(headers),
|
||||
['10.1.2.3', 1234]
|
||||
["10.1.2.3", 1234]
|
||||
)
|
||||
|
||||
def test_address_only(self):
|
||||
headers = {
|
||||
b'X-Forwarded-For': b'10.1.2.3',
|
||||
b"X-Forwarded-For": b"10.1.2.3",
|
||||
}
|
||||
self.assertEqual(
|
||||
parse_x_forwarded_for(headers),
|
||||
['10.1.2.3', 0]
|
||||
["10.1.2.3", 0]
|
||||
)
|
||||
|
||||
def test_v6_address(self):
|
||||
headers = {
|
||||
b'X-Forwarded-For': [b'1043::a321:0001, 10.0.5.6'],
|
||||
b"X-Forwarded-For": [b"1043::a321:0001, 10.0.5.6"],
|
||||
}
|
||||
self.assertEqual(
|
||||
parse_x_forwarded_for(headers),
|
||||
['1043::a321:0001', 0]
|
||||
["1043::a321:0001", 0]
|
||||
)
|
||||
|
||||
def test_multiple_proxys(self):
|
||||
headers = {
|
||||
b'X-Forwarded-For': b'10.1.2.3, 10.1.2.4',
|
||||
b"X-Forwarded-For": b"10.1.2.3, 10.1.2.4",
|
||||
}
|
||||
self.assertEqual(
|
||||
parse_x_forwarded_for(headers),
|
||||
['10.1.2.3', 0]
|
||||
["10.1.2.3", 0]
|
||||
)
|
||||
|
||||
def test_original(self):
|
||||
headers = {}
|
||||
self.assertEqual(
|
||||
parse_x_forwarded_for(headers, original=['127.0.0.1', 80]),
|
||||
['127.0.0.1', 80]
|
||||
parse_x_forwarded_for(headers, original=["127.0.0.1", 80]),
|
||||
["127.0.0.1", 80]
|
||||
)
|
||||
|
||||
def test_no_original(self):
|
||||
|
|
|
@ -19,7 +19,7 @@ class WebSocketConnection(object):
|
|||
|
||||
self.channel_layer = ChannelLayer()
|
||||
self.factory = HTTPFactory(self.channel_layer, send_channel="test!")
|
||||
self.proto = self.factory.buildProtocol(('127.0.0.1', 0))
|
||||
self.proto = self.factory.buildProtocol(("127.0.0.1", 0))
|
||||
self.transport = proto_helpers.StringTransport()
|
||||
self.proto.makeConnection(self.transport)
|
||||
|
||||
|
@ -28,7 +28,7 @@ class WebSocketConnection(object):
|
|||
Low-level method to let Daphne handle HTTP/WebSocket data
|
||||
"""
|
||||
self.proto.dataReceived(request)
|
||||
_, self.last_message = self.channel_layer.receive(['websocket.connect'])
|
||||
_, self.last_message = self.channel_layer.receive(["websocket.connect"])
|
||||
return self.last_message
|
||||
|
||||
def send(self, content):
|
||||
|
@ -38,12 +38,12 @@ class WebSocketConnection(object):
|
|||
if self.last_message is None:
|
||||
# Auto-connect for convenience.
|
||||
self.connect()
|
||||
self.factory.dispatch_reply(self.last_message['reply_channel'], content)
|
||||
self.factory.dispatch_reply(self.last_message["reply_channel"], content)
|
||||
response = self.transport.value()
|
||||
self.transport.clear()
|
||||
return response
|
||||
|
||||
def connect(self, path='/', params=None, headers=None):
|
||||
def connect(self, path="/", params=None, headers=None):
|
||||
"""
|
||||
High-level method to perform the WebSocket handshake
|
||||
"""
|
||||
|
@ -78,21 +78,21 @@ class TestSendCloseAccept(testcases.ASGIWebSocketTestCase):
|
|||
"""
|
||||
|
||||
def test_empty_accept(self):
|
||||
response = WebSocketConnection().send({'accept': True})
|
||||
response = WebSocketConnection().send({"accept": True})
|
||||
self.assert_websocket_upgrade(response)
|
||||
|
||||
@given(text=http_strategies.http_body())
|
||||
def test_accept_and_text(self, text):
|
||||
response = WebSocketConnection().send({'accept': True, 'text': text})
|
||||
self.assert_websocket_upgrade(response, text.encode('ascii'))
|
||||
response = WebSocketConnection().send({"accept": True, "text": text})
|
||||
self.assert_websocket_upgrade(response, text.encode("ascii"))
|
||||
|
||||
@given(data=http_strategies.binary_payload())
|
||||
def test_accept_and_bytes(self, data):
|
||||
response = WebSocketConnection().send({'accept': True, 'bytes': data})
|
||||
response = WebSocketConnection().send({"accept": True, "bytes": data})
|
||||
self.assert_websocket_upgrade(response, data)
|
||||
|
||||
def test_accept_false(self):
|
||||
response = WebSocketConnection().send({'accept': False})
|
||||
response = WebSocketConnection().send({"accept": False})
|
||||
self.assert_websocket_denied(response)
|
||||
|
||||
def test_accept_false_with_text(self):
|
||||
|
@ -102,10 +102,10 @@ class TestSendCloseAccept(testcases.ASGIWebSocketTestCase):
|
|||
We can't easily use Hypothesis to generate data for this test because it's
|
||||
hard to detect absence of the body if e.g. Hypothesis would generate a 'GET'
|
||||
"""
|
||||
text = 'foobar'
|
||||
response = WebSocketConnection().send({'accept': False, 'text': text})
|
||||
text = "foobar"
|
||||
response = WebSocketConnection().send({"accept": False, "text": text})
|
||||
self.assert_websocket_denied(response)
|
||||
self.assertNotIn(text.encode('ascii'), response)
|
||||
self.assertNotIn(text.encode("ascii"), response)
|
||||
|
||||
def test_accept_false_with_bytes(self):
|
||||
"""
|
||||
|
@ -114,8 +114,8 @@ class TestSendCloseAccept(testcases.ASGIWebSocketTestCase):
|
|||
We can't easily use Hypothesis to generate data for this test because it's
|
||||
hard to detect absence of the body if e.g. Hypothesis would generate a 'GET'
|
||||
"""
|
||||
data = b'foobar'
|
||||
response = WebSocketConnection().send({'accept': False, 'bytes': data})
|
||||
data = b"foobar"
|
||||
response = WebSocketConnection().send({"accept": False, "bytes": data})
|
||||
self.assert_websocket_denied(response)
|
||||
self.assertNotIn(data, response)
|
||||
|
||||
|
@ -123,35 +123,35 @@ class TestSendCloseAccept(testcases.ASGIWebSocketTestCase):
|
|||
def test_just_text(self, text):
|
||||
assume(len(text) > 0)
|
||||
# If content is sent, accept=True is implied.
|
||||
response = WebSocketConnection().send({'text': text})
|
||||
self.assert_websocket_upgrade(response, text.encode('ascii'))
|
||||
response = WebSocketConnection().send({"text": text})
|
||||
self.assert_websocket_upgrade(response, text.encode("ascii"))
|
||||
|
||||
@given(data=http_strategies.binary_payload())
|
||||
def test_just_bytes(self, data):
|
||||
assume(len(data) > 0)
|
||||
# If content is sent, accept=True is implied.
|
||||
response = WebSocketConnection().send({'bytes': data})
|
||||
response = WebSocketConnection().send({"bytes": data})
|
||||
self.assert_websocket_upgrade(response, data)
|
||||
|
||||
def test_close_boolean(self):
|
||||
response = WebSocketConnection().send({'close': True})
|
||||
response = WebSocketConnection().send({"close": True})
|
||||
self.assert_websocket_denied(response)
|
||||
|
||||
@given(number=strategies.integers(min_value=1))
|
||||
def test_close_integer(self, number):
|
||||
response = WebSocketConnection().send({'close': number})
|
||||
response = WebSocketConnection().send({"close": number})
|
||||
self.assert_websocket_denied(response)
|
||||
|
||||
@given(text=http_strategies.http_body())
|
||||
def test_close_with_text(self, text):
|
||||
assume(len(text) > 0)
|
||||
response = WebSocketConnection().send({'close': True, 'text': text})
|
||||
self.assert_websocket_upgrade(response, text.encode('ascii'), expect_close=True)
|
||||
response = WebSocketConnection().send({"close": True, "text": text})
|
||||
self.assert_websocket_upgrade(response, text.encode("ascii"), expect_close=True)
|
||||
|
||||
@given(data=http_strategies.binary_payload())
|
||||
def test_close_with_data(self, data):
|
||||
assume(len(data) > 0)
|
||||
response = WebSocketConnection().send({'close': True, 'bytes': data})
|
||||
response = WebSocketConnection().send({"close": True, "bytes": data})
|
||||
self.assert_websocket_upgrade(response, data, expect_close=True)
|
||||
|
||||
|
||||
|
@ -177,34 +177,34 @@ class TestWebSocketProtocol(testcases.ASGIWebSocketTestCase):
|
|||
b"Origin: http://example.com\r\n"
|
||||
b"\r\n"
|
||||
)
|
||||
self.assertEqual(message['path'], "/chat")
|
||||
self.assertEqual(message['query_string'], b"")
|
||||
self.assertEqual(message["path"], "/chat")
|
||||
self.assertEqual(message["query_string"], b"")
|
||||
self.assertEqual(
|
||||
sorted(message['headers']),
|
||||
[(b'connection', b'Upgrade'),
|
||||
(b'host', b'somewhere.com'),
|
||||
(b'origin', b'http://example.com'),
|
||||
(b'sec-websocket-key', b'x3JJHMbDL1EzLkh9GBhXDw=='),
|
||||
(b'sec-websocket-protocol', b'chat, superchat'),
|
||||
(b'sec-websocket-version', b'13'),
|
||||
(b'upgrade', b'websocket')]
|
||||
sorted(message["headers"]),
|
||||
[(b"connection", b"Upgrade"),
|
||||
(b"host", b"somewhere.com"),
|
||||
(b"origin", b"http://example.com"),
|
||||
(b"sec-websocket-key", b"x3JJHMbDL1EzLkh9GBhXDw=="),
|
||||
(b"sec-websocket-protocol", b"chat, superchat"),
|
||||
(b"sec-websocket-version", b"13"),
|
||||
(b"upgrade", b"websocket")]
|
||||
)
|
||||
self.assert_valid_websocket_connect_message(message, '/chat')
|
||||
self.assert_valid_websocket_connect_message(message, "/chat")
|
||||
|
||||
# Accept the connection
|
||||
response = self.connection.send({'accept': True})
|
||||
response = self.connection.send({"accept": True})
|
||||
self.assert_websocket_upgrade(response)
|
||||
|
||||
# Send some text
|
||||
response = self.connection.send({'text': "Hello World!"})
|
||||
response = self.connection.send({"text": "Hello World!"})
|
||||
self.assertEqual(response, b"\x81\x0cHello World!")
|
||||
|
||||
# Send some bytes
|
||||
response = self.connection.send({'bytes': b"\xaa\xbb\xcc\xdd"})
|
||||
response = self.connection.send({"bytes": b"\xaa\xbb\xcc\xdd"})
|
||||
self.assertEqual(response, b"\x82\x04\xaa\xbb\xcc\xdd")
|
||||
|
||||
# Close the connection
|
||||
response = self.connection.send({'close': True})
|
||||
response = self.connection.send({"close": True})
|
||||
self.assertEqual(response, b"\x88\x02\x03\xe8")
|
||||
|
||||
def test_connection_with_file_origin_is_accepted(self):
|
||||
|
@ -219,11 +219,11 @@ class TestWebSocketProtocol(testcases.ASGIWebSocketTestCase):
|
|||
b"Origin: file://\r\n"
|
||||
b"\r\n"
|
||||
)
|
||||
self.assertIn((b'origin', b'file://'), message['headers'])
|
||||
self.assert_valid_websocket_connect_message(message, '/chat')
|
||||
self.assertIn((b"origin", b"file://"), message["headers"])
|
||||
self.assert_valid_websocket_connect_message(message, "/chat")
|
||||
|
||||
# Accept the connection
|
||||
response = self.connection.send({'accept': True})
|
||||
response = self.connection.send({"accept": True})
|
||||
self.assert_websocket_upgrade(response)
|
||||
|
||||
def test_connection_with_no_origin_is_accepted(self):
|
||||
|
@ -238,9 +238,9 @@ class TestWebSocketProtocol(testcases.ASGIWebSocketTestCase):
|
|||
b"\r\n"
|
||||
)
|
||||
|
||||
self.assertNotIn(b'origin', [header_tuple[0] for header_tuple in message['headers']])
|
||||
self.assert_valid_websocket_connect_message(message, '/chat')
|
||||
self.assertNotIn(b"origin", [header_tuple[0] for header_tuple in message["headers"]])
|
||||
self.assert_valid_websocket_connect_message(message, "/chat")
|
||||
|
||||
# Accept the connection
|
||||
response = self.connection.send({'accept': True})
|
||||
response = self.connection.send({"accept": True})
|
||||
self.assert_websocket_upgrade(response)
|
||||
|
|
|
@ -34,7 +34,7 @@ class ASGITestCaseBase(unittest.TestCase):
|
|||
def assert_valid_reply_channel(self, reply_channel):
|
||||
self.assertIsInstance(reply_channel, str)
|
||||
# The reply channel is decided by the server.
|
||||
self.assertTrue(reply_channel.startswith('test!'))
|
||||
self.assertTrue(reply_channel.startswith("test!"))
|
||||
|
||||
def assert_valid_path(self, path, request_path):
|
||||
self.assertIsInstance(path, str)
|
||||
|
@ -65,96 +65,96 @@ class ASGIHTTPTestCase(ASGITestCaseBase):
|
|||
|
||||
self.assert_presence_of_message_keys(
|
||||
channel_message.keys(),
|
||||
{'reply_channel', 'http_version', 'method', 'path', 'query_string', 'headers'},
|
||||
{'scheme', 'root_path', 'body', 'body_channel', 'client', 'server'})
|
||||
{"reply_channel", "http_version", "method", "path", "query_string", "headers"},
|
||||
{"scheme", "root_path", "body", "body_channel", "client", "server"})
|
||||
|
||||
# == Assertions about required channel_message fields ==
|
||||
self.assert_valid_reply_channel(channel_message['reply_channel'])
|
||||
self.assert_valid_path(channel_message['path'], request_path)
|
||||
self.assert_valid_reply_channel(channel_message["reply_channel"])
|
||||
self.assert_valid_path(channel_message["path"], request_path)
|
||||
|
||||
http_version = channel_message['http_version']
|
||||
http_version = channel_message["http_version"]
|
||||
self.assertIsInstance(http_version, str)
|
||||
self.assertIn(http_version, ['1.0', '1.1', '1.2'])
|
||||
self.assertIn(http_version, ["1.0", "1.1", "1.2"])
|
||||
|
||||
method = channel_message['method']
|
||||
method = channel_message["method"]
|
||||
self.assertIsInstance(method, str)
|
||||
self.assertTrue(method.isupper())
|
||||
self.assertEqual(channel_message['method'], request_method)
|
||||
self.assertEqual(channel_message["method"], request_method)
|
||||
|
||||
query_string = channel_message['query_string']
|
||||
query_string = channel_message["query_string"]
|
||||
# Assert that query_string is a byte string and still url encoded
|
||||
self.assertIsInstance(query_string, bytes)
|
||||
self.assertEqual(query_string, parse.urlencode(request_params or []).encode('ascii'))
|
||||
self.assertEqual(query_string, parse.urlencode(request_params or []).encode("ascii"))
|
||||
|
||||
# Ordering of header names is not important, but the order of values for a header
|
||||
# name is. To assert whether that order is kept, we transform both the request
|
||||
# headers and the channel message headers into a dictionary
|
||||
# {name: [value1, value2, ...]} and check if they're equal.
|
||||
transformed_message_headers = defaultdict(list)
|
||||
for name, value in channel_message['headers']:
|
||||
for name, value in channel_message["headers"]:
|
||||
transformed_message_headers[name].append(value)
|
||||
|
||||
transformed_request_headers = defaultdict(list)
|
||||
for name, value in (request_headers or []):
|
||||
expected_name = name.lower().strip().encode('ascii')
|
||||
expected_value = value.strip().encode('ascii')
|
||||
expected_name = name.lower().strip().encode("ascii")
|
||||
expected_value = value.strip().encode("ascii")
|
||||
transformed_request_headers[expected_name].append(expected_value)
|
||||
|
||||
self.assertEqual(transformed_message_headers, transformed_request_headers)
|
||||
|
||||
# == Assertions about optional channel_message fields ==
|
||||
|
||||
scheme = channel_message.get('scheme')
|
||||
scheme = channel_message.get("scheme")
|
||||
if scheme is not None:
|
||||
self.assertIsInstance(scheme, str)
|
||||
self.assertTrue(scheme) # May not be empty
|
||||
|
||||
root_path = channel_message.get('root_path')
|
||||
root_path = channel_message.get("root_path")
|
||||
if root_path is not None:
|
||||
self.assertIsInstance(root_path, str)
|
||||
|
||||
body = channel_message.get('body')
|
||||
body = channel_message.get("body")
|
||||
# Ensure we test for presence of 'body' if a request body was given
|
||||
if request_body is not None or body is not None:
|
||||
self.assertIsInstance(body, str)
|
||||
self.assertEqual(body, (request_body or '').encode('ascii'))
|
||||
self.assertEqual(body, (request_body or "").encode("ascii"))
|
||||
|
||||
body_channel = channel_message.get('body_channel')
|
||||
body_channel = channel_message.get("body_channel")
|
||||
if body_channel is not None:
|
||||
self.assertIsInstance(body_channel, str)
|
||||
self.assertIn('?', body_channel)
|
||||
self.assertIn("?", body_channel)
|
||||
|
||||
client = channel_message.get('client')
|
||||
client = channel_message.get("client")
|
||||
if client is not None:
|
||||
self.assert_valid_address_and_port(channel_message['client'])
|
||||
self.assert_valid_address_and_port(channel_message["client"])
|
||||
|
||||
server = channel_message.get('server')
|
||||
server = channel_message.get("server")
|
||||
if server is not None:
|
||||
self.assert_valid_address_and_port(channel_message['server'])
|
||||
self.assert_valid_address_and_port(channel_message["server"])
|
||||
|
||||
def assert_valid_http_response_message(self, message, response):
|
||||
self.assertTrue(message)
|
||||
self.assertTrue(response.startswith(b'HTTP'))
|
||||
self.assertTrue(response.startswith(b"HTTP"))
|
||||
|
||||
status_code_bytes = str(message['status']).encode('ascii')
|
||||
status_code_bytes = str(message["status"]).encode("ascii")
|
||||
self.assertIn(status_code_bytes, response)
|
||||
|
||||
if 'content' in message:
|
||||
self.assertIn(message['content'], response)
|
||||
if "content" in message:
|
||||
self.assertIn(message["content"], response)
|
||||
|
||||
# Check that headers are in the given order.
|
||||
# N.b. HTTP spec only enforces that the order of header values is kept, but
|
||||
# the ASGI spec requires that order of all headers is kept. This code
|
||||
# checks conformance with the stricter ASGI spec.
|
||||
if 'headers' in message:
|
||||
for name, value in message['headers']:
|
||||
if "headers" in message:
|
||||
for name, value in message["headers"]:
|
||||
expected_header = factories.header_line(name, value)
|
||||
# Daphne or Twisted turn our lower cased header names ('foo-bar') into title
|
||||
# case ('Foo-Bar'). So technically we want to to match that the header name is
|
||||
# present while ignoring casing, and want to ensure the value is present without
|
||||
# altered casing. The approach below does this well enough.
|
||||
self.assertIn(expected_header.lower(), response.lower())
|
||||
self.assertIn(value.encode('ascii'), response)
|
||||
self.assertIn(value.encode("ascii"), response)
|
||||
|
||||
|
||||
class ASGIWebSocketTestCase(ASGITestCaseBase):
|
||||
|
@ -162,17 +162,17 @@ class ASGIWebSocketTestCase(ASGITestCaseBase):
|
|||
Test case with helpers for verifying WebSocket channel messages
|
||||
"""
|
||||
|
||||
def assert_websocket_upgrade(self, response, body=b'', expect_close=False):
|
||||
def assert_websocket_upgrade(self, response, body=b"", expect_close=False):
|
||||
self.assertIn(b"HTTP/1.1 101 Switching Protocols", response)
|
||||
self.assertIn(b"Sec-WebSocket-Accept: HSmrc0sMlYUkAGmm5OPpG2HaGWk=\r\n", response)
|
||||
self.assertIn(body, response)
|
||||
self.assertEqual(expect_close, response.endswith(b"\x88\x02\x03\xe8"))
|
||||
|
||||
def assert_websocket_denied(self, response):
|
||||
self.assertIn(b'HTTP/1.1 403', response)
|
||||
self.assertIn(b"HTTP/1.1 403", response)
|
||||
|
||||
def assert_valid_websocket_connect_message(
|
||||
self, channel_message, request_path='/', request_params=None, request_headers=None):
|
||||
self, channel_message, request_path="/", request_params=None, request_headers=None):
|
||||
"""
|
||||
Asserts that a given channel message conforms to the HTTP request section of the ASGI spec.
|
||||
"""
|
||||
|
@ -181,14 +181,14 @@ class ASGIWebSocketTestCase(ASGITestCaseBase):
|
|||
|
||||
self.assert_presence_of_message_keys(
|
||||
channel_message.keys(),
|
||||
{'reply_channel', 'path', 'headers', 'order'},
|
||||
{'scheme', 'query_string', 'root_path', 'client', 'server'})
|
||||
{"reply_channel", "path", "headers", "order"},
|
||||
{"scheme", "query_string", "root_path", "client", "server"})
|
||||
|
||||
# == Assertions about required channel_message fields ==
|
||||
self.assert_valid_reply_channel(channel_message['reply_channel'])
|
||||
self.assert_valid_path(channel_message['path'], request_path)
|
||||
self.assert_valid_reply_channel(channel_message["reply_channel"])
|
||||
self.assert_valid_path(channel_message["path"], request_path)
|
||||
|
||||
order = channel_message['order']
|
||||
order = channel_message["order"]
|
||||
self.assertIsInstance(order, int)
|
||||
self.assertEqual(order, 0)
|
||||
|
||||
|
@ -200,39 +200,39 @@ class ASGIWebSocketTestCase(ASGITestCaseBase):
|
|||
# get one string per header field with values separated by comma.
|
||||
transformed_request_headers = defaultdict(list)
|
||||
for name, value in (request_headers or []):
|
||||
expected_name = name.lower().strip().encode('ascii')
|
||||
expected_value = value.strip().encode('ascii')
|
||||
expected_name = name.lower().strip().encode("ascii")
|
||||
expected_value = value.strip().encode("ascii")
|
||||
transformed_request_headers[expected_name].append(expected_value)
|
||||
final_request_headers = {
|
||||
(name, b','.join(value)) for name, value in transformed_request_headers.items()
|
||||
(name, b",".join(value)) for name, value in transformed_request_headers.items()
|
||||
}
|
||||
|
||||
# Websockets carry a lot of additional header fields, so instead of verifying that
|
||||
# headers look exactly like expected, we just check that the expected header fields
|
||||
# and values are present - additional header fields (e.g. Sec-WebSocket-Key) are allowed
|
||||
# and not tested for.
|
||||
assert final_request_headers.issubset(set(channel_message['headers']))
|
||||
assert final_request_headers.issubset(set(channel_message["headers"]))
|
||||
|
||||
# == Assertions about optional channel_message fields ==
|
||||
scheme = channel_message.get('scheme')
|
||||
scheme = channel_message.get("scheme")
|
||||
if scheme:
|
||||
self.assertIsInstance(scheme, six.text_type)
|
||||
self.assertIn(scheme, ['ws', 'wss'])
|
||||
self.assertIn(scheme, ["ws", "wss"])
|
||||
|
||||
query_string = channel_message.get('query_string')
|
||||
query_string = channel_message.get("query_string")
|
||||
if query_string:
|
||||
# Assert that query_string is a byte string and still url encoded
|
||||
self.assertIsInstance(query_string, six.binary_type)
|
||||
self.assertEqual(query_string, parse.urlencode(request_params or []).encode('ascii'))
|
||||
self.assertEqual(query_string, parse.urlencode(request_params or []).encode("ascii"))
|
||||
|
||||
root_path = channel_message.get('root_path')
|
||||
root_path = channel_message.get("root_path")
|
||||
if root_path is not None:
|
||||
self.assertIsInstance(root_path, six.text_type)
|
||||
|
||||
client = channel_message.get('client')
|
||||
client = channel_message.get("client")
|
||||
if client is not None:
|
||||
self.assert_valid_address_and_port(channel_message['client'])
|
||||
self.assert_valid_address_and_port(channel_message["client"])
|
||||
|
||||
server = channel_message.get('server')
|
||||
server = channel_message.get("server")
|
||||
if server is not None:
|
||||
self.assert_valid_address_and_port(channel_message['server'])
|
||||
self.assert_valid_address_and_port(channel_message["server"])
|
||||
|
|
|
@ -23,8 +23,8 @@ def header_value(headers, header_name):
|
|||
|
||||
|
||||
def parse_x_forwarded_for(headers,
|
||||
address_header_name='X-Forwarded-For',
|
||||
port_header_name='X-Forwarded-Port',
|
||||
address_header_name="X-Forwarded-For",
|
||||
port_header_name="X-Forwarded-Port",
|
||||
original=None):
|
||||
"""
|
||||
Parses an X-Forwarded-For header and returns a host/port pair as a list.
|
||||
|
@ -50,7 +50,7 @@ def parse_x_forwarded_for(headers,
|
|||
if address_header_name in headers:
|
||||
address_value = header_value(headers, address_header_name)
|
||||
|
||||
if ',' in address_value:
|
||||
if "," in address_value:
|
||||
address_value = address_value.split(",")[0].strip()
|
||||
|
||||
result = [address_value, 0]
|
||||
|
|
|
@ -61,7 +61,7 @@ class WebSocketProtocol(WebSocketServerProtocol):
|
|||
# Decode websocket subprotocol options
|
||||
subprotocols = []
|
||||
for header, value in self.clean_headers:
|
||||
if header == b'sec-websocket-protocol':
|
||||
if header == b"sec-websocket-protocol":
|
||||
subprotocols = [x.strip() for x in self.unquote(value).split(",")]
|
||||
# Make new application instance with scope
|
||||
self.path = request.path.encode("ascii")
|
||||
|
|
60
setup.py
60
setup.py
|
@ -10,47 +10,47 @@ with open(readme_path) as fp:
|
|||
long_description = fp.read()
|
||||
|
||||
setup(
|
||||
name='daphne',
|
||||
name="daphne",
|
||||
version=__version__,
|
||||
url='https://github.com/django/daphne',
|
||||
author='Django Software Foundation',
|
||||
author_email='foundation@djangoproject.com',
|
||||
description='Django ASGI (HTTP/WebSocket) server',
|
||||
url="https://github.com/django/daphne",
|
||||
author="Django Software Foundation",
|
||||
author_email="foundation@djangoproject.com",
|
||||
description="Django ASGI (HTTP/WebSocket) server",
|
||||
long_description=long_description,
|
||||
license='BSD',
|
||||
license="BSD",
|
||||
zip_safe=False,
|
||||
package_dir={'twisted': 'daphne/twisted'},
|
||||
packages=find_packages() + ['twisted.plugins'],
|
||||
package_dir={"twisted": "daphne/twisted"},
|
||||
packages=find_packages() + ["twisted.plugins"],
|
||||
include_package_data=True,
|
||||
install_requires=[
|
||||
'asgiref~=2.0',
|
||||
'twisted>=17.5',
|
||||
'autobahn>=0.18',
|
||||
"asgiref~=2.0",
|
||||
"twisted>=17.5",
|
||||
"autobahn>=0.18",
|
||||
],
|
||||
setup_requires=[
|
||||
'pytest-runner',
|
||||
"pytest-runner",
|
||||
],
|
||||
tests_require=[
|
||||
'hypothesis',
|
||||
'tox',
|
||||
'pytest',
|
||||
"hypothesis",
|
||||
"tox",
|
||||
"pytest",
|
||||
],
|
||||
entry_points={'console_scripts': [
|
||||
'daphne = daphne.cli:CommandLineInterface.entrypoint',
|
||||
entry_points={"console_scripts": [
|
||||
"daphne = daphne.cli:CommandLineInterface.entrypoint",
|
||||
]},
|
||||
classifiers=[
|
||||
'Development Status :: 4 - Beta',
|
||||
'Environment :: Web Environment',
|
||||
'Intended Audience :: Developers',
|
||||
'License :: OSI Approved :: BSD License',
|
||||
'Operating System :: OS Independent',
|
||||
'Programming Language :: Python',
|
||||
'Programming Language :: Python :: 2',
|
||||
'Programming Language :: Python :: 2.7',
|
||||
'Programming Language :: Python :: 3',
|
||||
'Programming Language :: Python :: 3.4',
|
||||
'Programming Language :: Python :: 3.5',
|
||||
'Programming Language :: Python :: 3.6',
|
||||
'Topic :: Internet :: WWW/HTTP',
|
||||
"Development Status :: 4 - Beta",
|
||||
"Environment :: Web Environment",
|
||||
"Intended Audience :: Developers",
|
||||
"License :: OSI Approved :: BSD License",
|
||||
"Operating System :: OS Independent",
|
||||
"Programming Language :: Python",
|
||||
"Programming Language :: Python :: 2",
|
||||
"Programming Language :: Python :: 2.7",
|
||||
"Programming Language :: Python :: 3",
|
||||
"Programming Language :: Python :: 3.4",
|
||||
"Programming Language :: Python :: 3.5",
|
||||
"Programming Language :: Python :: 3.6",
|
||||
"Topic :: Internet :: WWW/HTTP",
|
||||
],
|
||||
)
|
||||
|
|
|
@ -13,26 +13,26 @@ class TestEndpointDescriptions(TestCase):
|
|||
"""
|
||||
|
||||
def testBasics(self):
|
||||
self.assertEqual(build(), [], msg='Empty list returned when no kwargs given')
|
||||
self.assertEqual(build(), [], msg="Empty list returned when no kwargs given")
|
||||
|
||||
def testTcpPortBindings(self):
|
||||
self.assertEqual(
|
||||
build(port=1234, host='example.com'),
|
||||
['tcp:port=1234:interface=example.com']
|
||||
build(port=1234, host="example.com"),
|
||||
["tcp:port=1234:interface=example.com"]
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
build(port=8000, host='127.0.0.1'),
|
||||
['tcp:port=8000:interface=127.0.0.1']
|
||||
build(port=8000, host="127.0.0.1"),
|
||||
["tcp:port=8000:interface=127.0.0.1"]
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
build(port=8000, host='[200a::1]'),
|
||||
build(port=8000, host="[200a::1]"),
|
||||
[r'tcp:port=8000:interface=200a\:\:1']
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
build(port=8000, host='200a::1'),
|
||||
build(port=8000, host="200a::1"),
|
||||
[r'tcp:port=8000:interface=200a\:\:1']
|
||||
)
|
||||
|
||||
|
@ -43,19 +43,19 @@ class TestEndpointDescriptions(TestCase):
|
|||
)
|
||||
self.assertRaises(
|
||||
ValueError,
|
||||
build, host='example.com'
|
||||
build, host="example.com"
|
||||
)
|
||||
|
||||
def testUnixSocketBinding(self):
|
||||
self.assertEqual(
|
||||
build(unix_socket='/tmp/daphne.sock'),
|
||||
['unix:/tmp/daphne.sock']
|
||||
build(unix_socket="/tmp/daphne.sock"),
|
||||
["unix:/tmp/daphne.sock"]
|
||||
)
|
||||
|
||||
def testFileDescriptorBinding(self):
|
||||
self.assertEqual(
|
||||
build(file_descriptor=5),
|
||||
['fd:fileno=5']
|
||||
["fd:fileno=5"]
|
||||
)
|
||||
|
||||
def testMultipleEnpoints(self):
|
||||
|
@ -63,15 +63,15 @@ class TestEndpointDescriptions(TestCase):
|
|||
sorted(
|
||||
build(
|
||||
file_descriptor=123,
|
||||
unix_socket='/tmp/daphne.sock',
|
||||
unix_socket="/tmp/daphne.sock",
|
||||
port=8080,
|
||||
host='10.0.0.1'
|
||||
host="10.0.0.1"
|
||||
)
|
||||
),
|
||||
sorted([
|
||||
'tcp:port=8080:interface=10.0.0.1',
|
||||
'unix:/tmp/daphne.sock',
|
||||
'fd:fileno=123'
|
||||
"tcp:port=8080:interface=10.0.0.1",
|
||||
"unix:/tmp/daphne.sock",
|
||||
"fd:fileno=123"
|
||||
])
|
||||
)
|
||||
|
||||
|
@ -112,7 +112,7 @@ class TestCLIInterface(TestCase):
|
|||
Passes in a fake application automatically.
|
||||
"""
|
||||
cli = self.TestedCLI()
|
||||
cli.run(args + ['daphne:__version__']) # We just pass something importable as app
|
||||
cli.run(args + ["daphne:__version__"]) # We just pass something importable as app
|
||||
# Check the server got all arguments as intended
|
||||
for key, value in server_kwargs.items():
|
||||
# Get the value and sort it if it's a list (for endpoint checking)
|
||||
|
@ -123,7 +123,7 @@ class TestCLIInterface(TestCase):
|
|||
self.assertEqual(
|
||||
value,
|
||||
actual_value,
|
||||
'Wrong value for server kwarg %s: %r != %r' % (
|
||||
"Wrong value for server kwarg %s: %r != %r" % (
|
||||
key,
|
||||
value,
|
||||
actual_value,
|
||||
|
@ -137,65 +137,65 @@ class TestCLIInterface(TestCase):
|
|||
self.assertCLI(
|
||||
[],
|
||||
{
|
||||
'endpoints': ['tcp:port=8000:interface=127.0.0.1'],
|
||||
"endpoints": ["tcp:port=8000:interface=127.0.0.1"],
|
||||
},
|
||||
)
|
||||
self.assertCLI(
|
||||
['-p', '123'],
|
||||
["-p", "123"],
|
||||
{
|
||||
'endpoints': ['tcp:port=123:interface=127.0.0.1'],
|
||||
"endpoints": ["tcp:port=123:interface=127.0.0.1"],
|
||||
},
|
||||
)
|
||||
self.assertCLI(
|
||||
['-b', '10.0.0.1'],
|
||||
["-b", "10.0.0.1"],
|
||||
{
|
||||
'endpoints': ['tcp:port=8000:interface=10.0.0.1'],
|
||||
"endpoints": ["tcp:port=8000:interface=10.0.0.1"],
|
||||
},
|
||||
)
|
||||
self.assertCLI(
|
||||
['-b', '200a::1'],
|
||||
["-b", "200a::1"],
|
||||
{
|
||||
'endpoints': [r'tcp:port=8000:interface=200a\:\:1'],
|
||||
"endpoints": [r'tcp:port=8000:interface=200a\:\:1'],
|
||||
},
|
||||
)
|
||||
self.assertCLI(
|
||||
['-b', '[200a::1]'],
|
||||
["-b", "[200a::1]"],
|
||||
{
|
||||
'endpoints': [r'tcp:port=8000:interface=200a\:\:1'],
|
||||
"endpoints": [r'tcp:port=8000:interface=200a\:\:1'],
|
||||
},
|
||||
)
|
||||
self.assertCLI(
|
||||
['-p', '8080', '-b', 'example.com'],
|
||||
["-p", "8080", "-b", "example.com"],
|
||||
{
|
||||
'endpoints': ['tcp:port=8080:interface=example.com'],
|
||||
"endpoints": ["tcp:port=8080:interface=example.com"],
|
||||
},
|
||||
)
|
||||
|
||||
def testUnixSockets(self):
|
||||
self.assertCLI(
|
||||
['-p', '8080', '-u', '/tmp/daphne.sock'],
|
||||
["-p", "8080", "-u", "/tmp/daphne.sock"],
|
||||
{
|
||||
'endpoints': [
|
||||
'tcp:port=8080:interface=127.0.0.1',
|
||||
'unix:/tmp/daphne.sock',
|
||||
"endpoints": [
|
||||
"tcp:port=8080:interface=127.0.0.1",
|
||||
"unix:/tmp/daphne.sock",
|
||||
],
|
||||
},
|
||||
)
|
||||
self.assertCLI(
|
||||
['-b', 'example.com', '-u', '/tmp/daphne.sock'],
|
||||
["-b", "example.com", "-u", "/tmp/daphne.sock"],
|
||||
{
|
||||
'endpoints': [
|
||||
'tcp:port=8000:interface=example.com',
|
||||
'unix:/tmp/daphne.sock',
|
||||
"endpoints": [
|
||||
"tcp:port=8000:interface=example.com",
|
||||
"unix:/tmp/daphne.sock",
|
||||
],
|
||||
},
|
||||
)
|
||||
self.assertCLI(
|
||||
['-u', '/tmp/daphne.sock', '--fd', '5'],
|
||||
["-u", "/tmp/daphne.sock", "--fd", "5"],
|
||||
{
|
||||
'endpoints': [
|
||||
'fd:fileno=5',
|
||||
'unix:/tmp/daphne.sock'
|
||||
"endpoints": [
|
||||
"fd:fileno=5",
|
||||
"unix:/tmp/daphne.sock"
|
||||
],
|
||||
},
|
||||
)
|
||||
|
@ -205,20 +205,20 @@ class TestCLIInterface(TestCase):
|
|||
Tests mixing the shortcut options with the endpoint string options.
|
||||
"""
|
||||
self.assertCLI(
|
||||
['-p', '8080', '-e', 'unix:/tmp/daphne.sock'],
|
||||
["-p", "8080", "-e", "unix:/tmp/daphne.sock"],
|
||||
{
|
||||
'endpoints': [
|
||||
'tcp:port=8080:interface=127.0.0.1',
|
||||
'unix:/tmp/daphne.sock'
|
||||
"endpoints": [
|
||||
"tcp:port=8080:interface=127.0.0.1",
|
||||
"unix:/tmp/daphne.sock"
|
||||
],
|
||||
},
|
||||
)
|
||||
self.assertCLI(
|
||||
['-p', '8080', '-e', 'tcp:port=8080:interface=127.0.0.1'],
|
||||
["-p", "8080", "-e", "tcp:port=8080:interface=127.0.0.1"],
|
||||
{
|
||||
'endpoints': [
|
||||
'tcp:port=8080:interface=127.0.0.1',
|
||||
'tcp:port=8080:interface=127.0.0.1',
|
||||
"endpoints": [
|
||||
"tcp:port=8080:interface=127.0.0.1",
|
||||
"tcp:port=8080:interface=127.0.0.1",
|
||||
],
|
||||
},
|
||||
)
|
||||
|
@ -228,10 +228,10 @@ class TestCLIInterface(TestCase):
|
|||
Tests entirely custom endpoints
|
||||
"""
|
||||
self.assertCLI(
|
||||
['-e', 'imap:'],
|
||||
["-e", "imap:"],
|
||||
{
|
||||
'endpoints': [
|
||||
'imap:',
|
||||
"endpoints": [
|
||||
"imap:",
|
||||
],
|
||||
},
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue
Block a user