import os
import datetime
import json
import django
import csv

from confluent_kafka.avro import Consumer
from confluent_kafka.avro.serializer import SerializerError

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings.local")
django.setup()

from exhauster_analytics.analytics.models import (
    Exgauster,
)
from config.settings.base import structlog

log = structlog.get_logger()

conf = {
    "bootstrap.servers": "rc1a-b5e65f36lm3an1d5.mdb.yandexcloud.net:9091",
    "group.id": "MentalMind",
    "session.timeout.ms": 6000,
    "security.protocol": "SASL_SSL",
    "ssl.ca.location": "CA.pem",
    "sasl.mechanism": "SCRAM-SHA-512",
    "sasl.username": "9433_reader",
    "sasl.password": "eUIpgWu0PWTJaTrjhjQD3.hoyhntiK",
    "auto.offset.reset": "end",
}


def my_assign(consumer, partitions):
    for p in partitions:
        p.offset = 0
    print("assign", partitions)
    consumer.assign(partitions)



for n in range(1, 7):
    c = Consumer(conf)
    c.subscribe(["zsmk-9433-dev-01"], on_assign=my_assign)
    signals = []
    for ex in Exgauster.objects.get(number=n).signals.all():
        signals.append(
            f"SM_Exgauster\\[{ex.place_x}{':' if ex.type == 'analog' else '.'}{ex.place_y}]"
        )

    res = []

    while True:
        try:
            msg = c.poll(10)
        except SerializerError as e:
            log.info("Message deserialization failed for {}: {}".format(msg, e))
            continue

        if msg is None:
            continue

        if msg.error():
            log.info("AvroConsumer error: {}".format(msg.error()))
            continue
        try:
            data = json.loads(msg.value())
        except json.JSONDecodeError:
            log.info("Message deserialization failed for {}".format(msg))
            continue

        rows = []
        date = datetime.datetime.strptime(
            data["moment"], "%Y-%m-%dT%H:%M:%S.%f"
        ) + datetime.timedelta(hours=3)
        for key in signals:
            try:
                rows.append(data[key])
            except KeyError:
                try:
                    rows.append(data[key.replace(":", ".")])
                except KeyError:
                    try:
                        rows.append(data[key.replace(".", ":")])
                    except KeyError:
                        rows.append(None)

        res.append([date] + rows)

        if msg.offset() % 100 == 0:
            print(msg.offset())

        if msg.offset() > 31500:
            break

    signals = []
    for ex in Exgauster.objects.get(number=1).signals.all():
        signals.append(
            f"SM_Exgauster\[{ex.place_x}{':' if ex.type == 'analog' else '.'}{ex.place_y}]"
        )

    with open(f"{n}.csv", "w") as file:
        writer = csv.writer(file)
        writer.writerow(["timestamp"] + signals)
        for row in res:
            writer.writerow(row)

    c.close()