import warnings
from collections import defaultdict
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple, Union

import srsly

from ..errors import Errors, Warnings
from ..language import Language
from ..matcher import Matcher, PhraseMatcher
from ..matcher.levenshtein import levenshtein_compare
from ..scorer import get_ner_prf
from ..tokens import Doc, Span
from ..training import Example
from ..util import SimpleFrozenList, ensure_path, from_disk, registry, to_disk
from .pipe import Pipe

DEFAULT_ENT_ID_SEP = "||"
PatternType = Dict[str, Union[str, List[Dict[str, Any]]]]


@Language.factory(
    "entity_ruler",
    assigns=["doc.ents", "token.ent_type", "token.ent_iob"],
    default_config={
        "phrase_matcher_attr": None,
        "matcher_fuzzy_compare": {"@misc": "spacy.levenshtein_compare.v1"},
        "validate": False,
        "overwrite_ents": False,
        "ent_id_sep": DEFAULT_ENT_ID_SEP,
        "scorer": {"@scorers": "spacy.entity_ruler_scorer.v1"},
    },
    default_score_weights={
        "ents_f": 1.0,
        "ents_p": 0.0,
        "ents_r": 0.0,
        "ents_per_type": None,
    },
)
def make_entity_ruler(
    nlp: Language,
    name: str,
    phrase_matcher_attr: Optional[Union[int, str]],
    matcher_fuzzy_compare: Callable,
    validate: bool,
    overwrite_ents: bool,
    ent_id_sep: str,
    scorer: Optional[Callable],
):
    return EntityRuler(
        nlp,
        name,
        phrase_matcher_attr=phrase_matcher_attr,
        matcher_fuzzy_compare=matcher_fuzzy_compare,
        validate=validate,
        overwrite_ents=overwrite_ents,
        ent_id_sep=ent_id_sep,
        scorer=scorer,
    )


def entity_ruler_score(examples, **kwargs):
    return get_ner_prf(examples)


@registry.scorers("spacy.entity_ruler_scorer.v1")
def make_entity_ruler_scorer():
    return entity_ruler_score


class EntityRuler(Pipe):
    """The EntityRuler lets you add spans to the `Doc.ents` using token-based
    rules or exact phrase matches. It can be combined with the statistical
    `EntityRecognizer` to boost accuracy, or used on its own to implement a
    purely rule-based entity recognition system. After initialization, the
    component is typically added to the pipeline using `nlp.add_pipe`.

    DOCS: https://spacy.io/api/entityruler
    USAGE: https://spacy.io/usage/rule-based-matching#entityruler
    """

    def __init__(
        self,
        nlp: Language,
        name: str = "entity_ruler",
        *,
        phrase_matcher_attr: Optional[Union[int, str]] = None,
        matcher_fuzzy_compare: Callable = levenshtein_compare,
        validate: bool = False,
        overwrite_ents: bool = False,
        ent_id_sep: str = DEFAULT_ENT_ID_SEP,
        patterns: Optional[List[PatternType]] = None,
        scorer: Optional[Callable] = entity_ruler_score,
    ) -> None:
        """Initialize the entity ruler. If patterns are supplied here, they
        need to be a list of dictionaries with a `"label"` and `"pattern"`
        key. A pattern can either be a token pattern (list) or a phrase pattern
        (string). For example: `{'label': 'ORG', 'pattern': 'Apple'}`.

        nlp (Language): The shared nlp object to pass the vocab to the matchers
            and process phrase patterns.
        name (str): Instance name of the current pipeline component. Typically
            passed in automatically from the factory when the component is
            added. Used to disable the current entity ruler while creating
            phrase patterns with the nlp object.
        phrase_matcher_attr (int / str): Token attribute to match on, passed
            to the internal PhraseMatcher as `attr`.
        matcher_fuzzy_compare (Callable): The fuzzy comparison method for the
            internal Matcher. Defaults to
            spacy.matcher.levenshtein.levenshtein_compare.
        validate (bool): Whether patterns should be validated, passed to
            Matcher and PhraseMatcher as `validate`
        patterns (iterable): Optional patterns to load in.
        overwrite_ents (bool): If existing entities are present, e.g. entities
            added by the model, overwrite them by matches if necessary.
        ent_id_sep (str): Separator used internally for entity IDs.
        scorer (Optional[Callable]): The scoring method. Defaults to
            spacy.scorer.get_ner_prf.

        DOCS: https://spacy.io/api/entityruler#init
        """
        self.nlp = nlp
        self.name = name
        self.overwrite = overwrite_ents
        self.token_patterns = defaultdict(list)  # type: ignore
        self.phrase_patterns = defaultdict(list)  # type: ignore
        self._validate = validate
        self.matcher_fuzzy_compare = matcher_fuzzy_compare
        self.matcher = Matcher(
            nlp.vocab, validate=validate, fuzzy_compare=self.matcher_fuzzy_compare
        )
        self.phrase_matcher_attr = phrase_matcher_attr
        self.phrase_matcher = PhraseMatcher(
            nlp.vocab, attr=self.phrase_matcher_attr, validate=validate
        )
        self.ent_id_sep = ent_id_sep
        self._ent_ids = defaultdict(tuple)  # type: ignore
        if patterns is not None:
            self.add_patterns(patterns)
        self.scorer = scorer

    def __len__(self) -> int:
        """The number of all patterns added to the entity ruler."""
        n_token_patterns = sum(len(p) for p in self.token_patterns.values())
        n_phrase_patterns = sum(len(p) for p in self.phrase_patterns.values())
        return n_token_patterns + n_phrase_patterns

    def __contains__(self, label: str) -> bool:
        """Whether a label is present in the patterns."""
        return label in self.token_patterns or label in self.phrase_patterns

    def __call__(self, doc: Doc) -> Doc:
        """Find matches in document and add them as entities.

        doc (Doc): The Doc object in the pipeline.
        RETURNS (Doc): The Doc with added entities, if available.

        DOCS: https://spacy.io/api/entityruler#call
        """
        error_handler = self.get_error_handler()
        try:
            matches = self.match(doc)
            self.set_annotations(doc, matches)
            return doc
        except Exception as e:
            return error_handler(self.name, self, [doc], e)

    def match(self, doc: Doc):
        self._require_patterns()
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", message="\\[W036")
            matches = list(self.matcher(doc)) + list(self.phrase_matcher(doc))

        final_matches = set(
            [(m_id, start, end) for m_id, start, end in matches if start != end]
        )
        get_sort_key = lambda m: (m[2] - m[1], -m[1])
        final_matches = sorted(final_matches, key=get_sort_key, reverse=True)
        return final_matches

    def set_annotations(self, doc, matches):
        """Modify the document in place"""
        entities = list(doc.ents)
        new_entities = []
        seen_tokens = set()
        for match_id, start, end in matches:
            if any(t.ent_type for t in doc[start:end]) and not self.overwrite:
                continue
            # check for end - 1 here because boundaries are inclusive
            if start not in seen_tokens and end - 1 not in seen_tokens:
                if match_id in self._ent_ids:
                    label, ent_id = self._ent_ids[match_id]
                    span = Span(doc, start, end, label=label, span_id=ent_id)
                else:
                    span = Span(doc, start, end, label=match_id)
                new_entities.append(span)
                entities = [
                    e for e in entities if not (e.start < end and e.end > start)
                ]
                seen_tokens.update(range(start, end))
        doc.ents = entities + new_entities

    @property
    def labels(self) -> Tuple[str, ...]:
        """All labels present in the match patterns.

        RETURNS (set): The string labels.

        DOCS: https://spacy.io/api/entityruler#labels
        """
        keys = set(self.token_patterns.keys())
        keys.update(self.phrase_patterns.keys())
        all_labels = set()

        for l in keys:
            if self.ent_id_sep in l:
                label, _ = self._split_label(l)
                all_labels.add(label)
            else:
                all_labels.add(l)
        return tuple(sorted(all_labels))

    def initialize(
        self,
        get_examples: Callable[[], Iterable[Example]],
        *,
        nlp: Optional[Language] = None,
        patterns: Optional[Sequence[PatternType]] = None,
    ):
        """Initialize the pipe for training.

        get_examples (Callable[[], Iterable[Example]]): Function that
            returns a representative sample of gold-standard Example objects.
        nlp (Language): The current nlp object the component is part of.
        patterns Optional[Iterable[PatternType]]: The list of patterns.

        DOCS: https://spacy.io/api/entityruler#initialize
        """
        self.clear()
        if patterns:
            self.add_patterns(patterns)  # type: ignore[arg-type]

    @property
    def ent_ids(self) -> Tuple[Optional[str], ...]:
        """All entity ids present in the match patterns `id` properties

        RETURNS (set): The string entity ids.

        DOCS: https://spacy.io/api/entityruler#ent_ids
        """
        keys = set(self.token_patterns.keys())
        keys.update(self.phrase_patterns.keys())
        all_ent_ids = set()

        for l in keys:
            if self.ent_id_sep in l:
                _, ent_id = self._split_label(l)
                all_ent_ids.add(ent_id)
        return tuple(all_ent_ids)

    @property
    def patterns(self) -> List[PatternType]:
        """Get all patterns that were added to the entity ruler.

        RETURNS (list): The original patterns, one dictionary per pattern.

        DOCS: https://spacy.io/api/entityruler#patterns
        """
        all_patterns = []
        for label, patterns in self.token_patterns.items():
            for pattern in patterns:
                ent_label, ent_id = self._split_label(label)
                p = {"label": ent_label, "pattern": pattern}
                if ent_id:
                    p["id"] = ent_id
                all_patterns.append(p)
        for label, patterns in self.phrase_patterns.items():
            for pattern in patterns:
                ent_label, ent_id = self._split_label(label)
                p = {"label": ent_label, "pattern": pattern.text}
                if ent_id:
                    p["id"] = ent_id
                all_patterns.append(p)
        return all_patterns

    def add_patterns(self, patterns: List[PatternType]) -> None:
        """Add patterns to the entity ruler. A pattern can either be a token
        pattern (list of dicts) or a phrase pattern (string). For example:
        {'label': 'ORG', 'pattern': 'Apple'}
        {'label': 'GPE', 'pattern': [{'lower': 'san'}, {'lower': 'francisco'}]}

        patterns (list): The patterns to add.

        DOCS: https://spacy.io/api/entityruler#add_patterns
        """

        # disable the nlp components after this one in case they hadn't been initialized / deserialised yet
        try:
            current_index = -1
            for i, (name, pipe) in enumerate(self.nlp.pipeline):
                if self == pipe:
                    current_index = i
                    break
            subsequent_pipes = [pipe for pipe in self.nlp.pipe_names[current_index:]]
        except ValueError:
            subsequent_pipes = []
        with self.nlp.select_pipes(disable=subsequent_pipes):
            token_patterns = []
            phrase_pattern_labels = []
            phrase_pattern_texts = []
            phrase_pattern_ids = []
            for entry in patterns:
                if isinstance(entry["pattern"], str):
                    phrase_pattern_labels.append(entry["label"])
                    phrase_pattern_texts.append(entry["pattern"])
                    phrase_pattern_ids.append(entry.get("id"))
                elif isinstance(entry["pattern"], list):
                    token_patterns.append(entry)
            phrase_patterns = []
            for label, pattern, ent_id in zip(
                phrase_pattern_labels,
                self.nlp.pipe(phrase_pattern_texts),
                phrase_pattern_ids,
            ):
                phrase_pattern = {"label": label, "pattern": pattern}
                if ent_id:
                    phrase_pattern["id"] = ent_id
                phrase_patterns.append(phrase_pattern)
            for entry in token_patterns + phrase_patterns:  # type: ignore[operator]
                label = entry["label"]  # type: ignore
                if "id" in entry:
                    ent_label = label
                    label = self._create_label(label, entry["id"])
                    key = self.matcher._normalize_key(label)
                    self._ent_ids[key] = (ent_label, entry["id"])
                pattern = entry["pattern"]  # type: ignore
                if isinstance(pattern, Doc):
                    self.phrase_patterns[label].append(pattern)
                    self.phrase_matcher.add(label, [pattern])  # type: ignore
                elif isinstance(pattern, list):
                    self.token_patterns[label].append(pattern)
                    self.matcher.add(label, [pattern])
                else:
                    raise ValueError(Errors.E097.format(pattern=pattern))

    def clear(self) -> None:
        """Reset all patterns."""
        self.token_patterns = defaultdict(list)
        self.phrase_patterns = defaultdict(list)
        self._ent_ids = defaultdict(tuple)
        self.matcher = Matcher(
            self.nlp.vocab,
            validate=self._validate,
            fuzzy_compare=self.matcher_fuzzy_compare,
        )
        self.phrase_matcher = PhraseMatcher(
            self.nlp.vocab, attr=self.phrase_matcher_attr, validate=self._validate
        )

    def remove(self, ent_id: str) -> None:
        """Remove a pattern by its ent_id if a pattern with this ent_id was added before

        ent_id (str): id of the pattern to be removed
        RETURNS: None
        DOCS: https://spacy.io/api/entityruler#remove
        """
        label_id_pairs = [
            (label, eid) for (label, eid) in self._ent_ids.values() if eid == ent_id
        ]
        if not label_id_pairs:
            raise ValueError(
                Errors.E1024.format(attr_type="ID", label=ent_id, component=self.name)
            )
        created_labels = [
            self._create_label(label, eid) for (label, eid) in label_id_pairs
        ]
        # remove the patterns from self.phrase_patterns
        self.phrase_patterns = defaultdict(
            list,
            {
                label: val
                for (label, val) in self.phrase_patterns.items()
                if label not in created_labels
            },
        )
        # remove the patterns from self.token_pattern
        self.token_patterns = defaultdict(
            list,
            {
                label: val
                for (label, val) in self.token_patterns.items()
                if label not in created_labels
            },
        )
        # remove the patterns from self.token_pattern
        for label in created_labels:
            if label in self.phrase_matcher:
                self.phrase_matcher.remove(label)
            else:
                self.matcher.remove(label)

    def _require_patterns(self) -> None:
        """Raise a warning if this component has no patterns defined."""
        if len(self) == 0:
            warnings.warn(Warnings.W036.format(name=self.name))

    def _split_label(self, label: str) -> Tuple[str, Optional[str]]:
        """Split Entity label into ent_label and ent_id if it contains self.ent_id_sep

        label (str): The value of label in a pattern entry
        RETURNS (tuple): ent_label, ent_id
        """
        if self.ent_id_sep in label:
            ent_label, ent_id = label.rsplit(self.ent_id_sep, 1)
        else:
            ent_label = label
            ent_id = None  # type: ignore
        return ent_label, ent_id

    def _create_label(self, label: Any, ent_id: Any) -> str:
        """Join Entity label with ent_id if the pattern has an `id` attribute
        If ent_id is not a string, the label is returned as is.

        label (str): The label to set for ent.label_
        ent_id (str): The label
        RETURNS (str): The ent_label joined with configured `ent_id_sep`
        """
        if isinstance(ent_id, str):
            label = f"{label}{self.ent_id_sep}{ent_id}"
        return label

    def from_bytes(
        self, patterns_bytes: bytes, *, exclude: Iterable[str] = SimpleFrozenList()
    ) -> "EntityRuler":
        """Load the entity ruler from a bytestring.

        patterns_bytes (bytes): The bytestring to load.
        RETURNS (EntityRuler): The loaded entity ruler.

        DOCS: https://spacy.io/api/entityruler#from_bytes
        """
        cfg = srsly.msgpack_loads(patterns_bytes)
        self.clear()
        if isinstance(cfg, dict):
            self.add_patterns(cfg.get("patterns", cfg))
            self.overwrite = cfg.get("overwrite", False)
            self.phrase_matcher_attr = cfg.get("phrase_matcher_attr", None)
            self.phrase_matcher = PhraseMatcher(
                self.nlp.vocab,
                attr=self.phrase_matcher_attr,
            )
            self.ent_id_sep = cfg.get("ent_id_sep", DEFAULT_ENT_ID_SEP)
        else:
            self.add_patterns(cfg)
        return self

    def to_bytes(self, *, exclude: Iterable[str] = SimpleFrozenList()) -> bytes:
        """Serialize the entity ruler patterns to a bytestring.

        RETURNS (bytes): The serialized patterns.

        DOCS: https://spacy.io/api/entityruler#to_bytes
        """
        serial = {
            "overwrite": self.overwrite,
            "ent_id_sep": self.ent_id_sep,
            "phrase_matcher_attr": self.phrase_matcher_attr,
            "patterns": self.patterns,
        }
        return srsly.msgpack_dumps(serial)

    def from_disk(
        self, path: Union[str, Path], *, exclude: Iterable[str] = SimpleFrozenList()
    ) -> "EntityRuler":
        """Load the entity ruler from a file. Expects a file containing
        newline-delimited JSON (JSONL) with one entry per line.

        path (str / Path): The JSONL file to load.
        RETURNS (EntityRuler): The loaded entity ruler.

        DOCS: https://spacy.io/api/entityruler#from_disk
        """
        path = ensure_path(path)
        self.clear()
        depr_patterns_path = path.with_suffix(".jsonl")
        if path.suffix == ".jsonl":  # user provides a jsonl
            if path.is_file:
                patterns = srsly.read_jsonl(path)
                self.add_patterns(patterns)
            else:
                raise ValueError(Errors.E1023.format(path=path))
        elif depr_patterns_path.is_file():
            patterns = srsly.read_jsonl(depr_patterns_path)
            self.add_patterns(patterns)
        elif path.is_dir():  # path is a valid directory
            cfg = {}
            deserializers_patterns = {
                "patterns": lambda p: self.add_patterns(
                    srsly.read_jsonl(p.with_suffix(".jsonl"))
                )
            }
            deserializers_cfg = {"cfg": lambda p: cfg.update(srsly.read_json(p))}
            from_disk(path, deserializers_cfg, {})
            self.overwrite = cfg.get("overwrite", False)
            self.phrase_matcher_attr = cfg.get("phrase_matcher_attr")
            self.ent_id_sep = cfg.get("ent_id_sep", DEFAULT_ENT_ID_SEP)

            self.phrase_matcher = PhraseMatcher(
                self.nlp.vocab, attr=self.phrase_matcher_attr
            )
            from_disk(path, deserializers_patterns, {})
        else:  # path is not a valid directory or file
            raise ValueError(Errors.E146.format(path=path))
        return self

    def to_disk(
        self, path: Union[str, Path], *, exclude: Iterable[str] = SimpleFrozenList()
    ) -> None:
        """Save the entity ruler patterns to a directory. The patterns will be
        saved as newline-delimited JSON (JSONL).

        path (str / Path): The JSONL file to save.

        DOCS: https://spacy.io/api/entityruler#to_disk
        """
        path = ensure_path(path)
        cfg = {
            "overwrite": self.overwrite,
            "phrase_matcher_attr": self.phrase_matcher_attr,
            "ent_id_sep": self.ent_id_sep,
        }
        serializers = {
            "patterns": lambda p: srsly.write_jsonl(
                p.with_suffix(".jsonl"), self.patterns
            ),
            "cfg": lambda p: srsly.write_json(p, cfg),
        }
        if path.suffix == ".jsonl":  # user wants to save only JSONL
            srsly.write_jsonl(path, self.patterns)
        else:
            to_disk(path, serializers, {})