mirror of
https://github.com/django/daphne.git
synced 2025-07-09 23:52:22 +03:00
Make database channel layer use transactions to stop dupes
This commit is contained in:
parent
9d8d36007b
commit
5ff77719be
|
@ -6,7 +6,8 @@ import string
|
|||
import time
|
||||
|
||||
from django.apps.registry import Apps
|
||||
from django.db import DEFAULT_DB_ALIAS, connections, models
|
||||
from django.db import DEFAULT_DB_ALIAS, connections, models, transaction
|
||||
from django.db.utils import OperationalError
|
||||
from django.utils import six
|
||||
from django.utils.functional import cached_property
|
||||
from django.utils.timezone import now
|
||||
|
@ -56,15 +57,20 @@ class DatabaseChannelLayer(object):
|
|||
self._clean_expired()
|
||||
# Get a message from one of our channels
|
||||
while True:
|
||||
message = self.channel_model.objects.filter(channel__in=channels).order_by("id").first()
|
||||
if message:
|
||||
self.channel_model.objects.filter(pk=message.pk).delete()
|
||||
return message.channel, self.deserialize(message.content)
|
||||
try:
|
||||
with transaction.atomic():
|
||||
message = self.channel_model.objects.filter(channel__in=channels).order_by("id").first()
|
||||
if message:
|
||||
self.channel_model.objects.filter(pk=message.pk).delete()
|
||||
return message.channel, self.deserialize(message.content)
|
||||
except OperationalError:
|
||||
# The database is probably trying to prevent a deadlock
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
if block:
|
||||
time.sleep(1)
|
||||
else:
|
||||
if block:
|
||||
time.sleep(1)
|
||||
else:
|
||||
return None, None
|
||||
return None, None
|
||||
|
||||
def new_channel(self, pattern):
|
||||
assert isinstance(pattern, six.text_type)
|
||||
|
|
Loading…
Reference in New Issue
Block a user