from dataclasses import dataclass
from functools import partial
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union, cast

import numpy
from thinc.api import Config, Model, Ops, Optimizer, get_current_ops, set_dropout_rate
from thinc.types import Floats2d, Ints1d, Ints2d, Ragged

from ..compat import Protocol, runtime_checkable
from ..errors import Errors
from ..language import Language
from ..scorer import Scorer
from ..tokens import Doc, Span, SpanGroup
from ..training import Example, validate_examples
from ..util import registry
from ..vocab import Vocab
from .trainable_pipe import TrainablePipe

# Default model settings for the multi-label "spancat" factory. The string is
# parsed below into DEFAULT_SPANCAT_MODEL. It selects the
# "spacy.SpanCategorizer.v1" architecture with a LinearLogistic scorer, a
# mean/max reducer and a hash-embedding + maxout-window tok2vec.
spancat_default_config = """
[model]
@architectures = "spacy.SpanCategorizer.v1"
scorer = {"@layers": "spacy.LinearLogistic.v1"}

[model.reducer]
@layers = spacy.mean_max_reducer.v1
hidden_size = 128

[model.tok2vec]
@architectures = "spacy.Tok2Vec.v2"

[model.tok2vec.embed]
@architectures = "spacy.MultiHashEmbed.v2"
width = 96
rows = [5000, 1000, 2500, 1000]
attrs = ["NORM", "PREFIX", "SUFFIX", "SHAPE"]
include_static_vectors = false

[model.tok2vec.encode]
@architectures = "spacy.MaxoutWindowEncoder.v2"
width = ${model.tok2vec.embed.width}
window_size = 1
maxout_pieces = 3
depth = 4
"""

# Default model settings for the "spancat_singlelabel" factory. The string is
# parsed below into DEFAULT_SPANCAT_SINGLELABEL_MODEL. Compared to
# spancat_default_config, the scorer layer is "Softmax.v2".
# NOTE(review): the embed layer here is "spacy.MultiHashEmbed.v1" whereas
# spancat_default_config uses "spacy.MultiHashEmbed.v2" - confirm that the
# version difference is intentional.
spancat_singlelabel_default_config = """
[model]
@architectures = "spacy.SpanCategorizer.v1"
scorer = {"@layers": "Softmax.v2"}

[model.reducer]
@layers = spacy.mean_max_reducer.v1
hidden_size = 128

[model.tok2vec]
@architectures = "spacy.Tok2Vec.v2"
[model.tok2vec.embed]
@architectures = "spacy.MultiHashEmbed.v1"
width = 96
rows = [5000, 1000, 2500, 1000]
attrs = ["NORM", "PREFIX", "SUFFIX", "SHAPE"]
include_static_vectors = false

[model.tok2vec.encode]
@architectures = "spacy.MaxoutWindowEncoder.v2"
width = ${model.tok2vec.embed.width}
window_size = 1
maxout_pieces = 3
depth = 4
"""

# Key in doc.spans used by default by both factories in this module.
DEFAULT_SPANS_KEY = "sc"
# The [model] sections of the two config strings above, parsed once at import
# time and used as the default "model" setting of the respective factory.
DEFAULT_SPANCAT_MODEL = Config().from_str(spancat_default_config)["model"]
DEFAULT_SPANCAT_SINGLELABEL_MODEL = Config().from_str(
    spancat_singlelabel_default_config
)["model"]


@runtime_checkable
class Suggester(Protocol):
    """Structural type for span suggester callables.

    A suggester is called with an iterable of docs plus an optional
    keyword-only `ops` argument and returns a Ragged array.
    """

    def __call__(self, docs: Iterable[Doc], *, ops: Optional[Ops] = None) -> Ragged:
        ...


def ngram_suggester(
    docs: Iterable[Doc], sizes: List[int], *, ops: Optional[Ops] = None
) -> Ragged:
    """Suggest every n-gram of the requested sizes as a candidate span.

    docs (Iterable[Doc]): The documents to generate candidates for.
    sizes (List[int]): The n-gram lengths (in tokens) to suggest.
    ops (Optional[Ops]): Backend used to allocate the arrays. Falls back to
        the current ops when not given.
    RETURNS (Ragged): Two integer columns (start, end), with one entry in
        `lengths` per doc holding that doc's number of candidates.
    """
    if ops is None:
        ops = get_current_ops()
    xp = ops.xp
    candidates = []
    per_doc_counts = []
    for doc in docs:
        n_tokens = len(doc)
        # Column vector holding every token index of the doc.
        token_starts = xp.arange(n_tokens, dtype="i").reshape((-1, 1))
        n_candidates = 0
        for size in sizes:
            if size <= n_tokens:
                # Only starts that leave room for `size` tokens are valid.
                usable = token_starts[: n_tokens - (size - 1)]
                block = xp.hstack((usable, usable + size))
                candidates.append(block)
                n_candidates += block.shape[0]
            if candidates:
                assert candidates[-1].ndim == 2, candidates[-1].shape
        per_doc_counts.append(n_candidates)
    lengths_array = ops.asarray1i(per_doc_counts)
    if candidates:
        data = xp.vstack(candidates)
    else:
        # No candidates at all: use an empty 2d array so the result stays 2d.
        data = xp.zeros((0, 0), dtype="i")
    output = Ragged(data, lengths_array)

    assert output.dataXd.ndim == 2
    return output


def preset_spans_suggester(
    docs: Iterable[Doc], spans_key: str, *, ops: Optional[Ops] = None
) -> Ragged:
    """Suggest the spans already stored under doc.spans[spans_key].

    docs (Iterable[Doc]): The documents to read the preset spans from.
    spans_key (str): Key of the doc.spans dict holding the preset spans.
    ops (Optional[Ops]): Backend used to allocate the arrays. Falls back to
        the current ops when not given.
    RETURNS (Ragged): (start, end) rows for every preset span, with one entry
        in `lengths` per doc.
    """
    if ops is None:
        ops = get_current_ops()
    offsets = []
    counts = []
    for doc in docs:
        group = doc.spans[spans_key]
        doc_offsets = [[span.start, span.end] for span in group] if group else []
        offsets.extend(doc_offsets)
        counts.append(len(doc_offsets))
    lengths_array = cast(Ints1d, ops.asarray(counts, dtype="i"))
    if offsets:
        return Ragged(ops.asarray(offsets, dtype="i"), lengths_array)
    # No spans in any doc: return an empty 2d data array.
    return Ragged(ops.xp.zeros((0, 0), dtype="i"), lengths_array)


@registry.misc("spacy.ngram_suggester.v1")
def build_ngram_suggester(sizes: List[int]) -> Suggester:
    """Build a suggester that proposes all spans of the given lengths.

    sizes (List[int]): The span lengths (in tokens) to suggest.
    RETURNS (Suggester): `ngram_suggester` with `sizes` bound. It returns the
        spans as a ragged array of integers with two columns, indicating the
        start and end position.
    """
    suggester = partial(ngram_suggester, sizes=sizes)
    return suggester


@registry.misc("spacy.ngram_range_suggester.v1")
def build_ngram_range_suggester(min_size: int, max_size: int) -> Suggester:
    """Build a suggester for all span lengths from min_size to max_size.

    min_size (int): Smallest span length to suggest (inclusive).
    max_size (int): Largest span length to suggest (inclusive).
    RETURNS (Suggester): An n-gram suggester covering every length in the
        range. Spans are returned as a ragged array of integers with two
        columns, indicating the start and end position.
    """
    return build_ngram_suggester([*range(min_size, max_size + 1)])


@registry.misc("spacy.preset_spans_suggester.v1")
def build_preset_spans_suggester(spans_key: str) -> Suggester:
    """Build a suggester that reuses the spans stored in doc.spans[spans_key].

    Useful when an upstream component, such as a SpanRuler or SpanFinder,
    has already set the spans on the Doc.

    spans_key (str): Key of the doc.spans dict to read the spans from.
    RETURNS (Suggester): `preset_spans_suggester` with `spans_key` bound.
    """
    suggester = partial(preset_spans_suggester, spans_key=spans_key)
    return suggester


@Language.factory(
    "spancat",
    assigns=["doc.spans"],
    default_config={
        "threshold": 0.5,
        "spans_key": DEFAULT_SPANS_KEY,
        "max_positive": None,
        "model": DEFAULT_SPANCAT_MODEL,
        "suggester": {"@misc": "spacy.ngram_suggester.v1", "sizes": [1, 2, 3]},
        "scorer": {"@scorers": "spacy.spancat_scorer.v1"},
    },
    default_score_weights={"spans_sc_f": 1.0, "spans_sc_p": 0.0, "spans_sc_r": 0.0},
)
def make_spancat(
    nlp: Language,
    name: str,
    suggester: Suggester,
    model: Model[Tuple[List[Doc], Ragged], Floats2d],
    spans_key: str,
    scorer: Optional[Callable],
    threshold: float,
    max_positive: Optional[int],
) -> "SpanCategorizer":
    """Create a SpanCategorizer configured for multi-label classification,
    so that several labels can be assigned to the same span. The component
    has two parts: a suggester function that proposes candidate spans, and a
    labeller model that predicts one or more labels for each span.

    nlp (Language): The pipeline; its vocab is shared with the component.
    name (str): The component instance name, used to add entries to the
        losses during training.
    suggester (Callable[[Iterable[Doc], Optional[Ops]], Ragged]): A function
        that suggests spans. Spans are returned as a ragged array with two
        integer columns, for the start and end positions.
    model (Model[Tuple[List[Doc], Ragged], Floats2d]): A model instance that
        is given a list of documents and (start, end) indices representing
        candidate span offsets. The model predicts a probability for each
        category for each span.
    spans_key (str): Key of the doc.spans dict to save the spans under.
        During initialization and training, the component will look for
        spans on the reference document under the same key.
    scorer (Optional[Callable]): The scoring method. Defaults to
        Scorer.score_spans for the Doc.spans[spans_key] with overlapping
        spans allowed.
    threshold (float): Minimum probability to consider a prediction
        positive. Spans with a positive prediction will be saved on the Doc.
        Defaults to 0.5.
    max_positive (Optional[int]): Maximum number of labels to consider
        positive per span. Defaults to None, indicating no limit.
    RETURNS (SpanCategorizer): The configured component.
    """
    return SpanCategorizer(
        nlp.vocab,
        name=name,
        model=model,
        suggester=suggester,
        spans_key=spans_key,
        scorer=scorer,
        threshold=threshold,
        max_positive=max_positive,
        # Fixed settings of the multi-label setup: overlapping spans are
        # allowed and no explicit negative label is learned.
        allow_overlap=True,
        add_negative_label=False,
        negative_weight=None,
    )


@Language.factory(
    "spancat_singlelabel",
    assigns=["doc.spans"],
    default_config={
        "spans_key": DEFAULT_SPANS_KEY,
        "model": DEFAULT_SPANCAT_SINGLELABEL_MODEL,
        "negative_weight": 1.0,
        "suggester": {"@misc": "spacy.ngram_suggester.v1", "sizes": [1, 2, 3]},
        "scorer": {"@scorers": "spacy.spancat_scorer.v1"},
        "allow_overlap": True,
    },
    default_score_weights={"spans_sc_f": 1.0, "spans_sc_p": 0.0, "spans_sc_r": 0.0},
)
def make_spancat_singlelabel(
    nlp: Language,
    name: str,
    suggester: Suggester,
    model: Model[Tuple[List[Doc], Ragged], Floats2d],
    spans_key: str,
    negative_weight: float,
    allow_overlap: bool,
    scorer: Optional[Callable],
) -> "SpanCategorizer":
    """Create a SpanCategorizer configured for multi-class classification,
    where each span can get at most one label. The component has two parts:
    a suggester function that proposes candidate spans, and a labeller model
    that scores the labels for each span.

    nlp (Language): The pipeline; its vocab is shared with the component.
    name (str): The component instance name, used to add entries to the
        losses during training.
    suggester (Callable[[Iterable[Doc], Optional[Ops]], Ragged]): A function
        that suggests spans. Spans are returned as a ragged array with two
        integer columns, for the start and end positions.
    model (Model[Tuple[List[Doc], Ragged], Floats2d]): A model instance that
        is given a list of documents and (start, end) indices representing
        candidate span offsets. The model predicts a probability for each
        category for each span.
    spans_key (str): Key of the doc.spans dict to save the spans under.
        During initialization and training, the component will look for
        spans on the reference document under the same key.
    negative_weight (float): Multiplier for the loss terms.
        Can be used to downweight the negative samples if there are too many.
    allow_overlap (bool): If True the data is assumed to contain overlapping
        spans. Otherwise it produces non-overlapping spans greedily
        prioritizing higher assigned label scores.
    scorer (Optional[Callable]): The scoring method. Defaults to
        Scorer.score_spans for the Doc.spans[spans_key] with overlapping
        spans allowed.
    RETURNS (SpanCategorizer): The configured component.
    """
    return SpanCategorizer(
        nlp.vocab,
        name=name,
        model=model,
        suggester=suggester,
        spans_key=spans_key,
        scorer=scorer,
        negative_weight=negative_weight,
        allow_overlap=allow_overlap,
        # Fixed settings of the single-label setup: one label per span, an
        # explicit negative label and no probability threshold.
        max_positive=1,
        add_negative_label=True,
        threshold=None,
    )


def spancat_score(examples: Iterable[Example], **kwargs) -> Dict[str, Any]:
    """Score the spans stored under kwargs["spans_key"] with
    Scorer.score_spans.

    examples (Iterable[Example]): The examples to score.
    **kwargs: Settings forwarded to Scorer.score_spans. "spans_key" is
        required; "attr", "allow_overlap", "getter" and "has_annotation" are
        filled in only when the caller did not provide them.
    RETURNS (Dict[str, Any]): The result of Scorer.score_spans.
    """
    attr_prefix = "spans_"
    settings = dict(kwargs)
    spans_key = settings["spans_key"]

    def _get_spans(doc, key):
        # `key` is the prefixed attribute name; strip the prefix to get the
        # actual doc.spans key.
        return doc.spans.get(key[len(attr_prefix) :], [])

    def _has_spans(doc):
        return spans_key in doc.spans

    fallbacks = {
        "attr": f"{attr_prefix}{spans_key}",
        "allow_overlap": True,
        "getter": _get_spans,
        "has_annotation": _has_spans,
    }
    for option, value in fallbacks.items():
        settings.setdefault(option, value)
    return Scorer.score_spans(examples, **settings)


@registry.scorers("spacy.spancat_scorer.v1")
def make_spancat_scorer():
    """Return the `spancat_score` function, registered under the scorer name
    "spacy.spancat_scorer.v1"."""
    return spancat_score


@dataclass
class _Intervals:
    """
    Helper class to avoid storing overlapping spans.

    Tracks the set of positions covered by the half-open ranges [i, j) that
    were added so far, and answers whether a new range overlaps any of them.
    """

    # Declared as a dataclass field so that the generated __eq__ and __repr__
    # reflect the stored positions. Without any declared field, all instances
    # would compare equal and print as `_Intervals()`.
    ranges: set

    def __init__(self):
        # Explicit initializer (dataclass does not overwrite it) so that
        # `_Intervals()` still takes no arguments and each instance gets its
        # own fresh set.
        self.ranges = set()

    def add(self, i, j):
        """Mark every position in the half-open range [i, j) as occupied."""
        self.ranges.update(range(i, j))

    def __contains__(self, rang):
        """Return True if the (i, j) range overlaps any occupied position."""
        i, j = rang
        return not self.ranges.isdisjoint(range(i, j))


class SpanCategorizer(TrainablePipe):
    """Pipeline component to label spans of text.

    DOCS: https://spacy.io/api/spancategorizer
    """

    def __init__(
        self,
        vocab: Vocab,
        model: Model[Tuple[List[Doc], Ragged], Floats2d],
        suggester: Suggester,
        name: str = "spancat",
        *,
        add_negative_label: bool = False,
        spans_key: str = "spans",
        negative_weight: Optional[float] = 1.0,
        allow_overlap: Optional[bool] = True,
        max_positive: Optional[int] = None,
        threshold: Optional[float] = 0.5,
        scorer: Optional[Callable] = spancat_score,
    ) -> None:
        """Initialize the multi-label or multi-class span categorizer.

        vocab (Vocab): The shared vocabulary.
        model (thinc.api.Model): The Thinc Model powering the pipeline
            component. For multi-class classification (single label per span)
            we recommend using a Softmax classifier as the final layer, while
            for multi-label classification (multiple possible labels per
            span) we recommend Logistic.
        suggester (Callable[[Iterable[Doc], Optional[Ops]], Ragged]): A
            function that suggests spans. Spans are returned as a ragged
            array with two integer columns, for the start and end positions.
        name (str): The component instance name, used to add entries to the
            losses during training.
        add_negative_label (bool): Learn to predict a special
            'negative_label' when a Span is not annotated.
        spans_key (str): Key of the Doc.spans dict to save the spans under.
            During initialization and training, the component will look for
            spans on the reference document under the same key. Defaults to
            `"spans"`.
        negative_weight (float): Multiplier for the loss terms. Can be used
            to downweight the negative samples if there are too many when
            add_negative_label is True. Otherwise it's unused.
        allow_overlap (bool): If True the data is assumed to contain
            overlapping spans. Otherwise it produces non-overlapping spans
            greedily prioritizing higher assigned label scores. Only used
            when max_positive is 1.
        max_positive (Optional[int]): Maximum number of labels to consider
            positive per span. Defaults to None, indicating no limit.
        threshold (Optional[float]): Minimum probability to consider a
            prediction positive. Defaults to 0.5. Spans with a positive
            prediction will be saved on the Doc.
        scorer (Optional[Callable]): The scoring method. Defaults to
            Scorer.score_spans for the Doc.spans[spans_key] with overlapping
            spans allowed.
        RAISES (ValueError): If allow_overlap is falsy while max_positive is
            greater than 1.

        DOCS: https://spacy.io/api/spancategorizer#init
        """
        self.cfg = dict(
            labels=[],
            spans_key=spans_key,
            threshold=threshold,
            max_positive=max_positive,
            negative_weight=negative_weight,
            allow_overlap=allow_overlap,
        )
        self.vocab = vocab
        self.suggester = suggester
        self.model = model
        self.name = name
        self.scorer = scorer
        self.add_negative_label = add_negative_label
        if allow_overlap:
            return
        # Non-overlapping output is only supported with at most one label
        # per span.
        if max_positive is not None and max_positive > 1:
            raise ValueError(Errors.E1051.format(max_positive=max_positive))

    @property
    def key(self) -> str:
        """RETURNS (str): Key of the doc.spans dict to save the spans under.
        During initialization and training, the component will look for spans
        on the reference document under the same key.
        """
        spans_key = self.cfg["spans_key"]
        return str(spans_key)

    def _allow_extra_label(self) -> None:
        """Raise an error if the component can not add any more labels.

        The output width is read from the model's "nO" dimension or, failing
        that, from the "nO" dimension of its "output_layer" reference. If that
        width already equals the current number of labels and the component
        is not resizable, no further label can be added.

        RAISES (ValueError): If the output layer is full and not resizable.
        """
        nO = None
        if self.model.has_dim("nO"):
            nO = self.model.get_dim("nO")
        elif self.model.has_ref("output_layer") and self.model.get_ref(
            "output_layer"
        ).has_dim("nO"):
            nO = self.model.get_ref("output_layer").get_dim("nO")
        if nO is not None and nO == self._n_labels and not self.is_resizable:
            # Report the width that was actually resolved above. Asking the
            # model for "nO" again would fail when the value came from the
            # output layer reference.
            raise ValueError(Errors.E922.format(name=self.name, nO=nO))

    def add_label(self, label: str) -> int:
        """Register a new label with the pipe.

        label (str): The label to add.
        RETURNS (int): 0 if label is already present, otherwise 1.
        RAISES (ValueError): If the label is not a string, or if no further
            label can be added to the model.

        DOCS: https://spacy.io/api/spancategorizer#add_label
        """
        if not isinstance(label, str):
            raise ValueError(Errors.E187)
        already_known = label in self.labels
        if already_known:
            return 0
        # Check that the model still has room before touching any state.
        self._allow_extra_label()
        known_labels = self.cfg["labels"]
        known_labels.append(label)  # type: ignore
        self.vocab.strings.add(label)
        return 1

    @property
    def labels(self) -> Tuple[str, ...]:
        """RETURNS (Tuple[str, ...]): The labels currently added to the
        component, as an immutable snapshot of cfg["labels"].

        DOCS: https://spacy.io/api/spancategorizer#labels
        """
        return tuple(self.cfg["labels"])  # type: ignore

    @property
    def label_data(self) -> List[str]:
        """RETURNS (List[str]): Information about the component's labels,
        i.e. the current labels as a new list.

        DOCS: https://spacy.io/api/spancategorizer#label_data
        """
        return [*self.labels]

    @property
    def _label_map(self) -> Dict[str, int]:
        """RETURNS (Dict[str, int]): Mapping of each label to its index."""
        labels = self.labels
        return dict(zip(labels, range(len(labels))))

    @property
    def _n_labels(self) -> int:
        """RETURNS (int): Number of labels, counting the negative label when
        add_negative_label is set."""
        n_labels = len(self.labels)
        if self.add_negative_label:
            n_labels += 1
        return n_labels

    @property
    def _negative_label_i(self) -> Union[int, None]:
        """RETURNS (Union[int, None]): Index of the negative label, which
        comes right after the regular labels, or None if no negative label
        is used."""
        if not self.add_negative_label:
            return None
        return len(self.label_data)

    def predict(self, docs: Iterable[Doc]):
        """Apply the pipeline's model to a batch of docs, without modifying
        them.

        docs (Iterable[Doc]): The documents to predict.
        RETURNS: A tuple of the suggested span indices and the model's scores
            for them. The scores are an empty (0, 0) array when no spans were
            suggested.

        DOCS: https://spacy.io/api/spancategorizer#predict
        """
        ops = self.model.ops
        indices = self.suggester(docs, ops=ops)
        if indices.lengths.sum() == 0:
            # Nothing to score: skip the model entirely.
            return indices, ops.alloc2f(0, 0)
        return indices, self.model.predict((docs, indices))  # type: ignore

    def set_candidates(
        self, docs: Iterable[Doc], *, candidates_key: str = "candidates"
    ) -> None:
        """Store the spans proposed by the suggester on each doc. This method
        is intended to be used for debugging purposes.

        docs (Iterable[Doc]): The documents to modify.
        candidates_key (str): Key of the Doc.spans dict to save the candidate
            spans under.

        DOCS: https://spacy.io/api/spancategorizer#set_candidates
        """
        suggestions = self.suggester(docs, ops=self.model.ops)

        for doc_candidates, doc in zip(suggestions, docs):  # type: ignore
            # Reset the group, then fill it with one span per suggestion.
            doc.spans[candidates_key] = []
            group = doc.spans[candidates_key]
            for start_end in doc_candidates.dataXd:
                group.append(doc[start_end[0] : start_end[1]])

    def set_annotations(self, docs: Iterable[Doc], indices_scores) -> None:
        """Modify a batch of Doc objects, using pre-computed scores.

        docs (Iterable[Doc]): The documents to modify.
        indices_scores: The (indices, scores) pair to set, produced by
            SpanCategorizer.predict.

        DOCS: https://spacy.io/api/spancategorizer#set_annotations
        """
        indices, scores = indices_scores
        single_label = self.cfg["max_positive"] == 1
        allow_overlap = cast(bool, self.cfg["allow_overlap"])
        # The scores are one flat array for all docs; `begin` tracks where
        # the current doc's rows start.
        begin = 0
        for i, doc in enumerate(docs):
            finish = begin + indices.lengths[i]
            doc_indices = indices[i].dataXd
            doc_scores = scores[begin:finish]
            if single_label:
                group = self._make_span_group_singlelabel(
                    doc, doc_indices, doc_scores, allow_overlap
                )
            else:
                group = self._make_span_group_multilabel(
                    doc, doc_indices, doc_scores
                )
            doc.spans[self.key] = group
            begin = finish

    def update(
        self,
        examples: Iterable[Example],
        *,
        drop: float = 0.0,
        sgd: Optional[Optimizer] = None,
        losses: Optional[Dict[str, float]] = None,
    ) -> Dict[str, float]:
        """Learn from a batch of documents and gold-standard information,
        updating the pipe's model. Runs the model on the suggested spans and
        delegates the loss computation to get_loss.

        examples (Iterable[Example]): A batch of Example objects.
        drop (float): The dropout rate.
        sgd (thinc.api.Optimizer): The optimizer.
        losses (Dict[str, float]): Optional record of the loss during
            training. Updated using the component name as the key.
        RETURNS (Dict[str, float]): The updated losses dictionary.

        DOCS: https://spacy.io/api/spancategorizer#update
        """
        losses = {} if losses is None else losses
        losses.setdefault(self.name, 0.0)
        validate_examples(examples, "SpanCategorizer.update")
        self._validate_categories(examples)
        has_tokens = any(
            len(eg.predicted) if eg.predicted else 0 for eg in examples
        )
        if not has_tokens:
            # Handle cases where there are no tokens in any docs.
            return losses
        docs = [eg.predicted for eg in examples]
        candidates = self.suggester(docs, ops=self.model.ops)
        if candidates.lengths.sum() == 0:
            # No candidate spans were suggested, so there is nothing to learn.
            return losses
        set_dropout_rate(self.model, drop)
        scores, backprop = self.model.begin_update((docs, candidates))
        loss, d_scores = self.get_loss(examples, (candidates, scores))
        backprop(d_scores)  # type: ignore
        if sgd is not None:
            self.finish_update(sgd)
        losses[self.name] += loss
        return losses

    def get_loss(
        self, examples: Iterable[Example], spans_scores: Tuple[Ragged, Floats2d]
    ) -> Tuple[float, Floats2d]:
        """Find the loss and gradient of loss for the batch of documents and
        their predicted scores.

        The target has one row per suggested span and one column per label.
        A cell is set to 1.0 when an aligned gold span with the same
        (start, end) carries that label. With add_negative_label, rows that
        match no gold span get 1.0 in the negative label column instead.

        examples (Iterable[Examples]): The batch of examples.
        spans_scores: The suggested spans and the scores representing the
            model's predictions for them.
        RETURNS (Tuple[float, Floats2d]): The loss (sum of squared
            differences between scores and target) and the gradient.

        DOCS: https://spacy.io/api/spancategorizer#get_loss
        """
        spans, scores = spans_scores
        # Work on host copies of the span offsets for the Python-level loops.
        spans = Ragged(
            self.model.ops.to_numpy(spans.data), self.model.ops.to_numpy(spans.lengths)
        )
        target = numpy.zeros(scores.shape, dtype=scores.dtype)
        if self.add_negative_label:
            # Every row starts out as a negative sample; rows that match a
            # gold span are cleared below.
            negative_spans = numpy.ones((scores.shape[0]))
        offset = 0
        label_map = self._label_map
        for i, eg in enumerate(examples):
            # Map (start, end) offset of spans to the row in the d_scores array,
            # so that we can adjust the gradient for predictions that were
            # in the gold standard.
            spans_index = {}
            spans_i = spans[i].dataXd
            for j in range(spans.lengths[i]):
                start = int(spans_i[j, 0])  # type: ignore
                end = int(spans_i[j, 1])  # type: ignore
                spans_index[(start, end)] = offset + j
            for gold_span in self._get_aligned_spans(eg):
                key = (gold_span.start, gold_span.end)
                # Gold spans that were not suggested have no row and are
                # skipped.
                if key in spans_index:
                    row = spans_index[key]
                    k = label_map[gold_span.label_]
                    target[row, k] = 1.0
                    if self.add_negative_label:
                        # delete negative label target.
                        negative_spans[row] = 0.0
            # The target is a flat array for all docs. Track the position
            # we're at within the flat array.
            offset += spans.lengths[i]
        target = self.model.ops.asarray(target, dtype="f")  # type: ignore
        if self.add_negative_label:
            # Rows still flagged as negative get the negative label as target.
            negative_samples = numpy.nonzero(negative_spans)[0]
            target[negative_samples, self._negative_label_i] = 1.0  # type: ignore
        # The target will have the values 0 (for untrue predictions) or 1
        # (for true predictions).
        # The scores should be in the range [0, 1].
        # If the prediction is 0.9 and it's true, the gradient
        # will be -0.1 (0.9 - 1.0).
        # If the prediction is 0.9 and it's false, the gradient will be
        # 0.9 (0.9 - 0.0)
        d_scores = scores - target
        if self.add_negative_label:
            neg_weight = cast(float, self.cfg["negative_weight"])
            if neg_weight != 1.0:
                # Scale the gradient rows of the negative samples.
                d_scores[negative_samples] *= neg_weight
        loss = float((d_scores**2).sum())
        return loss, d_scores

    def initialize(
        self,
        get_examples: Callable[[], Iterable[Example]],
        *,
        nlp: Optional[Language] = None,
        labels: Optional[List[str]] = None,
    ) -> None:
        """Initialize the pipe for training, using a representative set
        of data examples.

        get_examples (Callable[[], Iterable[Example]]): Function that
            returns a representative sample of gold-standard Example objects.
        nlp (Optional[Language]): The current nlp object the component is
            part of.
        labels (Optional[List[str]]): The labels to add to the component,
            typically generated by the `init labels` command. If no labels
            are provided, the get_examples callback is used to extract the
            labels from the data.

        DOCS: https://spacy.io/api/spancategorizer#initialize
        """
        infer_labels = labels is None
        if not infer_labels:
            for label in labels:  # type: ignore
                self.add_label(label)
        subbatch: List[Example] = []
        for eg in get_examples():
            if infer_labels:
                for span in eg.reference.spans.get(self.key, []):
                    self.add_label(span.label_)
            # Keep at most 10 examples as a sample for model initialization.
            if len(subbatch) < 10:
                subbatch.append(eg)
        self._require_labels()
        if not subbatch:
            self.model.initialize()
            return
        docs = [eg.x for eg in subbatch]
        # Unigram candidates are used as the sample span input.
        spans = build_ngram_suggester(sizes=[1])(docs)
        Y = self.model.ops.alloc2f(spans.dataXd.shape[0], self._n_labels)
        self.model.initialize(X=(docs, spans), Y=Y)

    def _validate_categories(self, examples: Iterable[Example]):
        """Placeholder for validating the examples' span categories.

        Currently a no-op: no validation is performed.
        """
        # TODO
        pass

    def _get_aligned_spans(self, eg: Example):
        """Return the reference spans stored under this component's key,
        aligned to the predicted doc, with overlapping spans allowed."""
        gold_spans = eg.reference.spans.get(self.key, [])
        return eg.get_aligned_spans_y2x(gold_spans, allow_overlap=True)

    def _make_span_group_multilabel(
        self,
        doc: Doc,
        indices: Ints2d,
        scores: Floats2d,
    ) -> SpanGroup:
        """Find the top-k labels for each span (k=max_positive).

        A label is kept for a span when its score reaches the configured
        threshold and, if max_positive is set, it ranks among the
        max_positive highest-scoring labels of that span. A threshold of
        None disables the score filter. The negative label, if any, is never
        emitted.

        doc (Doc): The document the spans belong to.
        indices (Ints2d): (start, end) token offsets, one row per candidate.
        scores (Floats2d): Label scores, one row per candidate.
        RETURNS (SpanGroup): The labelled spans. Unless there were no scores
            at all, their scores are stored in attrs["scores"].
        """
        spans = SpanGroup(doc, name=self.key)
        if scores.size == 0:
            return spans
        scores = self.model.ops.to_numpy(scores)
        indices = self.model.ops.to_numpy(indices)
        threshold = self.cfg["threshold"]
        max_positive = self.cfg["max_positive"]
        labels = self.labels
        negative_label_i = self._negative_label_i

        if threshold is None:
            # No threshold configured: every label passes the score filter,
            # matching how the single-label path treats a None threshold.
            keeps = numpy.ones(scores.shape, dtype=bool)
        else:
            keeps = scores >= threshold
        if max_positive is not None:
            assert isinstance(max_positive, int)
            if self.add_negative_label:
                # Temporarily push the negative label to the bottom of the
                # ranking, then restore its scores.
                negative_scores = numpy.copy(scores[:, negative_label_i])
                scores[:, negative_label_i] = -numpy.inf
                ranked = (scores * -1).argsort()  # type: ignore
                scores[:, negative_label_i] = negative_scores
            else:
                ranked = (scores * -1).argsort()  # type: ignore
            # Everything ranked below the top max_positive labels is dropped.
            span_filter = ranked[:, max_positive:]
            for i, row in enumerate(span_filter):
                keeps[i, row] = False

        attrs_scores = []
        for i in range(indices.shape[0]):
            start = indices[i, 0]
            end = indices[i, 1]
            for j, keep in enumerate(keeps[i]):
                if keep and j != negative_label_i:
                    spans.append(Span(doc, start, end, label=labels[j]))
                    attrs_scores.append(scores[i, j])
        spans.attrs["scores"] = numpy.array(attrs_scores)
        return spans

    def _make_span_group_singlelabel(
        self,
        doc: Doc,
        indices: Ints2d,
        scores: Floats2d,
        allow_overlap: bool = True,
    ) -> SpanGroup:
        """Find the argmax label for each span.

        Candidates whose best label is the negative label, or whose best
        score is below the configured threshold (if any), are dropped. When
        allow_overlap is False, candidates are visited by descending best
        score and any candidate overlapping an already accepted span is
        skipped.

        doc (Doc): The document the spans belong to.
        indices (Ints2d): (start, end) token offsets, one row per candidate.
        scores (Floats2d): Label scores, one row per candidate.
        allow_overlap (bool): Whether overlapping spans may be emitted.
        RETURNS (SpanGroup): The labelled spans. Unless there were no scores
            at all, their scores are stored in attrs["scores"].
        """
        group = SpanGroup(doc, name=self.key)
        # Handle cases when there are zero suggestions
        if scores.size == 0:
            return group
        ops = self.model.ops
        scores = ops.to_numpy(scores)
        indices = ops.to_numpy(indices)
        best_label = scores.argmax(axis=1)
        best_score = numpy.take_along_axis(scores, best_label[:, None], axis=1)
        mask = numpy.ones(best_label.shape, dtype=bool)
        # Remove samples where the negative label is the argmax.
        if self.add_negative_label:
            mask = numpy.logical_and(mask, best_label != self._negative_label_i)
        # Filter samples according to threshold.
        min_score = self.cfg["threshold"]
        if min_score is not None:
            mask = numpy.logical_and(mask, (best_score >= min_score).squeeze())
        if not allow_overlap:
            # Visit candidates by descending argmax probability, so that the
            # highest-scoring span wins among overlapping ones.
            order = (best_score.squeeze() * -1).argsort()
            best_score = best_score[order]
            best_label = best_label[order]
            indices = indices[order]
            mask = mask[order]
        occupied = _Intervals()
        labels = self.labels
        kept_scores = []
        for row, label_i, score, keep in zip(indices, best_label, best_score, mask):
            if not keep:
                continue
            start = row[0]
            end = row[1]
            if not allow_overlap:
                if (start, end) in occupied:
                    continue
                occupied.add(start, end)
            kept_scores.append(score)
            group.append(Span(doc, start, end, label=labels[label_i]))

        group.attrs["scores"] = numpy.array(kept_scores)
        return group