backend/consumer.py
2023-02-19 09:42:46 +03:00

337 lines
19 KiB
Python

import asyncio
import os
import datetime
import json
import django
import time
from channels.layers import get_channel_layer
from confluent_kafka.avro import Consumer
from confluent_kafka.avro.serializer import SerializerError
from django.utils.timezone import make_aware
print("loading values from DB, please wait")
# from inference import run
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings.local")
django.setup()
from exhauster_analytics.analytics.models import (
ExgausterSignal,
Record,
ExgausterRecordSignal,
RecordApproximation,
ExgausterRecordApproximationSignal,
)
from config.settings.base import structlog
log = structlog.get_logger()
values = {}
for sign in ExgausterSignal.objects.all():
if not sign.config:
values[sign.name] = []
em = len(Record.objects.all())
if em > 60:
values["moment"] = [x.timestamp for x in Record.objects.all()[em - 60 :]]
for record in Record.objects.all()[em - 60 :]:
for sign in record.signals.all():
if not sign.signal.config and sign.signal.name in values:
values[sign.signal.name].append(sign.value)
statuses = {}
keys_not_to_send = []
for signal in ExgausterSignal.objects.filter(config__isnull=True):
if signal.characteristics_description:
statuses[signal.name] = {}
for installation in signal.installations.all():
keys_not_to_send.append(installation.name)
statuses[signal.name][installation.item_name] = installation.name
conf = {
"bootstrap.servers": "rc1a-2ar1hqnl386tvq7k.mdb.yandexcloud.net:9091",
"group.id": "MentalMind",
"session.timeout.ms": 6000,
"security.protocol": "SASL_SSL",
"ssl.ca.location": "CA.pem",
"sasl.mechanism": "SCRAM-SHA-512",
"sasl.username": "9433_reader",
"sasl.password": "eUIpgWu0PWTJaTrjhjQD3.hoyhntiK",
"auto.offset.reset": "end",
}
mapping = {
"SM_Exgauster[2:27]": "Подшипник 1 Температура нагрева Температура temperature",
"SM_Exgauster[2:65]": "Подшипник 1 Температура нагрева Температура alarm_max",
"SM_Exgauster[2:74]": "Подшипник 1 Температура нагрева Температура alarm_min",
"SM_Exgauster[2:83]": "Подшипник 1 Температура нагрева Температура warning_max",
"SM_Exgauster[2:92]": "Подшипник 1 Температура нагрева Температура warning_min",
"SM_Exgauster[2:2]": "Подшипник 1 Вибрация Осевая vibration_axial",
"SM_Exgauster[2:139]": "Подшипник 1 Вибрация Осевая alarm_max",
"SM_Exgauster[2:151]": "Подшипник 1 Вибрация Осевая alarm_min",
"SM_Exgauster[2:163]": "Подшипник 1 Вибрация Осевая warning_max",
"SM_Exgauster[2:175]": "Подшипник 1 Вибрация Осевая warning_min",
"SM_Exgauster[2:0]": "Подшипник 1 Вибрация Горизонтальная vibration_horizontal",
"SM_Exgauster[2:137]": "Подшипник 1 Вибрация Горизонтальная alarm_max",
"SM_Exgauster[2:149]": "Подшипник 1 Вибрация Горизонтальная alarm_min",
"SM_Exgauster[2:161]": "Подшипник 1 Вибрация Горизонтальная warning_max",
"SM_Exgauster[2:173]": "Подшипник 1 Вибрация Горизонтальная warning_min",
"SM_Exgauster[2:1]": "Подшипник 1 Вибрация Вертикальная vibration_vertical",
"SM_Exgauster[2:138]": "Подшипник 1 Вибрация Вертикальная alarm_max",
"SM_Exgauster[2:150]": "Подшипник 1 Вибрация Вертикальная alarm_min",
"SM_Exgauster[2:162]": "Подшипник 1 Вибрация Вертикальная warning_max",
"SM_Exgauster[2:174]": "Подшипник 1 Вибрация Вертикальная warning_min",
"SM_Exgauster[2:28]": "Подшипник 2 Температура нагрева Температура temperature",
"SM_Exgauster[2:66]": "Подшипник 2 Температура нагрева Температура alarm_max",
"SM_Exgauster[2:75]": "Подшипник 2 Температура нагрева Температура alarm_min",
"SM_Exgauster[2:84]": "Подшипник 2 Температура нагрева Температура warning_max",
"SM_Exgauster[2:93]": "Подшипник 2 Температура нагрева Температура warning_min",
"SM_Exgauster[2:5]": "Подшипник 2 Вибрация Осевая vibration_axial",
"SM_Exgauster[2:142]": "Подшипник 2 Вибрация Осевая alarm_max",
"SM_Exgauster[2:154]": "Подшипник 2 Вибрация Осевая alarm_min",
"SM_Exgauster[2:166]": "Подшипник 2 Вибрация Осевая warning_max",
"SM_Exgauster[2:178]": "Подшипник 2 Вибрация Осевая warning_min",
"SM_Exgauster[2:3]": "Подшипник 2 Вибрация Горизонтальная vibration_horizontal",
"SM_Exgauster[2:140]": "Подшипник 2 Вибрация Горизонтальная alarm_max",
"SM_Exgauster[2:152]": "Подшипник 2 Вибрация Горизонтальная alarm_min",
"SM_Exgauster[2:164]": "Подшипник 2 Вибрация Горизонтальная warning_max",
"SM_Exgauster[2:176]": "Подшипник 2 Вибрация Горизонтальная warning_min",
"SM_Exgauster[2:4]": "Подшипник 2 Вибрация Вертикальная vibration_vertical",
"SM_Exgauster[2:141]": "Подшипник 2 Вибрация Вертикальная alarm_max",
"SM_Exgauster[2:153]": "Подшипник 2 Вибрация Вертикальная alarm_min",
"SM_Exgauster[2:165]": "Подшипник 2 Вибрация Вертикальная warning_max",
"SM_Exgauster[2:177]": "Подшипник 2 Вибрация Вертикальная warning_min",
"SM_Exgauster[2:29]": "Подшипник 3 Температура нагрева Температура temperature",
"SM_Exgauster[2:67]": "Подшипник 3 Температура нагрева Температура alarm_max",
"SM_Exgauster[2:76]": "Подшипник 3 Температура нагрева Температура alarm_min",
"SM_Exgauster[2:85]": "Подшипник 3 Температура нагрева Температура warning_max",
"SM_Exgauster[2:94]": "Подшипник 3 Температура нагрева Температура warning_min",
"SM_Exgauster[2:30]": "Подшипник 4 Температура нагрева Температура temperature",
"SM_Exgauster[2:68]": "Подшипник 4 Температура нагрева Температура alarm_max",
"SM_Exgauster[2:77]": "Подшипник 4 Температура нагрева Температура alarm_min",
"SM_Exgauster[2:86]": "Подшипник 4 Температура нагрева Температура warning_max",
"SM_Exgauster[2:95]": "Подшипник 4 Температура нагрева Температура warning_min",
"SM_Exgauster[2:31]": "Подшипник 5 Температура нагрева Температура temperature",
"SM_Exgauster[2:69]": "Подшипник 5 Температура нагрева Температура alarm_max",
"SM_Exgauster[2:78]": "Подшипник 5 Температура нагрева Температура alarm_min",
"SM_Exgauster[2:87]": "Подшипник 5 Температура нагрева Температура warning_max",
"SM_Exgauster[2:96]": "Подшипник 5 Температура нагрева Температура warning_min",
"SM_Exgauster[2:32]": "Подшипник 6 Температура нагрева Температура temperature",
"SM_Exgauster[2:70]": "Подшипник 6 Температура нагрева Температура alarm_max",
"SM_Exgauster[2:79]": "Подшипник 6 Температура нагрева Температура alarm_min",
"SM_Exgauster[2:88]": "Подшипник 6 Температура нагрева Температура warning_max",
"SM_Exgauster[2:97]": "Подшипник 6 Температура нагрева Температура warning_min",
"SM_Exgauster[2:33]": "Подшипник 7 Температура нагрева Температура temperature",
"SM_Exgauster[2:71]": "Подшипник 7 Температура нагрева Температура alarm_max",
"SM_Exgauster[2:80]": "Подшипник 7 Температура нагрева Температура alarm_min",
"SM_Exgauster[2:89]": "Подшипник 7 Температура нагрева Температура warning_max",
"SM_Exgauster[2:98]": "Подшипник 7 Температура нагрева Температура warning_min",
"SM_Exgauster[2:8]": "Подшипник 7 Вибрация Осевая vibration_axial",
"SM_Exgauster[2:145]": "Подшипник 7 Вибрация Осевая alarm_max",
"SM_Exgauster[2:157]": "Подшипник 7 Вибрация Осевая alarm_min",
"SM_Exgauster[2:169]": "Подшипник 7 Вибрация Осевая warning_max",
"SM_Exgauster[2:181]": "Подшипник 7 Вибрация Осевая warning_min",
"SM_Exgauster[2:6]": "Подшипник 7 Вибрация Горизонтальная vibration_horizontal",
"SM_Exgauster[2:143]": "Подшипник 7 Вибрация Горизонтальная alarm_max",
"SM_Exgauster[2:155]": "Подшипник 7 Вибрация Горизонтальная alarm_min",
"SM_Exgauster[2:167]": "Подшипник 7 Вибрация Горизонтальная warning_max",
"SM_Exgauster[2:179]": "Подшипник 7 Вибрация Горизонтальная warning_min",
"SM_Exgauster[2:7]": "Подшипник 7 Вибрация Вертикальная vibration_vertical",
"SM_Exgauster[2:144]": "Подшипник 7 Вибрация Вертикальная alarm_max",
"SM_Exgauster[2:156]": "Подшипник 7 Вибрация Вертикальная alarm_min",
"SM_Exgauster[2:168]": "Подшипник 7 Вибрация Вертикальная warning_max",
"SM_Exgauster[2:180]": "Подшипник 7 Вибрация Вертикальная warning_min",
"SM_Exgauster[2:34]": "Подшипник 8 Температура нагрева Температура temperature",
"SM_Exgauster[2:72]": "Подшипник 8 Температура нагрева Температура alarm_max",
"SM_Exgauster[2:81]": "Подшипник 8 Температура нагрева Температура alarm_min",
"SM_Exgauster[2:90]": "Подшипник 8 Температура нагрева Температура warning_max",
"SM_Exgauster[2:99]": "Подшипник 8 Температура нагрева Температура warning_min",
"SM_Exgauster[2:11]": "Подшипник 8 Вибрация Осевая vibration_axial",
"SM_Exgauster[2:148]": "Подшипник 8 Вибрация Осевая alarm_max",
"SM_Exgauster[2:160]": "Подшипник 8 Вибрация Осевая alarm_min",
"SM_Exgauster[2:172]": "Подшипник 8 Вибрация Осевая warning_max",
"SM_Exgauster[2:184]": "Подшипник 8 Вибрация Осевая warning_min",
"SM_Exgauster[2:9]": "Подшипник 8 Вибрация Горизонтальная vibration_horizontal",
"SM_Exgauster[2:146]": "Подшипник 8 Вибрация Горизонтальная alarm_max",
"SM_Exgauster[2:158]": "Подшипник 8 Вибрация Горизонтальная alarm_min",
"SM_Exgauster[2:170]": "Подшипник 8 Вибрация Горизонтальная warning_max",
"SM_Exgauster[2:182]": "Подшипник 8 Вибрация Горизонтальная warning_min",
"SM_Exgauster[2:10]": "Подшипник 8 Вибрация Вертикальная vibration_vertical",
"SM_Exgauster[2:147]": "Подшипник 8 Вибрация Вертикальная alarm_max",
"SM_Exgauster[2:159]": "Подшипник 8 Вибрация Вертикальная alarm_min",
"SM_Exgauster[2:171]": "Подшипник 8 Вибрация Вертикальная warning_max",
"SM_Exgauster[2:183]": "Подшипник 8 Вибрация Вертикальная warning_min",
"SM_Exgauster[2:35]": "Подшипник 9 Температура нагрева Температура temperature",
"SM_Exgauster[2:73]": "Подшипник 9 Температура нагрева Температура alarm_max",
"SM_Exgauster[2:82]": "Подшипник 9 Температура нагрева Температура alarm_min",
"SM_Exgauster[2:91]": "Подшипник 9 Температура нагрева Температура warning_max",
"SM_Exgauster[2:100]": "Подшипник 9 Температура нагрева Температура warning_min",
"SM_Exgauster[2:42]": "Охладитель Масло temperature_after",
"SM_Exgauster[2:41]": "Охладитель Масло temperature_before",
"SM_Exgauster[2:37]": "Охладитель Вода temperature_after",
"SM_Exgauster[2:36]": "Охладитель Вода temperature_before",
"SM_Exgauster[2:24]": "Газовый коллектор temperature_before",
"SM_Exgauster[2:61]": "Газовый коллектор underpressure_before",
"SM_Exgauster[4.1]": "Положение задвижки gas_valve_closed",
"SM_Exgauster[4.2]": "Положение задвижки gas_valve_open",
"SM_Exgauster[4:6]": "Положение задвижки gas_valve_position",
"SM_Exgauster[4:2]": "Главный привод rotor_current",
"SM_Exgauster[4:4]": "Главный привод rotor_voltage",
"SM_Exgauster[4:3]": "Главный привод stator_current",
"SM_Exgauster[4:5]": "Главный привод stator_voltage",
"SM_Exgauster[4:0]": "Маслосистема oil_level",
"SM_Exgauster[4:1]": "Маслосистема oil_pressure",
"SM_Exgauster[2.0]": "Работа эксгаустера work",
}
def my_assign(consumer, partitions):
for p in partitions:
p.offset = Record.objects.last().offset + 1
consumer.assign(partitions)
async def send_to_channel_layer(data):
channel_layer = get_channel_layer()
await channel_layer.group_send("notifications", {"data": data, "type": "info"})
async def send_to_channel_layer_approximation(data, approximation):
channel_layer = get_channel_layer()
await channel_layer.group_send(
f"approximation_{approximation}", {"data": data, "type": "info"}
)
c = Consumer(conf)
c.subscribe(["zsmk-9433-dev-01"], on_assign=my_assign)
signals = {}
for ex in ExgausterSignal.objects.all():
if not ex.config:
if ex.place_x not in signals:
signals[ex.place_x] = {}
if ex.place_y not in signals[ex.place_x]:
signals[ex.place_x][ex.place_y] = {}
signals[ex.place_x][ex.place_y][ex.type] = ex
# delete latest offset
offset = Record.objects.last().offset
Record.objects.filter(offset=offset).delete()
RecordApproximation.objects.filter(offset=offset).delete()
while True:
try:
msg = c.poll(10)
except SerializerError as e:
log.info("Message deserialization failed for {}: {}".format(msg, e))
continue
if msg is None:
continue
if msg.error():
log.info("AvroConsumer error: {}".format(msg.error()))
continue
try:
data = json.loads(msg.value())
except json.JSONDecodeError:
log.info("Message deserialization failed for {}".format(msg))
continue
date = make_aware(
datetime.datetime.strptime(data["moment"], "%Y-%m-%dT%H:%M:%S.%f")
+ datetime.timedelta(hours=3)
)
offset = msg.offset()
start = time.time()
print(f"DEBUG: received {offset}, {date}")
resp = {}
record = Record.objects.create(timestamp=date, offset=offset, message=data)
if offset != 0:
approximation_amounts = [
x for x, _ in RecordApproximation.AmountChoices.choices if offset % x == 0
]
else:
approximation_amounts = []
approximation = {}
for amount in approximation_amounts:
approximation[amount] = RecordApproximation.objects.create(
amount=amount, timestamp=date, offset=offset
)
approximation_values = {}
for approximation_val in approximation_amounts:
approximation_values[approximation_val] = {}
for key, val in data.items():
if "SM" in key and key not in keys_not_to_send:
resp[key] = val
if key in values:
del values[key][0]
else:
values[key] = [0] * 60
values[key].append(val)
if "." in key:
x, y = map(int, key[key.find("[") + 1 : key.find("]")].split("."))
type = "digital"
else:
x, y = map(int, key[key.find("[") + 1 : key.find("]")].split(":"))
type = "analog"
try:
signal = signals[x][y][type]
ExgausterRecordSignal.objects.create(
record=record, signal=signal, value=val
)
if key in statuses:
alarm_max = data[statuses[key]["alarm_max"]]
alarm_min = data[statuses[key]["alarm_min"]]
warning_max = data[statuses[key]["warning_max"]]
warning_min = data[statuses[key]["warning_min"]]
if val > alarm_max or val < alarm_min:
resp[f"{key}_status"] = "alarm"
elif val > warning_max or val < warning_min:
resp[f"{key}_status"] = "warning"
else:
resp[f"{key}_status"] = "normal"
for amount, approx in approximation.items():
vals = values[key][60 - amount :]
r = sum(vals) / len(vals)
approximation_values[amount][key] = r
ExgausterRecordApproximationSignal.objects.create(
record=approx, signal=signal, value=r
)
except KeyError:
continue
del values["moment"][0]
values["moment"].append(date)
# call ml
# df = pandas.DataFrame.from_dict(values, orient="index").transpose()
# df.columns = ["moment"] + [
# "".join([x for x in el if x != "\\"]) for el in df.columns if el != "moment"
# ]
# df = df.rename(columns=mapping)
# df["moment"] = pandas.to_datetime(df["moment"], utc=True)
# df = df.set_index("moment").resample("5T").first().reset_index()
# run(df)
loop = asyncio.get_event_loop()
coroutine = send_to_channel_layer(resp)
loop.run_until_complete(coroutine)
for approx, data in approximation_values.items():
loop = asyncio.get_event_loop()
coroutine = send_to_channel_layer_approximation(data, approx)
loop.run_until_complete(coroutine)
print(f"DEBUG: done {offset}, {time.time() - start} sec taken")
c.close()