Fix line endings

This commit is contained in:
Andrew Godwin 2015-06-10 09:42:34 -07:00
parent c9eb683ed8
commit 95e706f71f
12 changed files with 453 additions and 453 deletions

View File

@ -1,18 +1,18 @@
# Load backends
DEFAULT_CHANNEL_BACKEND = "default"
from .backends import BackendManager
from django.conf import settings
channel_backends = BackendManager(
getattr(settings, "CHANNEL_BACKENDS", {
DEFAULT_CHANNEL_BACKEND: {
"BACKEND": "channels.backends.memory.InMemoryChannelBackend",
}
})
)
# Ensure monkeypatching
from .hacks import monkeypatch_django
monkeypatch_django()
# Promote channel to top-level (down here to avoid circular import errs)
from .channel import Channel
# Load backends
DEFAULT_CHANNEL_BACKEND = "default"
from .backends import BackendManager
from django.conf import settings
channel_backends = BackendManager(
getattr(settings, "CHANNEL_BACKENDS", {
DEFAULT_CHANNEL_BACKEND: {
"BACKEND": "channels.backends.memory.InMemoryChannelBackend",
}
})
)
# Ensure monkeypatching
from .hacks import monkeypatch_django
monkeypatch_django()
# Promote channel to top-level (down here to avoid circular import errs)
from .channel import Channel

View File

@ -1,53 +1,53 @@
import functools
from django.core.handlers.base import BaseHandler
from django.http import HttpRequest, HttpResponse
from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND
class UrlConsumer(object):
"""
Dispatches channel HTTP requests into django's URL system.
"""
def __init__(self):
self.handler = BaseHandler()
self.handler.load_middleware()
def __call__(self, channel, **kwargs):
request = HttpRequest.channel_decode(kwargs)
try:
response = self.handler.get_response(request)
except HttpResponse.ResponseLater:
return
Channel(request.response_channel).send(**response.channel_encode())
def view_producer(channel_name):
"""
Returns a new view function that actually writes the request to a channel
and abandons the response (with an exception the Worker will catch)
"""
def producing_view(request):
Channel(channel_name).send(**request.channel_encode())
raise HttpResponse.ResponseLater()
return producing_view
def view_consumer(channel_name, alias=DEFAULT_CHANNEL_BACKEND):
"""
Decorates a normal Django view to be a channel consumer.
Does not run any middleware
"""
def inner(func):
@functools.wraps(func)
def consumer(channel, **kwargs):
request = HttpRequest.channel_decode(kwargs)
response = func(request)
Channel(request.response_channel).send(**response.channel_encode())
# Get the channel layer and register
channel_layer = channel_backends[DEFAULT_CHANNEL_BACKEND]
channel_layer.registry.add_consumer(consumer, [channel_name])
return func
return inner
import functools
from django.core.handlers.base import BaseHandler
from django.http import HttpRequest, HttpResponse
from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND
class UrlConsumer(object):
"""
Dispatches channel HTTP requests into django's URL system.
"""
def __init__(self):
self.handler = BaseHandler()
self.handler.load_middleware()
def __call__(self, channel, **kwargs):
request = HttpRequest.channel_decode(kwargs)
try:
response = self.handler.get_response(request)
except HttpResponse.ResponseLater:
return
Channel(request.response_channel).send(**response.channel_encode())
def view_producer(channel_name):
"""
Returns a new view function that actually writes the request to a channel
and abandons the response (with an exception the Worker will catch)
"""
def producing_view(request):
Channel(channel_name).send(**request.channel_encode())
raise HttpResponse.ResponseLater()
return producing_view
def view_consumer(channel_name, alias=DEFAULT_CHANNEL_BACKEND):
"""
Decorates a normal Django view to be a channel consumer.
Does not run any middleware
"""
def inner(func):
@functools.wraps(func)
def consumer(channel, **kwargs):
request = HttpRequest.channel_decode(kwargs)
response = func(request)
Channel(request.response_channel).send(**response.channel_encode())
# Get the channel layer and register
channel_layer = channel_backends[DEFAULT_CHANNEL_BACKEND]
channel_layer.registry.add_consumer(consumer, [channel_name])
return func
return inner

View File

@ -1,33 +1,33 @@
from channels.consumer_registry import ConsumerRegistry
class ChannelClosed(Exception):
"""
Raised when you try to send to a closed channel.
"""
pass
class BaseChannelBackend(object):
"""
Base class for all channel layer implementations. Manages both sending
and receving messages from the backend, and each comes with its own
registry of consumers.
"""
def __init__(self, expiry=60):
self.registry = ConsumerRegistry()
self.expiry = expiry
def send(self, channel, message):
"""
Send a message over the channel, taken from the kwargs.
"""
raise NotImplementedError()
def receive_many(self, channels):
"""
Block and return the first message available on one of the
channels passed, as a (channel, message) tuple.
"""
raise NotImplementedError()
from channels.consumer_registry import ConsumerRegistry
class ChannelClosed(Exception):
"""
Raised when you try to send to a closed channel.
"""
pass
class BaseChannelBackend(object):
"""
Base class for all channel layer implementations. Manages both sending
and receving messages from the backend, and each comes with its own
registry of consumers.
"""
def __init__(self, expiry=60):
self.registry = ConsumerRegistry()
self.expiry = expiry
def send(self, channel, message):
"""
Send a message over the channel, taken from the kwargs.
"""
raise NotImplementedError()
def receive_many(self, channels):
"""
Block and return the first message available on one of the
channels passed, as a (channel, message) tuple.
"""
raise NotImplementedError()

View File

@ -1,32 +1,32 @@
import time
import json
from collections import deque
from .base import BaseChannelBackend
queues = {}
class InMemoryChannelBackend(BaseChannelBackend):
"""
In-memory channel implementation. Intended only for use with threading,
in low-throughput development environments.
"""
def send(self, channel, message):
# Try JSON encoding it to make sure it would, but store the native version
json.dumps(message)
# Add to the deque, making it if needs be
queues.setdefault(channel, deque()).append(message)
def receive_many(self, channels):
while True:
# Try to pop a message from each channel
for channel in channels:
try:
# This doesn't clean up empty channels - OK for testing.
# For later versions, have cleanup w/lock.
return channel, queues[channel].popleft()
except (IndexError, KeyError):
pass
# If all empty, sleep for a little bit
time.sleep(0.01)
import time
import json
from collections import deque
from .base import BaseChannelBackend
queues = {}
class InMemoryChannelBackend(BaseChannelBackend):
"""
In-memory channel implementation. Intended only for use with threading,
in low-throughput development environments.
"""
def send(self, channel, message):
# Try JSON encoding it to make sure it would, but store the native version
json.dumps(message)
# Add to the deque, making it if needs be
queues.setdefault(channel, deque()).append(message)
def receive_many(self, channels):
while True:
# Try to pop a message from each channel
for channel in channels:
try:
# This doesn't clean up empty channels - OK for testing.
# For later versions, have cleanup w/lock.
return channel, queues[channel].popleft()
except (IndexError, KeyError):
pass
# If all empty, sleep for a little bit
time.sleep(0.01)

View File

@ -1,68 +1,68 @@
import time
import datetime
from django.apps.registry import Apps
from django.db import models, connections, DEFAULT_DB_ALIAS
from .base import BaseChannelBackend
queues = {}
class ORMChannelBackend(BaseChannelBackend):
"""
ORM-backed channel environment. For development use only; it will span
multiple processes fine, but it's going to be pretty bad at throughput.
"""
def __init__(self, expiry, db_alias=DEFAULT_DB_ALIAS):
super(ORMChannelBackend, self).__init__(expiry)
self.connection = connections[db_alias]
self.model = self.make_model()
self.ensure_schema()
def make_model(self):
"""
Initialises a new model to store messages; not done as part of a
models.py as we don't want to make it for most installs.
"""
class Message(models.Model):
# We assume an autoincrementing PK for message order
channel = models.CharField(max_length=200, db_index=True)
content = models.TextField()
expiry = models.DateTimeField(db_index=True)
class Meta:
apps = Apps()
app_label = "channels"
db_table = "django_channels"
return Message
def ensure_schema(self):
"""
Ensures the table exists and has the correct schema.
"""
# If the table's there, that's fine - we've never changed its schema
# in the codebase.
if self.model._meta.db_table in self.connection.introspection.table_names(self.connection.cursor()):
return
# Make the table
with self.connection.schema_editor() as editor:
editor.create_model(self.model)
def send(self, channel, message):
self.model.objects.create(
channel = channel,
message = json.dumps(message),
expiry = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.expiry)
)
def receive_many(self, channels):
while True:
# Delete all expired messages (add 10 second grace period for clock sync)
self.model.objects.filter(expiry__lt=datetime.datetime.utcnow() - datetime.timedelta(seconds=10)).delete()
# Get a message from one of our channels
message = self.model.objects.filter(channel__in=channels).order_by("id").first()
if message:
return message.channel, json.loads(message.content)
# If all empty, sleep for a little bit
time.sleep(0.2)
import time
import datetime
from django.apps.registry import Apps
from django.db import models, connections, DEFAULT_DB_ALIAS
from .base import BaseChannelBackend
queues = {}
class ORMChannelBackend(BaseChannelBackend):
"""
ORM-backed channel environment. For development use only; it will span
multiple processes fine, but it's going to be pretty bad at throughput.
"""
def __init__(self, expiry, db_alias=DEFAULT_DB_ALIAS):
super(ORMChannelBackend, self).__init__(expiry)
self.connection = connections[db_alias]
self.model = self.make_model()
self.ensure_schema()
def make_model(self):
"""
Initialises a new model to store messages; not done as part of a
models.py as we don't want to make it for most installs.
"""
class Message(models.Model):
# We assume an autoincrementing PK for message order
channel = models.CharField(max_length=200, db_index=True)
content = models.TextField()
expiry = models.DateTimeField(db_index=True)
class Meta:
apps = Apps()
app_label = "channels"
db_table = "django_channels"
return Message
def ensure_schema(self):
"""
Ensures the table exists and has the correct schema.
"""
# If the table's there, that's fine - we've never changed its schema
# in the codebase.
if self.model._meta.db_table in self.connection.introspection.table_names(self.connection.cursor()):
return
# Make the table
with self.connection.schema_editor() as editor:
editor.create_model(self.model)
def send(self, channel, message):
self.model.objects.create(
channel = channel,
message = json.dumps(message),
expiry = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.expiry)
)
def receive_many(self, channels):
while True:
# Delete all expired messages (add 10 second grace period for clock sync)
self.model.objects.filter(expiry__lt=datetime.datetime.utcnow() - datetime.timedelta(seconds=10)).delete()
# Get a message from one of our channels
message = self.model.objects.filter(channel__in=channels).order_by("id").first()
if message:
return message.channel, json.loads(message.content)
# If all empty, sleep for a little bit
time.sleep(0.2)

View File

@ -1,40 +1,40 @@
import functools
from django.utils import six
from .utils import name_that_thing
class ConsumerRegistry(object):
"""
Manages the available consumers in the project and which channels they
listen to.
Generally this is attached to a backend instance as ".registry"
"""
def __init__(self):
self.consumers = {}
def add_consumer(self, consumer, channels):
# Upconvert if you just pass in a string
if isinstance(channels, six.string_types):
channels = [channels]
# Register on each channel, checking it's unique
for channel in channels:
if channel in self.consumers:
raise ValueError("Cannot register consumer %s - channel %r already consumed by %s" % (
name_that_thing(consumer),
channel,
name_that_thing(self.consumers[channel]),
))
self.consumers[channel] = consumer
def all_channel_names(self):
return self.consumers.keys()
def consumer_for_channel(self, channel):
try:
return self.consumers[channel]
except KeyError:
return None
import functools
from django.utils import six
from .utils import name_that_thing
class ConsumerRegistry(object):
"""
Manages the available consumers in the project and which channels they
listen to.
Generally this is attached to a backend instance as ".registry"
"""
def __init__(self):
self.consumers = {}
def add_consumer(self, consumer, channels):
# Upconvert if you just pass in a string
if isinstance(channels, six.string_types):
channels = [channels]
# Register on each channel, checking it's unique
for channel in channels:
if channel in self.consumers:
raise ValueError("Cannot register consumer %s - channel %r already consumed by %s" % (
name_that_thing(consumer),
channel,
name_that_thing(self.consumers[channel]),
))
self.consumers[channel] = consumer
def all_channel_names(self):
return self.consumers.keys()
def consumer_for_channel(self, channel):
try:
return self.consumers[channel]
except KeyError:
return None

View File

@ -1,27 +1,27 @@
from django.http.request import HttpRequest
from django.http.response import HttpResponseBase
from django.core.handlers.base import BaseHandler
from .request import encode_request, decode_request
from .response import encode_response, decode_response, ResponseLater
def monkeypatch_django():
"""
Monkeypatches support for us into parts of Django.
"""
# Request encode/decode
HttpRequest.channel_encode = encode_request
HttpRequest.channel_decode = staticmethod(decode_request)
# Response encode/decode
HttpResponseBase.channel_encode = encode_response
HttpResponseBase.channel_decode = staticmethod(decode_response)
HttpResponseBase.ResponseLater = ResponseLater
# Allow ResponseLater to propagate above handler
BaseHandler.old_handle_uncaught_exception = BaseHandler.handle_uncaught_exception
BaseHandler.handle_uncaught_exception = new_handle_uncaught_exception
def new_handle_uncaught_exception(self, request, resolver, exc_info):
if exc_info[0] is ResponseLater:
raise
return BaseHandler.old_handle_uncaught_exception(self, request, resolver, exc_info)
from django.http.request import HttpRequest
from django.http.response import HttpResponseBase
from django.core.handlers.base import BaseHandler
from .request import encode_request, decode_request
from .response import encode_response, decode_response, ResponseLater
def monkeypatch_django():
"""
Monkeypatches support for us into parts of Django.
"""
# Request encode/decode
HttpRequest.channel_encode = encode_request
HttpRequest.channel_decode = staticmethod(decode_request)
# Response encode/decode
HttpResponseBase.channel_encode = encode_response
HttpResponseBase.channel_decode = staticmethod(decode_response)
HttpResponseBase.ResponseLater = ResponseLater
# Allow ResponseLater to propagate above handler
BaseHandler.old_handle_uncaught_exception = BaseHandler.handle_uncaught_exception
BaseHandler.handle_uncaught_exception = new_handle_uncaught_exception
def new_handle_uncaught_exception(self, request, resolver, exc_info):
if exc_info[0] is ResponseLater:
raise
return BaseHandler.old_handle_uncaught_exception(self, request, resolver, exc_info)

View File

@ -1,60 +1,60 @@
import django
import threading
from django.core.management.commands.runserver import Command as RunserverCommand
from django.core.handlers.wsgi import WSGIHandler
from django.http import HttpResponse
from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND
from channels.worker import Worker
from channels.utils import auto_import_consumers
from channels.adapters import UrlConsumer
class Command(RunserverCommand):
def get_handler(self, *args, **options):
"""
Returns the default WSGI handler for the runner.
"""
django.setup()
return WSGIInterfaceHandler()
def run(self, *args, **options):
# Force disable reloader for now
options['use_reloader'] = False
# Check a handler is registered for http reqs
channel_layer = channel_backends[DEFAULT_CHANNEL_BACKEND]
auto_import_consumers()
if not channel_layer.registry.consumer_for_channel("django.wsgi.request"):
# Register the default one
channel_layer.registry.add_consumer(UrlConsumer(), ["django.wsgi.request"])
# Launch a worker thread
worker = WorkerThread(channel_layer)
worker.daemon = True
worker.start()
# Run the rest
return super(Command, self).run(*args, **options)
class WSGIInterfaceHandler(WSGIHandler):
"""
New WSGI handler that pushes requests to channels.
"""
def get_response(self, request):
request.response_channel = Channel.new_name("django.wsgi.response")
Channel("django.wsgi.request").send(**request.channel_encode())
channel, message = channel_backends[DEFAULT_CHANNEL_BACKEND].receive_many([request.response_channel])
return HttpResponse.channel_decode(message)
class WorkerThread(threading.Thread):
"""
Class that runs a worker
"""
def __init__(self, channel_layer):
super(WorkerThread, self).__init__()
self.channel_layer = channel_layer
def run(self):
Worker(channel_layer=self.channel_layer).run()
import django
import threading
from django.core.management.commands.runserver import Command as RunserverCommand
from django.core.handlers.wsgi import WSGIHandler
from django.http import HttpResponse
from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND
from channels.worker import Worker
from channels.utils import auto_import_consumers
from channels.adapters import UrlConsumer
class Command(RunserverCommand):
def get_handler(self, *args, **options):
"""
Returns the default WSGI handler for the runner.
"""
django.setup()
return WSGIInterfaceHandler()
def run(self, *args, **options):
# Force disable reloader for now
options['use_reloader'] = False
# Check a handler is registered for http reqs
channel_layer = channel_backends[DEFAULT_CHANNEL_BACKEND]
auto_import_consumers()
if not channel_layer.registry.consumer_for_channel("django.wsgi.request"):
# Register the default one
channel_layer.registry.add_consumer(UrlConsumer(), ["django.wsgi.request"])
# Launch a worker thread
worker = WorkerThread(channel_layer)
worker.daemon = True
worker.start()
# Run the rest
return super(Command, self).run(*args, **options)
class WSGIInterfaceHandler(WSGIHandler):
"""
New WSGI handler that pushes requests to channels.
"""
def get_response(self, request):
request.response_channel = Channel.new_name("django.wsgi.response")
Channel("django.wsgi.request").send(**request.channel_encode())
channel, message = channel_backends[DEFAULT_CHANNEL_BACKEND].receive_many([request.response_channel])
return HttpResponse.channel_decode(message)
class WorkerThread(threading.Thread):
"""
Class that runs a worker
"""
def __init__(self, channel_layer):
super(WorkerThread, self).__init__()
self.channel_layer = channel_layer
def run(self):
Worker(channel_layer=self.channel_layer).run()

View File

@ -1,36 +1,36 @@
from django.http import HttpRequest
from django.utils.datastructures import MultiValueDict
def encode_request(request):
"""
Encodes a request to JSON-compatible datastructures
"""
# TODO: More stuff
value = {
"GET": request.GET.items(),
"POST": request.POST.items(),
"COOKIES": request.COOKIES,
"META": {k: v for k, v in request.META.items() if not k.startswith("wsgi")},
"path": request.path,
"path_info": request.path_info,
"method": request.method,
"response_channel": request.response_channel,
}
return value
def decode_request(value):
"""
Decodes a request JSONish value to a HttpRequest object.
"""
request = HttpRequest()
request.GET = MultiValueDict(value['GET'])
request.POST = MultiValueDict(value['POST'])
request.COOKIES = value['COOKIES']
request.META = value['META']
request.path = value['path']
request.method = value['method']
request.path_info = value['path_info']
request.response_channel = value['response_channel']
return request
from django.http import HttpRequest
from django.utils.datastructures import MultiValueDict
def encode_request(request):
"""
Encodes a request to JSON-compatible datastructures
"""
# TODO: More stuff
value = {
"GET": request.GET.items(),
"POST": request.POST.items(),
"COOKIES": request.COOKIES,
"META": {k: v for k, v in request.META.items() if not k.startswith("wsgi")},
"path": request.path,
"path_info": request.path_info,
"method": request.method,
"response_channel": request.response_channel,
}
return value
def decode_request(value):
"""
Decodes a request JSONish value to a HttpRequest object.
"""
request = HttpRequest()
request.GET = MultiValueDict(value['GET'])
request.POST = MultiValueDict(value['POST'])
request.COOKIES = value['COOKIES']
request.META = value['META']
request.path = value['path']
request.method = value['method']
request.path_info = value['path_info']
request.response_channel = value['response_channel']
return request

View File

@ -1,38 +1,38 @@
from django.http import HttpResponse
def encode_response(response):
"""
Encodes a response to JSON-compatible datastructures
"""
# TODO: Entirely useful things like cookies
value = {
"content_type": getattr(response, "content_type", None),
"content": response.content,
"status_code": response.status_code,
"headers": response._headers.values(),
}
response.close()
return value
def decode_response(value):
"""
Decodes a response JSONish value to a HttpResponse object.
"""
response = HttpResponse(
content = value['content'],
content_type = value['content_type'],
status = value['status_code'],
)
response._headers = {k.lower: (k, v) for k, v in value['headers']}
return response
class ResponseLater(Exception):
"""
Class that represents a response which will be sent doown the response
channel later. Used to move a django view-based segment onto the next
task, as otherwise we'd need to write some kind of fake response.
"""
pass
from django.http import HttpResponse
def encode_response(response):
"""
Encodes a response to JSON-compatible datastructures
"""
# TODO: Entirely useful things like cookies
value = {
"content_type": getattr(response, "content_type", None),
"content": response.content,
"status_code": response.status_code,
"headers": response._headers.values(),
}
response.close()
return value
def decode_response(value):
"""
Decodes a response JSONish value to a HttpResponse object.
"""
response = HttpResponse(
content = value['content'],
content_type = value['content_type'],
status = value['status_code'],
)
response._headers = {k.lower: (k, v) for k, v in value['headers']}
return response
class ResponseLater(Exception):
"""
Class that represents a response which will be sent doown the response
channel later. Used to move a django view-based segment onto the next
task, as otherwise we'd need to write some kind of fake response.
"""
pass

View File

@ -1,29 +1,29 @@
import types
from django.apps import apps
def auto_import_consumers():
"""
Auto-import consumers modules in apps
"""
for app_config in apps.get_app_configs():
for submodule in ["consumers", "views"]:
module_name = "%s.%s" % (app_config.name, submodule)
try:
__import__(module_name)
except ImportError as e:
if "no module named %s" % submodule not in str(e).lower():
raise
def name_that_thing(thing):
"""
Returns either the function/class path or just the object's repr
"""
if hasattr(thing, "__name__"):
if hasattr(thing, "__class__") and not isinstance(thing, types.FunctionType):
if thing.__class__ is not type:
return name_that_thing(thing.__class__)
if hasattr(thing, "__module__"):
return "%s.%s" % (thing.__module__, thing.__name__)
return repr(thing)
import types
from django.apps import apps
def auto_import_consumers():
"""
Auto-import consumers modules in apps
"""
for app_config in apps.get_app_configs():
for submodule in ["consumers", "views"]:
module_name = "%s.%s" % (app_config.name, submodule)
try:
__import__(module_name)
except ImportError as e:
if "no module named %s" % submodule not in str(e).lower():
raise
def name_that_thing(thing):
"""
Returns either the function/class path or just the object's repr
"""
if hasattr(thing, "__name__"):
if hasattr(thing, "__class__") and not isinstance(thing, types.FunctionType):
if thing.__class__ is not type:
return name_that_thing(thing.__class__)
if hasattr(thing, "__module__"):
return "%s.%s" % (thing.__module__, thing.__name__)
return repr(thing)

View File

@ -1,19 +1,19 @@
class Worker(object):
"""
A "worker" process that continually looks for available messages to run
and runs their consumers.
"""
def __init__(self, channel_layer):
self.channel_layer = channel_layer
def run(self):
"""
Tries to continually dispatch messages to consumers.
"""
channels = self.channel_layer.registry.all_channel_names()
while True:
channel, message = self.channel_layer.receive_many(channels)
consumer = self.channel_layer.registry.consumer_for_channel(channel)
consumer(channel=channel, **message)
class Worker(object):
"""
A "worker" process that continually looks for available messages to run
and runs their consumers.
"""
def __init__(self, channel_layer):
self.channel_layer = channel_layer
def run(self):
"""
Tries to continually dispatch messages to consumers.
"""
channels = self.channel_layer.registry.all_channel_names()
while True:
channel, message = self.channel_layer.receive_many(channels)
consumer = self.channel_layer.registry.consumer_for_channel(channel)
consumer(channel=channel, **message)