import re
import time
from collections import Counter
from pathlib import Path
from typing import Callable, Iterable, List, Optional, Union

import srsly
from thinc.api import (
    Config,
    Model,
    Optimizer,
    fix_random_seed,
    set_dropout_rate,
    set_gpu_allocator,
)
from thinc.config import ConfigValidationError
from wasabi import Printer

from ..errors import Errors
from ..schemas import ConfigSchemaPretrain
from ..tokens import Doc
from ..util import dot_to_object, load_model_from_config, registry
from .example import Example


def pretrain(
    config: Config,
    output_dir: Path,
    resume_path: Optional[Path] = None,
    epoch_resume: Optional[int] = None,
    use_gpu: int = -1,
    silent: bool = True,
    skip_last: bool = False,
):
    """Run the tok2vec pretraining loop and write weight files to disk.

    config (Config): The config to build the pipeline from. The keys
        `training.seed`, `training.gpu_allocator` and the `pretraining`
        section are read. NOTE: `config["initialize"]["init_tok2vec"]` is
        overwritten with None on the object that is passed in.
    output_dir (Path): Directory that receives `model{epoch}.bin`,
        `model{epoch}.temp.bin`, `model-last.bin` and `log.jsonl`. It is not
        created by this function.
    resume_path (Optional[Path]): Weights file to load into the tok2vec layer
        before training starts.
    epoch_resume (Optional[int]): Epoch to start from. Only used together
        with `resume_path`; without it training always starts at epoch 0.
    use_gpu (int): The GPU allocator is only set when this is >= 0 and the
        config names an allocator.
    silent (bool): Passed to the printer as `no_print`.
    skip_last (bool): If True, `model-last.bin` is not written at the end.
    """
    msg = Printer(no_print=silent)
    if config["training"]["seed"] is not None:
        fix_random_seed(config["training"]["seed"])
    allocator = config["training"]["gpu_allocator"]
    if use_gpu >= 0 and allocator:
        set_gpu_allocator(allocator)
    # ignore in pretraining because we're creating it now
    config["initialize"]["init_tok2vec"] = None
    nlp = load_model_from_config(config)
    _config = nlp.config.interpolate()
    P = registry.resolve(_config["pretraining"], schema=ConfigSchemaPretrain)
    # P["corpus"] is a dot-path into the config; look up the referenced block
    # first and then resolve it into the actual corpus callable.
    corpus = dot_to_object(_config, P["corpus"])
    corpus = registry.resolve({"corpus": corpus})["corpus"]
    batcher = P["batcher"]
    model = create_pretraining_model(nlp, P)
    optimizer = P["optimizer"]
    # Load in pretrained weights to resume from
    if resume_path is not None:
        epoch_resume = _resume_model(model, resume_path, epoch_resume, silent=silent)
    else:
        # Without '--resume-path' the '--epoch-resume' argument is ignored
        epoch_resume = 0

    # The loss function is attached to the pretraining model as an attribute.
    objective = model.attrs["loss"]
    # TODO: move this to logger function?
    tracker = ProgressTracker(frequency=10000)
    if P["n_save_epoch"]:
        msg.divider(
            f"Pre-training tok2vec layer - starting at epoch {epoch_resume} - saving every {P['n_save_epoch']} epoch"
        )
    else:
        msg.divider(f"Pre-training tok2vec layer - starting at epoch {epoch_resume}")
    # Column layout shared by the header row and every progress row.
    row_settings = {"widths": (3, 10, 10, 6, 4), "aligns": ("r", "r", "r", "r", "r")}
    msg.row(("#", "# Words", "Total Loss", "Loss", "w/s"), **row_settings)

    def _save_model(epoch, is_temp=False, is_last=False):
        """Write the tok2vec weights and append one row to `log.jsonl`.

        epoch (int): Epoch number used in the file name and the log row.
        is_temp (bool): Adds a `.temp` infix to the file name.
        is_last (bool): Write to `model-last.bin` instead; `is_temp` has no
            effect on the file name in that case.
        """
        is_temp_str = ".temp" if is_temp else ""
        # Serialize while the model runs within `use_params(optimizer.averages)`.
        with model.use_params(optimizer.averages):
            if is_last:
                save_path = output_dir / f"model-last.bin"
            else:
                save_path = output_dir / f"model{epoch}{is_temp_str}.bin"
            # Only the referenced tok2vec layer is written, not the full model.
            with (save_path).open("wb") as file_:
                file_.write(model.get_ref("tok2vec").to_bytes())
            log = {
                "nr_word": tracker.nr_word,
                "loss": tracker.loss,
                "epoch_loss": tracker.epoch_loss,
                "epoch": epoch,
            }
            # The log file is opened in append mode, so rows accumulate
            # across saves (and across runs into the same directory).
            with (output_dir / "log.jsonl").open("a") as file_:
                file_.write(srsly.json_dumps(log) + "\n")

    # TODO: I think we probably want this to look more like the
    # 'create_train_batches' function?
    try:
        for epoch in range(epoch_resume, P["max_epochs"]):
            for batch_id, batch in enumerate(batcher(corpus(nlp))):
                docs = ensure_docs(batch)
                loss = make_update(model, docs, optimizer, objective)
                # `progress` is None unless enough words were seen since the
                # last status row.
                progress = tracker.update(epoch, loss, docs)
                if progress:
                    msg.row(progress, **row_settings)
                # batch_id starts at 0, so a temp checkpoint is also written
                # after the first batch of every epoch.
                if P["n_save_every"] and (batch_id % P["n_save_every"] == 0):
                    _save_model(epoch, is_temp=True)

            # With n_save_epoch set, save every n-th epoch plus the final
            # epoch; otherwise save after every epoch.
            if P["n_save_epoch"]:
                if epoch % P["n_save_epoch"] == 0 or epoch == P["max_epochs"] - 1:
                    _save_model(epoch)
            else:
                _save_model(epoch)
            # The per-epoch loss starts from zero again for the next epoch.
            tracker.epoch_loss = 0.0
    finally:
        # Runs even when the loop above is interrupted by an exception; the
        # log row always records max_epochs as the epoch in this case.
        if not skip_last:
            _save_model(P["max_epochs"], is_last=True)


def ensure_docs(examples_or_docs: Iterable[Union[Doc, Example]]) -> List[Doc]:
    """Turn a batch that may mix `Doc` and `Example` objects into a list of docs.

    examples_or_docs (Iterable[Union[Doc, Example]]): The items of one batch.
    RETURNS (List[Doc]): `Doc` items unchanged; for every other item its
        `reference` attribute, in the original order.
    """
    return [
        item if isinstance(item, Doc) else item.reference
        for item in examples_or_docs
    ]


def _resume_model(
    model: Model, resume_path: Path, epoch_resume: Optional[int], silent: bool = True
) -> int:
    """Load saved tok2vec weights into the model and work out the start epoch.

    model (Model): Model whose "tok2vec" reference receives the weights.
    resume_path (Path): File holding the serialized weights.
    epoch_resume (Optional[int]): Epoch to resume from. If None, it is read
        from a `model<N>.bin` pattern in the path and set to N + 1.
    silent (bool): Passed to the printer as `no_print`.
    RETURNS (int): The epoch to resume from.
    RAISES (ValueError): If no epoch was given and none could be inferred.
    """
    printer = Printer(no_print=silent)
    printer.info(f"Resume training tok2vec from: {resume_path}")
    with resume_path.open("rb") as weights_file:
        weights_bytes = weights_file.read()
    model.get_ref("tok2vec").from_bytes(weights_bytes)

    if epoch_resume is None:
        # Try to recover the epoch number from a default weight file name.
        found = re.search(r"model\d+\.bin", str(resume_path))
        if found is None:
            # No epoch given and couldn't infer it
            raise ValueError(Errors.E1020)
        default_name = found.group(0)
        # Strip the 'model' prefix and the '.bin' suffix to get the digits;
        # the saved file belongs to a finished epoch, so continue with the next.
        saved_epoch = default_name[len("model") : -len(".bin")]
        epoch_resume = int(saved_epoch) + 1

    printer.info(f"Resuming from epoch: {epoch_resume}")
    return epoch_resume


def make_update(
    model: Model, docs: Iterable[Doc], optimizer: Optimizer, objective_func: Callable
) -> float:
    """Run one training step on a single batch of documents.

    model (Model): The pretraining model to update.
    docs (Iterable[Doc]): A batch of `Doc` objects.
    optimizer (Optimizer): The optimizer handed to `model.finish_update`.
    objective_func (Callable): Called as `(model.ops, docs, predictions)` and
        expected to return a `(loss, gradients)` pair.
    RETURNS (float): The loss for this batch as a plain Python float.
    """
    outputs, backward = model.begin_update(docs)
    batch_loss, d_outputs = objective_func(model.ops, docs, outputs)
    backward(d_outputs)
    model.finish_update(optimizer)
    # Convert so that no cupy object leaks out of this function. The
    # gradients are modified in-place by the BERT MLM, so the loss is accurate.
    return float(batch_loss)


def create_pretraining_model(nlp, pretrain_config):
    """Build the network used for pretraining.

    An output layer is added on top of the tok2vec input model. That input
    model has to accept a batch of Doc objects (as a list) and return a list
    of arrays with one row per token of the corresponding doc. The tok2vec
    layer itself is kept as a reference; only that part is serialized to file
    and read back in by the 'train' command.

    nlp: The pipeline object that holds the component to pretrain.
    pretrain_config: The resolved pretraining settings; the keys "component",
        "layer", "objective" and "dropout" are read.
    RETURNS: The initialized pretraining model.
    RAISES (ValueError): If the selected layer cannot be initialized from docs.
    """
    # Initialize the pipeline itself with every component disabled.
    with nlp.select_pipes(enable=[]):
        nlp.initialize()

    tok2vec = get_tok2vec_ref(nlp, pretrain_config)
    # A Tok2VecListener only points at another component, so swap it for the
    # model of the component it listens to.
    if type(tok2vec).__name__ == "Tok2VecListener":
        upstream = tok2vec.upstream_name
        if upstream == "*":
            upstream = "tok2vec"
        tok2vec = nlp.get_pipe(upstream).model

    try:
        tok2vec.initialize(X=[nlp.make_doc("Give it a doc to infer shapes")])
    except ValueError:
        raise ValueError(
            Errors.E874.format(
                component=pretrain_config["component"],
                layer=pretrain_config["layer"],
            )
        )

    build_objective = pretrain_config["objective"]
    pretraining_model = build_objective(nlp.vocab, tok2vec)
    pretraining_model.initialize(X=[nlp.make_doc("Give it a doc to infer shapes")])
    set_dropout_rate(pretraining_model, pretrain_config["dropout"])
    return pretraining_model


def get_tok2vec_ref(nlp, pretrain_config):
    """Look up the model layer that should be pretrained.

    nlp: The pipeline object; `get_pipe` and `config` are used.
    pretrain_config: The pretraining settings; "component" names the pipe
        and "layer" optionally names a reference inside that pipe's model.
    RETURNS: The component's model, or the named reference within it when
        "layer" is set to a non-empty value.
    RAISES (ConfigValidationError): If "component" is None.
    """
    component_name = pretrain_config["component"]
    if component_name is None:
        raise ConfigValidationError(
            config=nlp.config["pretraining"],
            errors=[
                {"loc": ["pretraining", "component"], "msg": "component can't be null"}
            ],
            desc=(
                "To use pretrained tok2vec weights, [pretraining.component] "
                "needs to specify the component that should load them."
            ),
        )
    component_model = nlp.get_pipe(component_name).model
    layer_name = pretrain_config["layer"]
    if not layer_name:
        # No layer requested: the whole component model is the target.
        return component_model
    return component_model.get_ref(layer_name)


class ProgressTracker:
    """Accumulate loss and word counts and produce periodic status rows.

    frequency (int): Minimum number of words that must be processed between
        two status rows returned by `update`.
    """

    def __init__(self, frequency=1000000):
        # Running loss over the whole run, and its value at the last status row.
        self.loss = 0.0
        self.prev_loss = 0.0
        # Total number of words seen, plus a per-epoch breakdown.
        self.nr_word = 0
        self.words_per_epoch = Counter()
        self.frequency = frequency
        # Wall-clock time and word count at the last status row.
        self.last_time = time.time()
        self.last_update = 0
        # Loss accumulated in the current epoch; callers reset it themselves.
        self.epoch_loss = 0.0

    def update(self, epoch, loss, docs):
        """Record one batch and return a status row when one is due.

        epoch (int): The current epoch, used as key for the per-epoch count.
        loss (float): The loss of the batch.
        docs (iterable): The batch; `len()` of each item counts as its words.
        RETURNS (Optional[tuple]): `(epoch, total words, total loss, loss
            since the previous row, words per second)` once at least
            `frequency` words were seen since the previous row, else None.
        """
        self.loss += loss
        self.epoch_loss += loss
        words_in_batch = sum(len(doc) for doc in docs)
        self.words_per_epoch[epoch] += words_in_batch
        self.nr_word += words_in_batch
        words_since_update = self.nr_word - self.last_update
        if words_since_update < self.frequency:
            return None
        # Read the clock once so the rate and the new reference point agree.
        now = time.time()
        elapsed = now - self.last_time
        # time.time() is not monotonic and can have coarse resolution, so the
        # elapsed time may be zero or negative; report 0 w/s instead of
        # dividing by zero or showing a negative rate.
        wps = words_since_update / elapsed if elapsed > 0 else 0.0
        self.last_update = self.nr_word
        self.last_time = now
        loss_since_update = self.loss - self.prev_loss
        status = (
            epoch,
            self.nr_word,
            _smart_round(self.loss, width=10),
            _smart_round(loss_since_update, width=6),
            int(wps),
        )
        self.prev_loss = float(self.loss)
        return status


def _smart_round(
    figure: Union[float, int], width: int = 10, max_decimal: int = 4
) -> str:
    """Round large numbers as integers, smaller numbers as decimals."""
    n_digits = len(str(int(figure)))
    n_decimal = width - (n_digits + 1)
    if n_decimal <= 1:
        return str(int(figure))
    else:
        n_decimal = min(n_decimal, max_decimal)
        format_str = "%." + str(n_decimal) + "f"
        return format_str % figure