backend/extract.py

108 lines
2.8 KiB
Python
Raw Normal View History

2023-02-18 23:51:02 +03:00
import os
import datetime
import json
import django
import csv
from confluent_kafka.avro import Consumer
from confluent_kafka.avro.serializer import SerializerError
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings.local")
django.setup()
from exhauster_analytics.analytics.models import (
Exgauster,
)
from config.settings.base import structlog
log = structlog.get_logger()
conf = {
"bootstrap.servers": "rc1a-b5e65f36lm3an1d5.mdb.yandexcloud.net:9091",
"group.id": "MentalMind",
"session.timeout.ms": 6000,
"security.protocol": "SASL_SSL",
"ssl.ca.location": "CA.pem",
"sasl.mechanism": "SCRAM-SHA-512",
"sasl.username": "9433_reader",
"sasl.password": "eUIpgWu0PWTJaTrjhjQD3.hoyhntiK",
"auto.offset.reset": "end",
}
def my_assign(consumer, partitions):
for p in partitions:
p.offset = 0
print("assign", partitions)
consumer.assign(partitions)
for n in range(1, 7):
c = Consumer(conf)
c.subscribe(["zsmk-9433-dev-01"], on_assign=my_assign)
signals = []
for ex in Exgauster.objects.get(number=n).signals.all():
signals.append(
f"SM_Exgauster\\[{ex.place_x}{':' if ex.type == 'analog' else '.'}{ex.place_y}]"
)
res = []
while True:
try:
msg = c.poll(10)
except SerializerError as e:
log.info("Message deserialization failed for {}: {}".format(msg, e))
continue
if msg is None:
continue
if msg.error():
log.info("AvroConsumer error: {}".format(msg.error()))
continue
try:
data = json.loads(msg.value())
except json.JSONDecodeError:
log.info("Message deserialization failed for {}".format(msg))
continue
rows = []
date = datetime.datetime.strptime(
data["moment"], "%Y-%m-%dT%H:%M:%S.%f"
) + datetime.timedelta(hours=3)
for key in signals:
try:
rows.append(data[key])
except KeyError:
try:
rows.append(data[key.replace(":", ".")])
except KeyError:
try:
rows.append(data[key.replace(".", ":")])
except KeyError:
rows.append(None)
res.append([date] + rows)
if msg.offset() % 100 == 0:
print(msg.offset())
if msg.offset() > 31500:
break
signals = []
for ex in Exgauster.objects.get(number=1).signals.all():
signals.append(
f"SM_Exgauster\[{ex.place_x}{':' if ex.type == 'analog' else '.'}{ex.place_y}]"
)
with open(f"{n}.csv", "w") as file:
writer = csv.writer(file)
writer.writerow(["timestamp"] + signals)
for row in res:
writer.writerow(row)
c.close()