added signal status for consumer, added initial data in sockets

This commit is contained in:
Alexander Karpov 2023-02-19 02:51:45 +03:00
parent 3f7fe25319
commit 2aef5ef336
4 changed files with 96 additions and 22 deletions

View File

@ -28,17 +28,28 @@ log = structlog.get_logger()
print("loading values from DB, please wait") print("loading values from DB, please wait")
values = {} values = {}
for sign in Record.objects.last().signals.all(): for sign in ExgausterSignal.objects.all():
values[sign.signal.name] = [] if not sign.config:
values[sign.name] = []
em = len(Record.objects.all()) em = len(Record.objects.all())
if em > 60: if em > 60:
for record in Record.objects.all()[em - 60 :]: for record in Record.objects.all()[em - 60 :]:
for sign in record.signals.all(): for sign in record.signals.all():
if sign.signal.name in values: if not sign.signal.config and sign.signal.name in values:
values[sign.signal.name].append(sign.value) values[sign.signal.name].append(sign.value)
statuses = {}
keys_not_to_send = []
for signal in ExgausterSignal.objects.filter(config__isnull=True):
if signal.characteristics_description:
statuses[signal.name] = {}
for installation in signal.installations.all():
keys_not_to_send.append(installation.name)
statuses[signal.name][installation.item_name] = installation.name
conf = { conf = {
"bootstrap.servers": "rc1a-2ar1hqnl386tvq7k.mdb.yandexcloud.net:9091", "bootstrap.servers": "rc1a-2ar1hqnl386tvq7k.mdb.yandexcloud.net:9091",
"group.id": "MentalMind", "group.id": "MentalMind",
@ -55,7 +66,6 @@ conf = {
def my_assign(consumer, partitions): def my_assign(consumer, partitions):
for p in partitions: for p in partitions:
p.offset = Record.objects.last().offset + 1 p.offset = Record.objects.last().offset + 1
print("assign", partitions)
consumer.assign(partitions) consumer.assign(partitions)
@ -77,12 +87,14 @@ c.subscribe(["zsmk-9433-dev-01"], on_assign=my_assign)
signals = {} signals = {}
for ex in ExgausterSignal.objects.all(): for ex in ExgausterSignal.objects.all():
if not ex.config:
if ex.place_x not in signals: if ex.place_x not in signals:
signals[ex.place_x] = {} signals[ex.place_x] = {}
if ex.place_y not in signals[ex.place_x]: if ex.place_y not in signals[ex.place_x]:
signals[ex.place_x][ex.place_y] = {} signals[ex.place_x][ex.place_y] = {}
signals[ex.place_x][ex.place_y][ex.type] = ex signals[ex.place_x][ex.place_y][ex.type] = ex
# delete latest offset # delete latest offset
offset = Record.objects.last().offset offset = Record.objects.last().offset
Record.objects.filter(offset=offset).delete() Record.objects.filter(offset=offset).delete()
@ -113,9 +125,7 @@ while True:
offset = msg.offset() offset = msg.offset()
start = time.time() start = time.time()
print(f"DEBUG: received {offset}, {date}") print(f"DEBUG: received {offset}, {date}")
loop = asyncio.get_event_loop() resp = {}
coroutine = send_to_channel_layer(data)
loop.run_until_complete(coroutine)
record = Record.objects.create(timestamp=date, offset=offset, message=data) record = Record.objects.create(timestamp=date, offset=offset, message=data)
if offset != 0: if offset != 0:
approximation_amounts = [ approximation_amounts = [
@ -133,7 +143,9 @@ while True:
approximation_values[approximation_val] = {} approximation_values[approximation_val] = {}
for key, val in data.items(): for key, val in data.items():
if "SM" in key: if "SM" in key and key not in keys_not_to_send:
resp[key] = val
if key in values: if key in values:
del values[key][0] del values[key][0]
else: else:
@ -151,15 +163,30 @@ while True:
ExgausterRecordSignal.objects.create( ExgausterRecordSignal.objects.create(
record=record, signal=signal, value=val record=record, signal=signal, value=val
) )
except KeyError: if key in statuses:
continue alarm_max = data[statuses[key]["alarm_max"]]
alarm_min = data[statuses[key]["alarm_min"]]
warning_max = data[statuses[key]["warning_max"]]
warning_min = data[statuses[key]["warning_min"]]
if val > alarm_max or val < alarm_min:
resp[f"{key}_status"] = "alarm"
elif val > warning_max or val < warning_min:
resp[f"{key}_status"] = "warning"
else:
resp[f"{key}_status"] = "normal"
for amount, approx in approximation.items(): for amount, approx in approximation.items():
vals = values[key][60-amount:] vals = values[key][60 - amount :]
r = sum(vals) / len(vals) r = sum(vals) / len(vals)
approximation_values[amount][key] = r approximation_values[amount][key] = r
ExgausterRecordApproximationSignal.objects.create( ExgausterRecordApproximationSignal.objects.create(
record=approx, signal=signal, value=r record=approx, signal=signal, value=r
) )
except KeyError:
continue
loop = asyncio.get_event_loop()
coroutine = send_to_channel_layer(resp)
loop.run_until_complete(coroutine)
for approx, data in approximation_values.items(): for approx, data in approximation_values.items():
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
coroutine = send_to_channel_layer_approximation(data, approx) coroutine = send_to_channel_layer_approximation(data, approx)

View File

@ -1,7 +1,10 @@
import json import json
from asgiref.sync import sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer from channels.generic.websocket import AsyncWebsocketConsumer
from exhauster_analytics.analytics.models import Record
class NotificationsConsumer(AsyncWebsocketConsumer): class NotificationsConsumer(AsyncWebsocketConsumer):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
@ -10,8 +13,11 @@ class NotificationsConsumer(AsyncWebsocketConsumer):
async def connect(self): async def connect(self):
self.room_group_name = "notifications" self.room_group_name = "notifications"
await self.accept() await self.accept()
data = await self.get_last_record()
await self.send(text_data=json.dumps(data))
await self.channel_layer.group_add(self.room_group_name, self.channel_name) await self.channel_layer.group_add(self.room_group_name, self.channel_name)
async def disconnect(self, close_code): async def disconnect(self, close_code):
@ -21,6 +27,18 @@ class NotificationsConsumer(AsyncWebsocketConsumer):
async def receive(self, text_data): async def receive(self, text_data):
pass pass
@sync_to_async
def get_last_record(self):
data = {}
record = Record.objects.last()
for signal in record.signals.all():
if not signal.signal.config:
data[signal.signal.name] = signal.value
if signal.signal.installations:
data[f"{signal.signal.name}_status"] = "normal"
return data
async def info(self, event): async def info(self, event):
message = event["data"] message = event["data"]

View File

@ -0,0 +1,23 @@
# Generated by Django 4.1.7 on 2023-02-18 21:10
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
("analytics", "0009_alter_record_options_and_more"),
]
operations = [
migrations.AddField(
model_name="exgaustersignal",
name="config",
field=models.ForeignKey(
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="installations",
to="analytics.exgaustersignal",
),
),
]

View File

@ -40,7 +40,9 @@ class ExgausterRecordSignal(models.Model):
class ExgausterRecordApproximationSignal(models.Model): class ExgausterRecordApproximationSignal(models.Model):
record = models.ForeignKey( record = models.ForeignKey(
"analytics.RecordApproximation", related_name="signals", on_delete=models.CASCADE "analytics.RecordApproximation",
related_name="signals",
on_delete=models.CASCADE,
) )
signal = models.ForeignKey( signal = models.ForeignKey(
"analytics.ExgausterSignal", "analytics.ExgausterSignal",
@ -81,6 +83,10 @@ class ExgausterSignal(models.Model):
characteristics_description = models.CharField(max_length=200, blank=True) characteristics_description = models.CharField(max_length=200, blank=True)
item_name = models.CharField(max_length=200, blank=True) item_name = models.CharField(max_length=200, blank=True)
config = models.ForeignKey(
"self", null=True, related_name="installations", on_delete=models.SET_NULL
)
@property @property
def name(self) -> str: def name(self) -> str:
return f"SM_Exgauster\\[{self.place_x}{':' if self.type == 'analog' else '.'}{self.place_y}]" return f"SM_Exgauster\\[{self.place_x}{':' if self.type == 'analog' else '.'}{self.place_y}]"