diff --git a/consumer.py b/consumer.py index 124436a..556f71f 100644 --- a/consumer.py +++ b/consumer.py @@ -28,17 +28,28 @@ log = structlog.get_logger() print("loading values from DB, please wait") values = {} -for sign in Record.objects.last().signals.all(): - values[sign.signal.name] = [] +for sign in ExgausterSignal.objects.all(): + if not sign.config: + values[sign.name] = [] em = len(Record.objects.all()) if em > 60: for record in Record.objects.all()[em - 60 :]: for sign in record.signals.all(): - if sign.signal.name in values: + if not sign.signal.config and sign.signal.name in values: values[sign.signal.name].append(sign.value) +statuses = {} +keys_not_to_send = [] + +for signal in ExgausterSignal.objects.filter(config__isnull=True): + if signal.characteristics_description: + statuses[signal.name] = {} + for installation in signal.installations.all(): + keys_not_to_send.append(installation.name) + statuses[signal.name][installation.item_name] = installation.name + conf = { "bootstrap.servers": "rc1a-2ar1hqnl386tvq7k.mdb.yandexcloud.net:9091", "group.id": "MentalMind", @@ -55,7 +66,6 @@ conf = { def my_assign(consumer, partitions): for p in partitions: p.offset = Record.objects.last().offset + 1 - print("assign", partitions) consumer.assign(partitions) @@ -77,11 +87,13 @@ c.subscribe(["zsmk-9433-dev-01"], on_assign=my_assign) signals = {} for ex in ExgausterSignal.objects.all(): - if ex.place_x not in signals: - signals[ex.place_x] = {} - if ex.place_y not in signals[ex.place_x]: - signals[ex.place_x][ex.place_y] = {} - signals[ex.place_x][ex.place_y][ex.type] = ex + if not ex.config: + if ex.place_x not in signals: + signals[ex.place_x] = {} + if ex.place_y not in signals[ex.place_x]: + signals[ex.place_x][ex.place_y] = {} + signals[ex.place_x][ex.place_y][ex.type] = ex + # delete latest offset offset = Record.objects.last().offset @@ -113,9 +125,7 @@ while True: offset = msg.offset() start = time.time() print(f"DEBUG: received {offset}, {date}") - loop = asyncio.get_event_loop() - coroutine = send_to_channel_layer(data) - loop.run_until_complete(coroutine) + resp = {} record = Record.objects.create(timestamp=date, offset=offset, message=data) if offset != 0: approximation_amounts = [ @@ -133,7 +143,9 @@ while True: approximation_values[approximation_val] = {} for key, val in data.items(): - if "SM" in key: + if "SM" in key and key not in keys_not_to_send: + resp[key] = val + if key in values: del values[key][0] else: @@ -151,15 +163,30 @@ while True: ExgausterRecordSignal.objects.create( record=record, signal=signal, value=val ) + if key in statuses: + alarm_max = data[statuses[key]["alarm_max"]] + alarm_min = data[statuses[key]["alarm_min"]] + warning_max = data[statuses[key]["warning_max"]] + warning_min = data[statuses[key]["warning_min"]] + if val > alarm_max or val < alarm_min: + resp[f"{key}_status"] = "alarm" + elif val > warning_max or val < warning_min: + resp[f"{key}_status"] = "warning" + else: + resp[f"{key}_status"] = "normal" + for amount, approx in approximation.items(): + vals = values[key][60 - amount :] + r = sum(vals) / len(vals) + approximation_values[amount][key] = r + ExgausterRecordApproximationSignal.objects.create( + record=approx, signal=signal, value=r + ) except KeyError: continue - for amount, approx in approximation.items(): - vals = values[key][60-amount:] - r = sum(vals) / len(vals) - approximation_values[amount][key] = r - ExgausterRecordApproximationSignal.objects.create( - record=approx, signal=signal, value=r - ) + loop = asyncio.get_event_loop() + coroutine = send_to_channel_layer(resp) + loop.run_until_complete(coroutine) + for approx, data in approximation_values.items(): loop = asyncio.get_event_loop() coroutine = send_to_channel_layer_approximation(data, approx) diff --git a/exhauster_analytics/analytics/consumers.py b/exhauster_analytics/analytics/consumers.py index 0bebc9f..154ee1f 100644 --- a/exhauster_analytics/analytics/consumers.py +++ b/exhauster_analytics/analytics/consumers.py @@ -1,7 +1,10 @@ import json +from asgiref.sync import sync_to_async from channels.generic.websocket import AsyncWebsocketConsumer +from exhauster_analytics.analytics.models import Record + class NotificationsConsumer(AsyncWebsocketConsumer): def __init__(self, *args, **kwargs): @@ -10,8 +13,11 @@ class NotificationsConsumer(AsyncWebsocketConsumer): async def connect(self): self.room_group_name = "notifications" - await self.accept() + + data = await self.get_last_record() + await self.send(text_data=json.dumps(data)) + await self.channel_layer.group_add(self.room_group_name, self.channel_name) async def disconnect(self, close_code): @@ -21,6 +27,18 @@ class NotificationsConsumer(AsyncWebsocketConsumer): async def receive(self, text_data): pass + @sync_to_async + def get_last_record(self): + data = {} + record = Record.objects.last() + for signal in record.signals.all(): + if not signal.signal.config: + data[signal.signal.name] = signal.value + if signal.signal.installations: + data[f"{signal.signal.name}_status"] = "normal" + + return data + async def info(self, event): message = event["data"] diff --git a/exhauster_analytics/analytics/migrations/0010_exgaustersignal_config.py b/exhauster_analytics/analytics/migrations/0010_exgaustersignal_config.py new file mode 100644 index 0000000..005d9ca --- /dev/null +++ b/exhauster_analytics/analytics/migrations/0010_exgaustersignal_config.py @@ -0,0 +1,23 @@ +# Generated by Django 4.1.7 on 2023-02-18 21:10 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + dependencies = [ + ("analytics", "0009_alter_record_options_and_more"), + ] + + operations = [ + migrations.AddField( + model_name="exgaustersignal", + name="config", + field=models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="installations", + to="analytics.exgaustersignal", + ), + ), + ] diff --git a/exhauster_analytics/analytics/models.py b/exhauster_analytics/analytics/models.py index d5fee6f..8149fe2 100644 --- a/exhauster_analytics/analytics/models.py +++ b/exhauster_analytics/analytics/models.py @@ -40,7 +40,9 @@ class ExgausterRecordSignal(models.Model): class ExgausterRecordApproximationSignal(models.Model): record = models.ForeignKey( - "analytics.RecordApproximation", related_name="signals", on_delete=models.CASCADE + "analytics.RecordApproximation", + related_name="signals", + on_delete=models.CASCADE, ) signal = models.ForeignKey( "analytics.ExgausterSignal", @@ -81,6 +83,10 @@ class ExgausterSignal(models.Model): characteristics_description = models.CharField(max_length=200, blank=True) item_name = models.CharField(max_length=200, blank=True) + config = models.ForeignKey( + "self", null=True, related_name="installations", on_delete=models.SET_NULL + ) + @property def name(self) -> str: return f"SM_Exgauster\\[{self.place_x}{':' if self.type == 'analog' else '.'}{self.place_y}]"