diff --git a/channels/__init__.py b/channels/__init__.py index 6203a7b..1010c41 100644 --- a/channels/__init__.py +++ b/channels/__init__.py @@ -1,18 +1,18 @@ -# Load backends -DEFAULT_CHANNEL_BACKEND = "default" -from .backends import BackendManager -from django.conf import settings -channel_backends = BackendManager( - getattr(settings, "CHANNEL_BACKENDS", { - DEFAULT_CHANNEL_BACKEND: { - "BACKEND": "channels.backends.memory.InMemoryChannelBackend", - } - }) -) - -# Ensure monkeypatching -from .hacks import monkeypatch_django -monkeypatch_django() - -# Promote channel to top-level (down here to avoid circular import errs) -from .channel import Channel +# Load backends +DEFAULT_CHANNEL_BACKEND = "default" +from .backends import BackendManager +from django.conf import settings +channel_backends = BackendManager( + getattr(settings, "CHANNEL_BACKENDS", { + DEFAULT_CHANNEL_BACKEND: { + "BACKEND": "channels.backends.memory.InMemoryChannelBackend", + } + }) +) + +# Ensure monkeypatching +from .hacks import monkeypatch_django +monkeypatch_django() + +# Promote channel to top-level (down here to avoid circular import errs) +from .channel import Channel diff --git a/channels/adapters.py b/channels/adapters.py index 6e31647..0879241 100644 --- a/channels/adapters.py +++ b/channels/adapters.py @@ -1,53 +1,53 @@ -import functools - -from django.core.handlers.base import BaseHandler -from django.http import HttpRequest, HttpResponse - -from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND - - -class UrlConsumer(object): - """ - Dispatches channel HTTP requests into django's URL system. - """ - - def __init__(self): - self.handler = BaseHandler() - self.handler.load_middleware() - - def __call__(self, channel, **kwargs): - request = HttpRequest.channel_decode(kwargs) - try: - response = self.handler.get_response(request) - except HttpResponse.ResponseLater: - return - Channel(request.response_channel).send(**response.channel_encode()) - - -def view_producer(channel_name): - """ - Returns a new view function that actually writes the request to a channel - and abandons the response (with an exception the Worker will catch) - """ - def producing_view(request): - Channel(channel_name).send(**request.channel_encode()) - raise HttpResponse.ResponseLater() - return producing_view - - -def view_consumer(channel_name, alias=DEFAULT_CHANNEL_BACKEND): - """ - Decorates a normal Django view to be a channel consumer. - Does not run any middleware - """ - def inner(func): - @functools.wraps(func) - def consumer(channel, **kwargs): - request = HttpRequest.channel_decode(kwargs) - response = func(request) - Channel(request.response_channel).send(**response.channel_encode()) - # Get the channel layer and register - channel_layer = channel_backends[DEFAULT_CHANNEL_BACKEND] - channel_layer.registry.add_consumer(consumer, [channel_name]) - return func - return inner +import functools + +from django.core.handlers.base import BaseHandler +from django.http import HttpRequest, HttpResponse + +from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND + + +class UrlConsumer(object): + """ + Dispatches channel HTTP requests into django's URL system. + """ + + def __init__(self): + self.handler = BaseHandler() + self.handler.load_middleware() + + def __call__(self, channel, **kwargs): + request = HttpRequest.channel_decode(kwargs) + try: + response = self.handler.get_response(request) + except HttpResponse.ResponseLater: + return + Channel(request.response_channel).send(**response.channel_encode()) + + +def view_producer(channel_name): + """ + Returns a new view function that actually writes the request to a channel + and abandons the response (with an exception the Worker will catch) + """ + def producing_view(request): + Channel(channel_name).send(**request.channel_encode()) + raise HttpResponse.ResponseLater() + return producing_view + + +def view_consumer(channel_name, alias=DEFAULT_CHANNEL_BACKEND): + """ + Decorates a normal Django view to be a channel consumer. + Does not run any middleware + """ + def inner(func): + @functools.wraps(func) + def consumer(channel, **kwargs): + request = HttpRequest.channel_decode(kwargs) + response = func(request) + Channel(request.response_channel).send(**response.channel_encode()) + # Get the channel layer and register + channel_layer = channel_backends[DEFAULT_CHANNEL_BACKEND] + channel_layer.registry.add_consumer(consumer, [channel_name]) + return func + return inner diff --git a/channels/backends/base.py b/channels/backends/base.py index 62866f0..a3481cb 100644 --- a/channels/backends/base.py +++ b/channels/backends/base.py @@ -1,33 +1,33 @@ -from channels.consumer_registry import ConsumerRegistry - - -class ChannelClosed(Exception): - """ - Raised when you try to send to a closed channel. - """ - pass - - -class BaseChannelBackend(object): - """ - Base class for all channel layer implementations. Manages both sending - and receving messages from the backend, and each comes with its own - registry of consumers. - """ - - def __init__(self, expiry=60): - self.registry = ConsumerRegistry() - self.expiry = expiry - - def send(self, channel, message): - """ - Send a message over the channel, taken from the kwargs. - """ - raise NotImplementedError() - - def receive_many(self, channels): - """ - Block and return the first message available on one of the - channels passed, as a (channel, message) tuple. - """ - raise NotImplementedError() +from channels.consumer_registry import ConsumerRegistry + + +class ChannelClosed(Exception): + """ + Raised when you try to send to a closed channel. + """ + pass + + +class BaseChannelBackend(object): + """ + Base class for all channel layer implementations. Manages both sending + and receving messages from the backend, and each comes with its own + registry of consumers. + """ + + def __init__(self, expiry=60): + self.registry = ConsumerRegistry() + self.expiry = expiry + + def send(self, channel, message): + """ + Send a message over the channel, taken from the kwargs. + """ + raise NotImplementedError() + + def receive_many(self, channels): + """ + Block and return the first message available on one of the + channels passed, as a (channel, message) tuple. + """ + raise NotImplementedError() diff --git a/channels/backends/memory.py b/channels/backends/memory.py index 770e602..123ec40 100644 --- a/channels/backends/memory.py +++ b/channels/backends/memory.py @@ -1,32 +1,32 @@ -import time -import json -from collections import deque -from .base import BaseChannelBackend - -queues = {} - -class InMemoryChannelBackend(BaseChannelBackend): - """ - In-memory channel implementation. Intended only for use with threading, - in low-throughput development environments. - """ - - def send(self, channel, message): - # Try JSON encoding it to make sure it would, but store the native version - json.dumps(message) - # Add to the deque, making it if needs be - queues.setdefault(channel, deque()).append(message) - - def receive_many(self, channels): - while True: - # Try to pop a message from each channel - for channel in channels: - try: - # This doesn't clean up empty channels - OK for testing. - # For later versions, have cleanup w/lock. - return channel, queues[channel].popleft() - except (IndexError, KeyError): - pass - # If all empty, sleep for a little bit - time.sleep(0.01) - +import time +import json +from collections import deque +from .base import BaseChannelBackend + +queues = {} + +class InMemoryChannelBackend(BaseChannelBackend): + """ + In-memory channel implementation. Intended only for use with threading, + in low-throughput development environments. + """ + + def send(self, channel, message): + # Try JSON encoding it to make sure it would, but store the native version + json.dumps(message) + # Add to the deque, making it if needs be + queues.setdefault(channel, deque()).append(message) + + def receive_many(self, channels): + while True: + # Try to pop a message from each channel + for channel in channels: + try: + # This doesn't clean up empty channels - OK for testing. + # For later versions, have cleanup w/lock. + return channel, queues[channel].popleft() + except (IndexError, KeyError): + pass + # If all empty, sleep for a little bit + time.sleep(0.01) + diff --git a/channels/backends/orm.py b/channels/backends/orm.py index 225a9f7..fdf6645 100644 --- a/channels/backends/orm.py +++ b/channels/backends/orm.py @@ -1,68 +1,68 @@ -import time -import datetime - -from django.apps.registry import Apps -from django.db import models, connections, DEFAULT_DB_ALIAS - -from .base import BaseChannelBackend - -queues = {} - -class ORMChannelBackend(BaseChannelBackend): - """ - ORM-backed channel environment. For development use only; it will span - multiple processes fine, but it's going to be pretty bad at throughput. - """ - - def __init__(self, expiry, db_alias=DEFAULT_DB_ALIAS): - super(ORMChannelBackend, self).__init__(expiry) - self.connection = connections[db_alias] - self.model = self.make_model() - self.ensure_schema() - - def make_model(self): - """ - Initialises a new model to store messages; not done as part of a - models.py as we don't want to make it for most installs. - """ - class Message(models.Model): - # We assume an autoincrementing PK for message order - channel = models.CharField(max_length=200, db_index=True) - content = models.TextField() - expiry = models.DateTimeField(db_index=True) - class Meta: - apps = Apps() - app_label = "channels" - db_table = "django_channels" - return Message - - def ensure_schema(self): - """ - Ensures the table exists and has the correct schema. - """ - # If the table's there, that's fine - we've never changed its schema - # in the codebase. - if self.model._meta.db_table in self.connection.introspection.table_names(self.connection.cursor()): - return - # Make the table - with self.connection.schema_editor() as editor: - editor.create_model(self.model) - - def send(self, channel, message): - self.model.objects.create( - channel = channel, - message = json.dumps(message), - expiry = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.expiry) - ) - - def receive_many(self, channels): - while True: - # Delete all expired messages (add 10 second grace period for clock sync) - self.model.objects.filter(expiry__lt=datetime.datetime.utcnow() - datetime.timedelta(seconds=10)).delete() - # Get a message from one of our channels - message = self.model.objects.filter(channel__in=channels).order_by("id").first() - if message: - return message.channel, json.loads(message.content) - # If all empty, sleep for a little bit - time.sleep(0.2) - +import time +import datetime + +from django.apps.registry import Apps +from django.db import models, connections, DEFAULT_DB_ALIAS + +from .base import BaseChannelBackend + +queues = {} + +class ORMChannelBackend(BaseChannelBackend): + """ + ORM-backed channel environment. For development use only; it will span + multiple processes fine, but it's going to be pretty bad at throughput. + """ + + def __init__(self, expiry, db_alias=DEFAULT_DB_ALIAS): + super(ORMChannelBackend, self).__init__(expiry) + self.connection = connections[db_alias] + self.model = self.make_model() + self.ensure_schema() + + def make_model(self): + """ + Initialises a new model to store messages; not done as part of a + models.py as we don't want to make it for most installs. + """ + class Message(models.Model): + # We assume an autoincrementing PK for message order + channel = models.CharField(max_length=200, db_index=True) + content = models.TextField() + expiry = models.DateTimeField(db_index=True) + class Meta: + apps = Apps() + app_label = "channels" + db_table = "django_channels" + return Message + + def ensure_schema(self): + """ + Ensures the table exists and has the correct schema. + """ + # If the table's there, that's fine - we've never changed its schema + # in the codebase. + if self.model._meta.db_table in self.connection.introspection.table_names(self.connection.cursor()): + return + # Make the table + with self.connection.schema_editor() as editor: + editor.create_model(self.model) + + def send(self, channel, message): + self.model.objects.create( + channel = channel, + message = json.dumps(message), + expiry = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.expiry) + ) + + def receive_many(self, channels): + while True: + # Delete all expired messages (add 10 second grace period for clock sync) + self.model.objects.filter(expiry__lt=datetime.datetime.utcnow() - datetime.timedelta(seconds=10)).delete() + # Get a message from one of our channels + message = self.model.objects.filter(channel__in=channels).order_by("id").first() + if message: + return message.channel, json.loads(message.content) + # If all empty, sleep for a little bit + time.sleep(0.2) + diff --git a/channels/consumer_registry.py b/channels/consumer_registry.py index 1bfe4e0..5e80729 100644 --- a/channels/consumer_registry.py +++ b/channels/consumer_registry.py @@ -1,40 +1,40 @@ -import functools - -from django.utils import six - -from .utils import name_that_thing - - -class ConsumerRegistry(object): - """ - Manages the available consumers in the project and which channels they - listen to. - - Generally this is attached to a backend instance as ".registry" - """ - - def __init__(self): - self.consumers = {} - - def add_consumer(self, consumer, channels): - # Upconvert if you just pass in a string - if isinstance(channels, six.string_types): - channels = [channels] - # Register on each channel, checking it's unique - for channel in channels: - if channel in self.consumers: - raise ValueError("Cannot register consumer %s - channel %r already consumed by %s" % ( - name_that_thing(consumer), - channel, - name_that_thing(self.consumers[channel]), - )) - self.consumers[channel] = consumer - - def all_channel_names(self): - return self.consumers.keys() - - def consumer_for_channel(self, channel): - try: - return self.consumers[channel] - except KeyError: - return None +import functools + +from django.utils import six + +from .utils import name_that_thing + + +class ConsumerRegistry(object): + """ + Manages the available consumers in the project and which channels they + listen to. + + Generally this is attached to a backend instance as ".registry" + """ + + def __init__(self): + self.consumers = {} + + def add_consumer(self, consumer, channels): + # Upconvert if you just pass in a string + if isinstance(channels, six.string_types): + channels = [channels] + # Register on each channel, checking it's unique + for channel in channels: + if channel in self.consumers: + raise ValueError("Cannot register consumer %s - channel %r already consumed by %s" % ( + name_that_thing(consumer), + channel, + name_that_thing(self.consumers[channel]), + )) + self.consumers[channel] = consumer + + def all_channel_names(self): + return self.consumers.keys() + + def consumer_for_channel(self, channel): + try: + return self.consumers[channel] + except KeyError: + return None diff --git a/channels/hacks.py b/channels/hacks.py index 5d5e329..e3003cf 100644 --- a/channels/hacks.py +++ b/channels/hacks.py @@ -1,27 +1,27 @@ -from django.http.request import HttpRequest -from django.http.response import HttpResponseBase -from django.core.handlers.base import BaseHandler -from .request import encode_request, decode_request -from .response import encode_response, decode_response, ResponseLater - - -def monkeypatch_django(): - """ - Monkeypatches support for us into parts of Django. - """ - # Request encode/decode - HttpRequest.channel_encode = encode_request - HttpRequest.channel_decode = staticmethod(decode_request) - # Response encode/decode - HttpResponseBase.channel_encode = encode_response - HttpResponseBase.channel_decode = staticmethod(decode_response) - HttpResponseBase.ResponseLater = ResponseLater - # Allow ResponseLater to propagate above handler - BaseHandler.old_handle_uncaught_exception = BaseHandler.handle_uncaught_exception - BaseHandler.handle_uncaught_exception = new_handle_uncaught_exception - - -def new_handle_uncaught_exception(self, request, resolver, exc_info): - if exc_info[0] is ResponseLater: - raise - return BaseHandler.old_handle_uncaught_exception(self, request, resolver, exc_info) +from django.http.request import HttpRequest +from django.http.response import HttpResponseBase +from django.core.handlers.base import BaseHandler +from .request import encode_request, decode_request +from .response import encode_response, decode_response, ResponseLater + + +def monkeypatch_django(): + """ + Monkeypatches support for us into parts of Django. + """ + # Request encode/decode + HttpRequest.channel_encode = encode_request + HttpRequest.channel_decode = staticmethod(decode_request) + # Response encode/decode + HttpResponseBase.channel_encode = encode_response + HttpResponseBase.channel_decode = staticmethod(decode_response) + HttpResponseBase.ResponseLater = ResponseLater + # Allow ResponseLater to propagate above handler + BaseHandler.old_handle_uncaught_exception = BaseHandler.handle_uncaught_exception + BaseHandler.handle_uncaught_exception = new_handle_uncaught_exception + + +def new_handle_uncaught_exception(self, request, resolver, exc_info): + if exc_info[0] is ResponseLater: + raise + return BaseHandler.old_handle_uncaught_exception(self, request, resolver, exc_info) diff --git a/channels/management/commands/runserver.py b/channels/management/commands/runserver.py index 7858359..ade88a2 100644 --- a/channels/management/commands/runserver.py +++ b/channels/management/commands/runserver.py @@ -1,60 +1,60 @@ -import django -import threading -from django.core.management.commands.runserver import Command as RunserverCommand -from django.core.handlers.wsgi import WSGIHandler -from django.http import HttpResponse -from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND -from channels.worker import Worker -from channels.utils import auto_import_consumers -from channels.adapters import UrlConsumer - - -class Command(RunserverCommand): - - def get_handler(self, *args, **options): - """ - Returns the default WSGI handler for the runner. - """ - django.setup() - return WSGIInterfaceHandler() - - def run(self, *args, **options): - # Force disable reloader for now - options['use_reloader'] = False - # Check a handler is registered for http reqs - channel_layer = channel_backends[DEFAULT_CHANNEL_BACKEND] - auto_import_consumers() - if not channel_layer.registry.consumer_for_channel("django.wsgi.request"): - # Register the default one - channel_layer.registry.add_consumer(UrlConsumer(), ["django.wsgi.request"]) - # Launch a worker thread - worker = WorkerThread(channel_layer) - worker.daemon = True - worker.start() - # Run the rest - return super(Command, self).run(*args, **options) - - -class WSGIInterfaceHandler(WSGIHandler): - """ - New WSGI handler that pushes requests to channels. - """ - - def get_response(self, request): - request.response_channel = Channel.new_name("django.wsgi.response") - Channel("django.wsgi.request").send(**request.channel_encode()) - channel, message = channel_backends[DEFAULT_CHANNEL_BACKEND].receive_many([request.response_channel]) - return HttpResponse.channel_decode(message) - - -class WorkerThread(threading.Thread): - """ - Class that runs a worker - """ - - def __init__(self, channel_layer): - super(WorkerThread, self).__init__() - self.channel_layer = channel_layer - - def run(self): - Worker(channel_layer=self.channel_layer).run() +import django +import threading +from django.core.management.commands.runserver import Command as RunserverCommand +from django.core.handlers.wsgi import WSGIHandler +from django.http import HttpResponse +from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND +from channels.worker import Worker +from channels.utils import auto_import_consumers +from channels.adapters import UrlConsumer + + +class Command(RunserverCommand): + + def get_handler(self, *args, **options): + """ + Returns the default WSGI handler for the runner. + """ + django.setup() + return WSGIInterfaceHandler() + + def run(self, *args, **options): + # Force disable reloader for now + options['use_reloader'] = False + # Check a handler is registered for http reqs + channel_layer = channel_backends[DEFAULT_CHANNEL_BACKEND] + auto_import_consumers() + if not channel_layer.registry.consumer_for_channel("django.wsgi.request"): + # Register the default one + channel_layer.registry.add_consumer(UrlConsumer(), ["django.wsgi.request"]) + # Launch a worker thread + worker = WorkerThread(channel_layer) + worker.daemon = True + worker.start() + # Run the rest + return super(Command, self).run(*args, **options) + + +class WSGIInterfaceHandler(WSGIHandler): + """ + New WSGI handler that pushes requests to channels. + """ + + def get_response(self, request): + request.response_channel = Channel.new_name("django.wsgi.response") + Channel("django.wsgi.request").send(**request.channel_encode()) + channel, message = channel_backends[DEFAULT_CHANNEL_BACKEND].receive_many([request.response_channel]) + return HttpResponse.channel_decode(message) + + +class WorkerThread(threading.Thread): + """ + Class that runs a worker + """ + + def __init__(self, channel_layer): + super(WorkerThread, self).__init__() + self.channel_layer = channel_layer + + def run(self): + Worker(channel_layer=self.channel_layer).run() diff --git a/channels/request.py b/channels/request.py index 7062d62..d7cd4b2 100644 --- a/channels/request.py +++ b/channels/request.py @@ -1,36 +1,36 @@ -from django.http import HttpRequest -from django.utils.datastructures import MultiValueDict - - -def encode_request(request): - """ - Encodes a request to JSON-compatible datastructures - """ - # TODO: More stuff - value = { - "GET": request.GET.items(), - "POST": request.POST.items(), - "COOKIES": request.COOKIES, - "META": {k: v for k, v in request.META.items() if not k.startswith("wsgi")}, - "path": request.path, - "path_info": request.path_info, - "method": request.method, - "response_channel": request.response_channel, - } - return value - - -def decode_request(value): - """ - Decodes a request JSONish value to a HttpRequest object. - """ - request = HttpRequest() - request.GET = MultiValueDict(value['GET']) - request.POST = MultiValueDict(value['POST']) - request.COOKIES = value['COOKIES'] - request.META = value['META'] - request.path = value['path'] - request.method = value['method'] - request.path_info = value['path_info'] - request.response_channel = value['response_channel'] - return request +from django.http import HttpRequest +from django.utils.datastructures import MultiValueDict + + +def encode_request(request): + """ + Encodes a request to JSON-compatible datastructures + """ + # TODO: More stuff + value = { + "GET": request.GET.items(), + "POST": request.POST.items(), + "COOKIES": request.COOKIES, + "META": {k: v for k, v in request.META.items() if not k.startswith("wsgi")}, + "path": request.path, + "path_info": request.path_info, + "method": request.method, + "response_channel": request.response_channel, + } + return value + + +def decode_request(value): + """ + Decodes a request JSONish value to a HttpRequest object. + """ + request = HttpRequest() + request.GET = MultiValueDict(value['GET']) + request.POST = MultiValueDict(value['POST']) + request.COOKIES = value['COOKIES'] + request.META = value['META'] + request.path = value['path'] + request.method = value['method'] + request.path_info = value['path_info'] + request.response_channel = value['response_channel'] + return request diff --git a/channels/response.py b/channels/response.py index c71d333..e790250 100644 --- a/channels/response.py +++ b/channels/response.py @@ -1,38 +1,38 @@ -from django.http import HttpResponse - - -def encode_response(response): - """ - Encodes a response to JSON-compatible datastructures - """ - # TODO: Entirely useful things like cookies - value = { - "content_type": getattr(response, "content_type", None), - "content": response.content, - "status_code": response.status_code, - "headers": response._headers.values(), - } - response.close() - return value - - -def decode_response(value): - """ - Decodes a response JSONish value to a HttpResponse object. - """ - response = HttpResponse( - content = value['content'], - content_type = value['content_type'], - status = value['status_code'], - ) - response._headers = {k.lower: (k, v) for k, v in value['headers']} - return response - - -class ResponseLater(Exception): - """ - Class that represents a response which will be sent doown the response - channel later. Used to move a django view-based segment onto the next - task, as otherwise we'd need to write some kind of fake response. - """ - pass +from django.http import HttpResponse + + +def encode_response(response): + """ + Encodes a response to JSON-compatible datastructures + """ + # TODO: Entirely useful things like cookies + value = { + "content_type": getattr(response, "content_type", None), + "content": response.content, + "status_code": response.status_code, + "headers": response._headers.values(), + } + response.close() + return value + + +def decode_response(value): + """ + Decodes a response JSONish value to a HttpResponse object. + """ + response = HttpResponse( + content = value['content'], + content_type = value['content_type'], + status = value['status_code'], + ) + response._headers = {k.lower: (k, v) for k, v in value['headers']} + return response + + +class ResponseLater(Exception): + """ + Class that represents a response which will be sent doown the response + channel later. Used to move a django view-based segment onto the next + task, as otherwise we'd need to write some kind of fake response. + """ + pass diff --git a/channels/utils.py b/channels/utils.py index b110e7a..a44fe9c 100644 --- a/channels/utils.py +++ b/channels/utils.py @@ -1,29 +1,29 @@ -import types -from django.apps import apps - - -def auto_import_consumers(): - """ - Auto-import consumers modules in apps - """ - for app_config in apps.get_app_configs(): - for submodule in ["consumers", "views"]: - module_name = "%s.%s" % (app_config.name, submodule) - try: - __import__(module_name) - except ImportError as e: - if "no module named %s" % submodule not in str(e).lower(): - raise - - -def name_that_thing(thing): - """ - Returns either the function/class path or just the object's repr - """ - if hasattr(thing, "__name__"): - if hasattr(thing, "__class__") and not isinstance(thing, types.FunctionType): - if thing.__class__ is not type: - return name_that_thing(thing.__class__) - if hasattr(thing, "__module__"): - return "%s.%s" % (thing.__module__, thing.__name__) - return repr(thing) +import types +from django.apps import apps + + +def auto_import_consumers(): + """ + Auto-import consumers modules in apps + """ + for app_config in apps.get_app_configs(): + for submodule in ["consumers", "views"]: + module_name = "%s.%s" % (app_config.name, submodule) + try: + __import__(module_name) + except ImportError as e: + if "no module named %s" % submodule not in str(e).lower(): + raise + + +def name_that_thing(thing): + """ + Returns either the function/class path or just the object's repr + """ + if hasattr(thing, "__name__"): + if hasattr(thing, "__class__") and not isinstance(thing, types.FunctionType): + if thing.__class__ is not type: + return name_that_thing(thing.__class__) + if hasattr(thing, "__module__"): + return "%s.%s" % (thing.__module__, thing.__name__) + return repr(thing) diff --git a/channels/worker.py b/channels/worker.py index 47fa21a..bb0c669 100644 --- a/channels/worker.py +++ b/channels/worker.py @@ -1,19 +1,19 @@ -class Worker(object): - """ - A "worker" process that continually looks for available messages to run - and runs their consumers. - """ - - def __init__(self, channel_layer): - self.channel_layer = channel_layer - - def run(self): - """ - Tries to continually dispatch messages to consumers. - """ - - channels = self.channel_layer.registry.all_channel_names() - while True: - channel, message = self.channel_layer.receive_many(channels) - consumer = self.channel_layer.registry.consumer_for_channel(channel) - consumer(channel=channel, **message) +class Worker(object): + """ + A "worker" process that continually looks for available messages to run + and runs their consumers. + """ + + def __init__(self, channel_layer): + self.channel_layer = channel_layer + + def run(self): + """ + Tries to continually dispatch messages to consumers. + """ + + channels = self.channel_layer.registry.all_channel_names() + while True: + channel, message = self.channel_layer.receive_many(channels) + consumer = self.channel_layer.registry.consumer_for_channel(channel) + consumer(channel=channel, **message)