mirror of
				https://github.com/django/daphne.git
				synced 2025-11-04 01:27:33 +03:00 
			
		
		
		
	* add first streaming and file response tests * iterate over response and not streaming content directly * add coverage for FileResponse and StreamingHttpResponse * added tests for headers, json responses, and redirect responses * rm print statement * skip failing stringio test
		
			
				
	
	
		
			327 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			327 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from __future__ import unicode_literals
 | 
						|
 | 
						|
import os
 | 
						|
import unittest
 | 
						|
from datetime import datetime
 | 
						|
from itertools import islice
 | 
						|
 | 
						|
from django.http import (
 | 
						|
    FileResponse, HttpResponse, HttpResponseRedirect, JsonResponse,
 | 
						|
    StreamingHttpResponse,
 | 
						|
)
 | 
						|
from six import BytesIO, StringIO
 | 
						|
 | 
						|
from channels import Channel
 | 
						|
from channels.handler import AsgiHandler
 | 
						|
from channels.tests import ChannelTestCase
 | 
						|
 | 
						|
 | 
						|
class FakeAsgiHandler(AsgiHandler):
 | 
						|
    """
 | 
						|
    Handler subclass that just returns a premade response rather than
 | 
						|
    go into the view subsystem.
 | 
						|
    """
 | 
						|
 | 
						|
    chunk_size = 30
 | 
						|
 | 
						|
    def __init__(self, response):
 | 
						|
        assert isinstance(response, (HttpResponse, StreamingHttpResponse))
 | 
						|
        self._response = response
 | 
						|
        super(FakeAsgiHandler, self).__init__()
 | 
						|
 | 
						|
    def get_response(self, request):
 | 
						|
        return self._response
 | 
						|
 | 
						|
 | 
						|
class HandlerTests(ChannelTestCase):
 | 
						|
    """
 | 
						|
    Tests that the handler works correctly and round-trips things into a
 | 
						|
    correct response.
 | 
						|
    """
 | 
						|
 | 
						|
    def test_basic(self):
 | 
						|
        """
 | 
						|
        Tests a simple request
 | 
						|
        """
 | 
						|
        # Make stub request and desired response
 | 
						|
        Channel("test").send({
 | 
						|
            "reply_channel": "test",
 | 
						|
            "http_version": "1.1",
 | 
						|
            "method": "GET",
 | 
						|
            "path": b"/test/",
 | 
						|
        })
 | 
						|
        response = HttpResponse(b"Hi there!", content_type="text/plain")
 | 
						|
        # Run the handler
 | 
						|
        handler = FakeAsgiHandler(response)
 | 
						|
        reply_messages = list(
 | 
						|
            handler(self.get_next_message("test", require=True)))
 | 
						|
        # Make sure we got the right number of messages
 | 
						|
        self.assertEqual(len(reply_messages), 1)
 | 
						|
        reply_message = reply_messages[0]
 | 
						|
        # Make sure the message looks correct
 | 
						|
        self.assertEqual(reply_message["content"], b"Hi there!")
 | 
						|
        self.assertEqual(reply_message["status"], 200)
 | 
						|
        self.assertEqual(reply_message.get("more_content", False), False)
 | 
						|
        self.assertEqual(
 | 
						|
            reply_message["headers"],
 | 
						|
            [
 | 
						|
                (b"Content-Type", b"text/plain"),
 | 
						|
            ],
 | 
						|
        )
 | 
						|
 | 
						|
    def test_cookies(self):
 | 
						|
        Channel("test").send({
 | 
						|
            "reply_channel": "test",
 | 
						|
            "http_version": "1.1",
 | 
						|
            "method": "GET",
 | 
						|
            "path": b"/test/",
 | 
						|
        })
 | 
						|
        response = HttpResponse(b"Hi there!", content_type="text/plain")
 | 
						|
        response.set_signed_cookie('foo', '1', expires=datetime.now())
 | 
						|
        # Run the handler
 | 
						|
        handler = FakeAsgiHandler(response)
 | 
						|
        reply_messages = list(
 | 
						|
            handler(self.get_next_message("test", require=True)))
 | 
						|
        # Make sure we got the right number of messages
 | 
						|
        self.assertEqual(len(reply_messages), 1)
 | 
						|
        reply_message = reply_messages[0]
 | 
						|
        # Make sure the message looks correct
 | 
						|
        self.assertEqual(reply_message["content"], b"Hi there!")
 | 
						|
        self.assertEqual(reply_message["status"], 200)
 | 
						|
        self.assertEqual(reply_message.get("more_content", False), False)
 | 
						|
        self.assertEqual(reply_message["headers"][0], (b'Content-Type', b'text/plain'))
 | 
						|
        self.assertIn('foo=', reply_message["headers"][1][1].decode())
 | 
						|
 | 
						|
    def test_headers(self):
 | 
						|
        Channel("test").send({
 | 
						|
            "reply_channel": "test",
 | 
						|
            "http_version": "1.1",
 | 
						|
            "method": "GET",
 | 
						|
            "path": b"/test/",
 | 
						|
        })
 | 
						|
        response = HttpResponse(b"Hi there!", content_type="text/plain")
 | 
						|
        response['foo'] = 1
 | 
						|
        response['bar'] = 1
 | 
						|
        del response['bar']
 | 
						|
        del response['nonexistant_key']
 | 
						|
        handler = FakeAsgiHandler(response)
 | 
						|
        reply_messages = list(
 | 
						|
            handler(self.get_next_message("test", require=True)))
 | 
						|
        # Make sure we got the right number of messages
 | 
						|
        self.assertEqual(len(reply_messages), 1)
 | 
						|
        reply_message = reply_messages[0]
 | 
						|
        # Make sure the message looks correct
 | 
						|
        self.assertEqual(reply_message["content"], b"Hi there!")
 | 
						|
        header_dict = dict(reply_messages[0]['headers'])
 | 
						|
        self.assertEqual(header_dict[b'foo'].decode(), '1')
 | 
						|
        self.assertNotIn('bar', header_dict)
 | 
						|
 | 
						|
    def test_large(self):
 | 
						|
        """
 | 
						|
        Tests a large response (will need chunking)
 | 
						|
        """
 | 
						|
        # Make stub request and desired response
 | 
						|
        Channel("test").send({
 | 
						|
            "reply_channel": "test",
 | 
						|
            "http_version": "1.1",
 | 
						|
            "method": "GET",
 | 
						|
            "path": b"/test/",
 | 
						|
        })
 | 
						|
        response = HttpResponse(
 | 
						|
            b"Thefirstthirtybytesisrighthereandhereistherest")
 | 
						|
        # Run the handler
 | 
						|
        handler = FakeAsgiHandler(response)
 | 
						|
        reply_messages = list(
 | 
						|
            handler(self.get_next_message("test", require=True)))
 | 
						|
        # Make sure we got the right number of messages
 | 
						|
        self.assertEqual(len(reply_messages), 2)
 | 
						|
        # Make sure the messages look correct
 | 
						|
        self.assertEqual(reply_messages[0][
 | 
						|
                         "content"], b"Thefirstthirtybytesisrighthere")
 | 
						|
        self.assertEqual(reply_messages[0]["status"], 200)
 | 
						|
        self.assertEqual(reply_messages[0]["more_content"], True)
 | 
						|
        self.assertEqual(reply_messages[1]["content"], b"andhereistherest")
 | 
						|
        self.assertEqual(reply_messages[1].get("more_content", False), False)
 | 
						|
 | 
						|
    def test_chunk_bytes(self):
 | 
						|
        """
 | 
						|
        Makes sure chunk_bytes works correctly
 | 
						|
        """
 | 
						|
        # Empty string should still return one chunk
 | 
						|
        result = list(FakeAsgiHandler.chunk_bytes(b""))
 | 
						|
        self.assertEqual(len(result), 1)
 | 
						|
        self.assertEqual(result[0][0], b"")
 | 
						|
        self.assertEqual(result[0][1], True)
 | 
						|
        # Below chunk size
 | 
						|
        result = list(FakeAsgiHandler.chunk_bytes(
 | 
						|
            b"12345678901234567890123456789"))
 | 
						|
        self.assertEqual(len(result), 1)
 | 
						|
        self.assertEqual(result[0][0], b"12345678901234567890123456789")
 | 
						|
        self.assertEqual(result[0][1], True)
 | 
						|
        # Exactly chunk size
 | 
						|
        result = list(FakeAsgiHandler.chunk_bytes(
 | 
						|
            b"123456789012345678901234567890"))
 | 
						|
        self.assertEqual(len(result), 1)
 | 
						|
        self.assertEqual(result[0][0], b"123456789012345678901234567890")
 | 
						|
        self.assertEqual(result[0][1], True)
 | 
						|
        # Just above chunk size
 | 
						|
        result = list(FakeAsgiHandler.chunk_bytes(
 | 
						|
            b"123456789012345678901234567890a"))
 | 
						|
        self.assertEqual(len(result), 2)
 | 
						|
        self.assertEqual(result[0][0], b"123456789012345678901234567890")
 | 
						|
        self.assertEqual(result[0][1], False)
 | 
						|
        self.assertEqual(result[1][0], b"a")
 | 
						|
        self.assertEqual(result[1][1], True)
 | 
						|
 | 
						|
    def test_iterator(self):
 | 
						|
        Channel("test").send({
 | 
						|
            "reply_channel": "test",
 | 
						|
            "http_version": "1.1",
 | 
						|
            "method": "GET",
 | 
						|
            "path": b"/test/",
 | 
						|
        })
 | 
						|
        response = HttpResponse(range(10))
 | 
						|
        handler = FakeAsgiHandler(response)
 | 
						|
        reply_messages = list(
 | 
						|
            handler(self.get_next_message("test", require=True)))
 | 
						|
        self.assertEqual(len(reply_messages), 1)
 | 
						|
        self.assertEqual(reply_messages[0]["content"], b"0123456789")
 | 
						|
 | 
						|
    def test_streaming_data(self):
 | 
						|
        Channel("test").send({
 | 
						|
            "reply_channel": "test",
 | 
						|
            "http_version": "1.1",
 | 
						|
            "method": "GET",
 | 
						|
            "path": b"/test/",
 | 
						|
        })
 | 
						|
        response = StreamingHttpResponse('Line: %s' % i for i in range(10))
 | 
						|
        handler = FakeAsgiHandler(response)
 | 
						|
        reply_messages = list(
 | 
						|
            handler(self.get_next_message("test", require=True)))
 | 
						|
        self.assertEqual(len(reply_messages), 11)
 | 
						|
        self.assertEqual(reply_messages[0]["content"], b"Line: 0")
 | 
						|
        self.assertEqual(reply_messages[9]["content"], b"Line: 9")
 | 
						|
 | 
						|
    def test_real_file_response(self):
 | 
						|
        Channel("test").send({
 | 
						|
            "reply_channel": "test",
 | 
						|
            "http_version": "1.1",
 | 
						|
            "method": "GET",
 | 
						|
            "path": b"/test/",
 | 
						|
        })
 | 
						|
        current_dir = os.path.realpath(os.path.join(
 | 
						|
            os.getcwd(), os.path.dirname(__file__)))
 | 
						|
        response = FileResponse(
 | 
						|
            open(os.path.join(current_dir, 'a_file'), 'rb'))
 | 
						|
        handler = FakeAsgiHandler(response)
 | 
						|
        reply_messages = list(
 | 
						|
            handler(self.get_next_message("test", require=True)))
 | 
						|
        self.assertEqual(len(reply_messages), 2)
 | 
						|
        self.assertEqual(response.getvalue(), b'')
 | 
						|
 | 
						|
    def test_bytes_file_response(self):
 | 
						|
        Channel("test").send({
 | 
						|
            "reply_channel": "test",
 | 
						|
            "http_version": "1.1",
 | 
						|
            "method": "GET",
 | 
						|
            "path": b"/test/",
 | 
						|
        })
 | 
						|
        response = FileResponse(BytesIO(b'sadfdasfsdfsadf'))
 | 
						|
        handler = FakeAsgiHandler(response)
 | 
						|
        reply_messages = list(
 | 
						|
            handler(self.get_next_message("test", require=True)))
 | 
						|
        self.assertEqual(len(reply_messages), 2)
 | 
						|
 | 
						|
    def test_string_file_response(self):
 | 
						|
        Channel("test").send({
 | 
						|
            "reply_channel": "test",
 | 
						|
            "http_version": "1.1",
 | 
						|
            "method": "GET",
 | 
						|
            "path": b"/test/",
 | 
						|
        })
 | 
						|
        response = FileResponse('abcd')
 | 
						|
        handler = FakeAsgiHandler(response)
 | 
						|
        reply_messages = list(
 | 
						|
            handler(self.get_next_message("test", require=True)))
 | 
						|
        self.assertEqual(len(reply_messages), 5)
 | 
						|
 | 
						|
    def test_non_streaming_file_response(self):
 | 
						|
        Channel("test").send({
 | 
						|
            "reply_channel": "test",
 | 
						|
            "http_version": "1.1",
 | 
						|
            "method": "GET",
 | 
						|
            "path": b"/test/",
 | 
						|
        })
 | 
						|
        response = FileResponse(BytesIO(b'sadfdasfsdfsadf'))
 | 
						|
        # This is to test the exception handling. This would only happening if
 | 
						|
        # the StreamingHttpResponse was incorrectly subclassed.
 | 
						|
        response.streaming = False
 | 
						|
 | 
						|
        handler = FakeAsgiHandler(response)
 | 
						|
        with self.assertRaises(AttributeError):
 | 
						|
            list(handler(self.get_next_message("test", require=True)))
 | 
						|
 | 
						|
    def test_unclosable_filelike_object(self):
 | 
						|
        Channel("test").send({
 | 
						|
            "reply_channel": "test",
 | 
						|
            "http_version": "1.1",
 | 
						|
            "method": "GET",
 | 
						|
            "path": b"/test/",
 | 
						|
        })
 | 
						|
 | 
						|
        # This is a readable object that cannot be closed.
 | 
						|
        class Unclosable:
 | 
						|
 | 
						|
            def read(self, n=-1):
 | 
						|
                # Nothing to see here
 | 
						|
                return b""
 | 
						|
 | 
						|
        response = FileResponse(Unclosable())
 | 
						|
        handler = FakeAsgiHandler(response)
 | 
						|
        reply_messages = list(islice(handler(self.get_next_message("test", require=True)), 5))
 | 
						|
        self.assertEqual(len(reply_messages), 1)
 | 
						|
        response.close()
 | 
						|
 | 
						|
    def test_json_response(self):
 | 
						|
        Channel("test").send({
 | 
						|
            "reply_channel": "test",
 | 
						|
            "http_version": "1.1",
 | 
						|
            "method": "GET",
 | 
						|
            "path": b"/test/",
 | 
						|
        })
 | 
						|
        response = JsonResponse({'foo': (1, 2)})
 | 
						|
        handler = FakeAsgiHandler(response)
 | 
						|
        reply_messages = list(handler(self.get_next_message("test", require=True)))
 | 
						|
        self.assertEqual(len(reply_messages), 1)
 | 
						|
        self.assertEqual(reply_messages[0]['content'], b'{"foo": [1, 2]}')
 | 
						|
 | 
						|
    def test_redirect(self):
 | 
						|
        for redirect_to in ['/', '..', 'https://example.com']:
 | 
						|
            Channel("test").send({
 | 
						|
                "reply_channel": "test",
 | 
						|
                "http_version": "1.1",
 | 
						|
                "method": "GET",
 | 
						|
                "path": b"/test/",
 | 
						|
            })
 | 
						|
            response = HttpResponseRedirect(redirect_to)
 | 
						|
            handler = FakeAsgiHandler(response)
 | 
						|
            reply_messages = list(handler(self.get_next_message("test", require=True)))
 | 
						|
            self.assertEqual(reply_messages[0]['status'], 302)
 | 
						|
            header_dict = dict(reply_messages[0]['headers'])
 | 
						|
            self.assertEqual(header_dict[b'Location'].decode(), redirect_to)
 | 
						|
 | 
						|
    @unittest.skip("failing under python 3")
 | 
						|
    def test_stringio_file_response(self):
 | 
						|
        Channel("test").send({
 | 
						|
            "reply_channel": "test",
 | 
						|
            "http_version": "1.1",
 | 
						|
            "method": "GET",
 | 
						|
            "path": b"/test/",
 | 
						|
        })
 | 
						|
        response = FileResponse(StringIO('sadfdasfsdfsadf'))
 | 
						|
        handler = FakeAsgiHandler(response)
 | 
						|
        # Use islice because the generator never ends.
 | 
						|
        reply_messages = list(
 | 
						|
            islice(handler(self.get_next_message("test", require=True)), 5))
 | 
						|
        self.assertEqual(len(reply_messages), 2, reply_messages)
 |