import warnings
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union

import srsly
from thinc.api import Model

from .. import util
from ..errors import Errors, Warnings
from ..language import Language
from ..lookups import Lookups, load_lookups
from ..scorer import Scorer
from ..tokens import Doc, Token
from ..training import Example
from ..util import SimpleFrozenList, logger, registry
from ..vocab import Vocab
from .pipe import Pipe


@Language.factory(
    "lemmatizer",
    assigns=["token.lemma"],
    default_config={
        "model": None,
        "mode": "lookup",
        "overwrite": False,
        "scorer": {"@scorers": "spacy.lemmatizer_scorer.v1"},
    },
    default_score_weights={"lemma_acc": 1.0},
)
def make_lemmatizer(
    nlp: Language,
    model: Optional[Model],
    name: str,
    mode: str,
    overwrite: bool,
    scorer: Optional[Callable],
):
    return Lemmatizer(
        nlp.vocab, model, name, mode=mode, overwrite=overwrite, scorer=scorer
    )


def lemmatizer_score(examples: Iterable[Example], **kwargs) -> Dict[str, Any]:
    return Scorer.score_token_attr(examples, "lemma", **kwargs)


@registry.scorers("spacy.lemmatizer_scorer.v1")
def make_lemmatizer_scorer():
    return lemmatizer_score


class Lemmatizer(Pipe):
    """
    The Lemmatizer supports simple part-of-speech-sensitive suffix rules and
    lookup tables.

    DOCS: https://spacy.io/api/lemmatizer
    """

    @classmethod
    def get_lookups_config(cls, mode: str) -> Tuple[List[str], List[str]]:
        """Returns the lookups configuration settings for a given mode for use
        in Lemmatizer.load_lookups.

        mode (str): The lemmatizer mode.
        RETURNS (Tuple[List[str], List[str]]): The required and optional
            lookup tables for this mode.
        """
        if mode == "lookup":
            return (["lemma_lookup"], [])
        elif mode == "rule":
            return (["lemma_rules"], ["lemma_exc", "lemma_index"])
        return ([], [])

    def __init__(
        self,
        vocab: Vocab,
        model: Optional[Model],
        name: str = "lemmatizer",
        *,
        mode: str = "lookup",
        overwrite: bool = False,
        scorer: Optional[Callable] = lemmatizer_score,
    ) -> None:
        """Initialize a Lemmatizer.

        vocab (Vocab): The vocab.
        model (Model): A model (not yet implemented).
        name (str): The component name. Defaults to "lemmatizer".
        mode (str): The lemmatizer mode: "lookup", "rule". Defaults to "lookup".
        overwrite (bool): Whether to overwrite existing lemmas. Defaults to
            `False`.
        scorer (Optional[Callable]): The scoring method. Defaults to
            Scorer.score_token_attr for the attribute "lemma".

        DOCS: https://spacy.io/api/lemmatizer#init
        """
        self.vocab = vocab
        self.model = model
        self.name = name
        self._mode = mode
        self.lookups = Lookups()
        self.overwrite = overwrite
        self._validated = False
        if self.mode == "lookup":
            self.lemmatize = self.lookup_lemmatize
        elif self.mode == "rule":
            self.lemmatize = self.rule_lemmatize
        else:
            mode_attr = f"{self.mode}_lemmatize"
            if not hasattr(self, mode_attr):
                raise ValueError(Errors.E1003.format(mode=mode))
            self.lemmatize = getattr(self, mode_attr)
        self.cache = {}  # type: ignore[var-annotated]
        self.scorer = scorer

    @property
    def mode(self):
        return self._mode

    def __call__(self, doc: Doc) -> Doc:
        """Apply the lemmatizer to one document.

        doc (Doc): The Doc to process.
        RETURNS (Doc): The processed Doc.

        DOCS: https://spacy.io/api/lemmatizer#call
        """
        if not self._validated:
            self._validate_tables(Errors.E1004)
        error_handler = self.get_error_handler()
        try:
            for token in doc:
                if self.overwrite or token.lemma == 0:
                    token.lemma_ = self.lemmatize(token)[0]
            return doc
        except Exception as e:
            error_handler(self.name, self, [doc], e)

    def initialize(
        self,
        get_examples: Optional[Callable[[], Iterable[Example]]] = None,
        *,
        nlp: Optional[Language] = None,
        lookups: Optional[Lookups] = None,
    ):
        """Initialize the lemmatizer and load in data.

        get_examples (Callable[[], Iterable[Example]]): Function that
            returns a representative sample of gold-standard Example objects.
        nlp (Language): The current nlp object the component is part of.
        lookups (Lookups): The lookups object containing the (optional) tables
            such as "lemma_rules", "lemma_index", "lemma_exc" and
            "lemma_lookup". Defaults to None.
        """
        required_tables, optional_tables = self.get_lookups_config(self.mode)
        if lookups is None:
            logger.debug(
                "Lemmatizer: no lemmatizer lookups tables provided, "
                "trying to load tables from registered lookups (usually "
                "spacy-lookups-data)"
            )
            lookups = load_lookups(
                lang=self.vocab.lang, tables=required_tables, strict=False
            )
            missing_tables = set(required_tables) - set(lookups.tables)
            if len(missing_tables) > 0:
                raise ValueError(
                    Errors.E4010.format(
                        missing_tables=list(missing_tables),
                        pipe_name=self.name,
                        required_tables=srsly.json_dumps(required_tables),
                        tables=srsly.json_dumps(required_tables + optional_tables),
                    )
                )
            optional_lookups = load_lookups(
                lang=self.vocab.lang, tables=optional_tables, strict=False
            )
            for table in optional_lookups.tables:
                lookups.set_table(table, optional_lookups.get_table(table))
        self.lookups = lookups
        self._validate_tables(Errors.E1004)

    def _validate_tables(self, error_message: str = Errors.E912) -> None:
        """Check that the lookups are correct for the current mode."""
        required_tables, optional_tables = self.get_lookups_config(self.mode)
        for table in required_tables:
            if table not in self.lookups:
                raise ValueError(
                    error_message.format(
                        mode=self.mode,
                        tables=required_tables,
                        found=self.lookups.tables,
                    )
                )
        self._validated = True

    def lookup_lemmatize(self, token: Token) -> List[str]:
        """Lemmatize using a lookup-based approach.

        token (Token): The token to lemmatize.
        RETURNS (list): The available lemmas for the string.

        DOCS: https://spacy.io/api/lemmatizer#lookup_lemmatize
        """
        lookup_table = self.lookups.get_table("lemma_lookup", {})
        result = lookup_table.get(token.text, token.text)
        if isinstance(result, str):
            result = [result]
        return result

    def rule_lemmatize(self, token: Token) -> List[str]:
        """Lemmatize using a rule-based approach.

        token (Token): The token to lemmatize.
        RETURNS (list): The available lemmas for the string.

        DOCS: https://spacy.io/api/lemmatizer#rule_lemmatize
        """
        cache_key = (token.orth, token.pos, token.morph.key)  # type: ignore[attr-defined]
        if cache_key in self.cache:
            return self.cache[cache_key]
        string = token.text
        univ_pos = token.pos_.lower()
        if univ_pos in ("", "eol", "space"):
            if univ_pos == "":
                warnings.warn(Warnings.W108)
            return [string.lower()]
        # See Issue #435 for example of where this logic is requied.
        if self.is_base_form(token):
            return [string.lower()]
        index_table = self.lookups.get_table("lemma_index", {})
        exc_table = self.lookups.get_table("lemma_exc", {})
        rules_table = self.lookups.get_table("lemma_rules", {})
        if not any(
            (
                index_table.get(univ_pos),
                exc_table.get(univ_pos),
                rules_table.get(univ_pos),
            )
        ):
            if univ_pos == "propn":
                return [string]
            else:
                return [string.lower()]

        index = index_table.get(univ_pos, {})
        exceptions = exc_table.get(univ_pos, {})
        rules = rules_table.get(univ_pos, {})
        orig = string
        string = string.lower()
        forms = []
        oov_forms = []
        for old, new in rules:
            if string.endswith(old):
                form = string[: len(string) - len(old)] + new
                if not form:
                    pass
                elif form in index or not form.isalpha():
                    forms.append(form)
                else:
                    oov_forms.append(form)
        # Remove duplicates but preserve the ordering of applied "rules"
        forms = list(dict.fromkeys(forms))
        # Put exceptions at the front of the list, so they get priority.
        # This is a dodgy heuristic -- but it's the best we can do until we get
        # frequencies on this. We can at least prune out problematic exceptions,
        # if they shadow more frequent analyses.
        for form in exceptions.get(string, []):
            if form not in forms:
                forms.insert(0, form)
        if not forms:
            forms.extend(oov_forms)
        if not forms:
            forms.append(orig)
        self.cache[cache_key] = forms
        return forms

    def is_base_form(self, token: Token) -> bool:
        """Check whether the token is a base form that does not need further
        analysis for lemmatization.

        token (Token): The token.
        RETURNS (bool): Whether the token is a base form.

        DOCS: https://spacy.io/api/lemmatizer#is_base_form
        """
        return False

    def to_disk(
        self, path: Union[str, Path], *, exclude: Iterable[str] = SimpleFrozenList()
    ):
        """Serialize the pipe to disk.

        path (str / Path): Path to a directory.
        exclude (Iterable[str]): String names of serialization fields to exclude.

        DOCS: https://spacy.io/api/lemmatizer#to_disk
        """
        serialize = {}
        serialize["vocab"] = lambda p: self.vocab.to_disk(p, exclude=exclude)
        serialize["lookups"] = lambda p: self.lookups.to_disk(p)
        util.to_disk(path, serialize, exclude)

    def from_disk(
        self, path: Union[str, Path], *, exclude: Iterable[str] = SimpleFrozenList()
    ) -> "Lemmatizer":
        """Load the pipe from disk. Modifies the object in place and returns it.

        path (str / Path): Path to a directory.
        exclude (Iterable[str]): String names of serialization fields to exclude.
        RETURNS (Lemmatizer): The modified Lemmatizer object.

        DOCS: https://spacy.io/api/lemmatizer#from_disk
        """
        deserialize: Dict[str, Callable[[Any], Any]] = {}
        deserialize["vocab"] = lambda p: self.vocab.from_disk(p, exclude=exclude)
        deserialize["lookups"] = lambda p: self.lookups.from_disk(p)
        util.from_disk(path, deserialize, exclude)
        self._validate_tables()
        return self

    def to_bytes(self, *, exclude: Iterable[str] = SimpleFrozenList()) -> bytes:
        """Serialize the pipe to a bytestring.

        exclude (Iterable[str]): String names of serialization fields to exclude.
        RETURNS (bytes): The serialized object.

        DOCS: https://spacy.io/api/lemmatizer#to_bytes
        """
        serialize = {}
        serialize["vocab"] = lambda: self.vocab.to_bytes(exclude=exclude)
        serialize["lookups"] = self.lookups.to_bytes
        return util.to_bytes(serialize, exclude)

    def from_bytes(
        self, bytes_data: bytes, *, exclude: Iterable[str] = SimpleFrozenList()
    ) -> "Lemmatizer":
        """Load the pipe from a bytestring.

        bytes_data (bytes): The serialized pipe.
        exclude (Iterable[str]): String names of serialization fields to exclude.
        RETURNS (Lemmatizer): The loaded Lemmatizer.

        DOCS: https://spacy.io/api/lemmatizer#from_bytes
        """
        deserialize: Dict[str, Callable[[Any], Any]] = {}
        deserialize["vocab"] = lambda b: self.vocab.from_bytes(b, exclude=exclude)
        deserialize["lookups"] = lambda b: self.lookups.from_bytes(b)
        util.from_bytes(bytes_data, deserialize, exclude)
        self._validate_tables()
        return self