import os
import sys
from pathlib import Path

import pytest
import srsly
from typer.testing import CliRunner

from spacy.cli._util import app, get_git_version
from spacy.tokens import Doc, DocBin, Span

from .util import make_tempdir, normalize_whitespace


def has_git():
    """Report whether `get_git_version()` can be called successfully.

    Returns True when the call completes and False when it raises
    RuntimeError. Used below to skip tests that need git.
    """
    try:
        get_git_version()
    except RuntimeError:
        return False
    else:
        return True


def test_convert_auto():
    """`convert` on a directory of `.iob` files writes one `.spacy` file each."""
    stems = ("data1", "data2", "data3")
    with make_tempdir() as d_in, make_tempdir() as d_out:
        # Empty placeholder inputs that all share the same suffix.
        for stem in stems:
            (d_in / f"{stem}.iob").touch()

        # No converter is named on the command line, so the "automatic"
        # suffix detection has to work.
        cli_args = ["convert", str(d_in), str(d_out)]
        result = CliRunner().invoke(app, cli_args)
        assert "Generated output file" in result.stdout

        out_files = os.listdir(d_out)
        assert len(out_files) == 3
        for stem in stems:
            assert f"{stem}.spacy" in out_files


def test_convert_auto_conflict():
    """`convert` reports mixed input suffixes and writes no output files."""
    mixed_inputs = ("data1.iob", "data2.iob", "data3.json")
    with make_tempdir() as d_in, make_tempdir() as d_out:
        for filename in mixed_inputs:
            (d_in / filename).touch()

        # "Automatic" suffix detection must warn when the input directory
        # contains different file types.
        cli_args = ["convert", str(d_in), str(d_out)]
        result = CliRunner().invoke(app, cli_args)
        assert "All input files must be same type" in result.stdout

        # Nothing may have been written to the output directory.
        assert os.listdir(d_out) == []


def test_benchmark_accuracy_alias():
    """The `evaluate` alias prints the same help as `benchmark accuracy`."""
    result_benchmark = CliRunner().invoke(app, ["benchmark", "accuracy", "--help"])
    result_evaluate = CliRunner().invoke(app, ["evaluate", "--help"])

    # The command name shown in the help text is the only expected
    # difference, so rewrite it before comparing.
    evaluate_help = result_evaluate.stdout.replace(
        "spacy evaluate", "spacy benchmark accuracy"
    )
    benchmark_help = result_benchmark.stdout
    assert normalize_whitespace(benchmark_help) == normalize_whitespace(evaluate_help)


def test_debug_data_trainable_lemmatizer_cli(en_vocab):
    """`debug data` prints a section for the trainable lemmatizer.

    Small train/dev corpora with lemma annotations are written to a temporary
    directory, a config with a `trainable_lemmatizer` pipeline is generated
    with `init config`, and `debug data` is run against it.
    """
    train_docs = [
        Doc(en_vocab, words=["I", "like", "cats"], lemmas=["I", "like", "cat"]),
        Doc(
            en_vocab,
            words=["Dogs", "are", "great", "too"],
            lemmas=["dog", "be", "great", "too"],
        ),
    ]
    dev_docs = [
        Doc(en_vocab, words=["Cats", "are", "cute"], lemmas=["cat", "be", "cute"]),
        Doc(en_vocab, words=["Pets", "are", "great"], lemmas=["pet", "be", "great"]),
    ]
    with make_tempdir() as d_in:
        train_bin = DocBin(docs=train_docs)
        train_bin.to_disk(d_in / "train.spacy")
        dev_bin = DocBin(docs=dev_docs)
        dev_bin.to_disk(d_in / "dev.spacy")
        # `debug data` requires an input pipeline config
        result_init_config = CliRunner().invoke(
            app,
            [
                "init",
                "config",
                f"{d_in}/config.cfg",
                "--lang",
                "en",
                "--pipeline",
                "trainable_lemmatizer",
            ],
        )
        # Fail at the setup step if the config could not be generated, rather
        # than later with a misleading "section missing" assertion.
        assert result_init_config.exit_code == 0
        result_debug_data = CliRunner().invoke(
            app,
            [
                "debug",
                "data",
                f"{d_in}/config.cfg",
                "--paths.train",
                f"{d_in}/train.spacy",
                "--paths.dev",
                f"{d_in}/dev.spacy",
            ],
        )
        # Instead of checking specific wording of the output, which may change,
        # we'll check that this section of the debug output is present.
        assert "= Trainable Lemmatizer =" in result_debug_data.stdout


# project tests

# Name of the config file that the sample project's "create" command produces.
CFG_FILE = "myconfig.cfg"

# Project definition shared by the project tests below.
SAMPLE_PROJECT = {
    "title": "Sample project",
    "description": "This is a project for testing",
    "assets": [
        # Regular asset.
        {
            "dest": "assets/spacy-readme.md",
            "url": "https://github.com/explosion/spaCy/raw/dec81508d28b47f09a06203c472b37f00db6c869/README.md",
            "checksum": "411b2c89ccf34288fae8ed126bf652f7",
        },
        # Asset flagged as "extra".
        {
            "dest": "assets/citation.cff",
            "url": "https://github.com/explosion/spaCy/raw/master/CITATION.cff",
            "checksum": "c996bfd80202d480eb2e592369714e5e",
            "extra": True,
        },
    ],
    "commands": [
        # Prints a marker string that test_project_run looks for in stdout.
        {
            "name": "ok",
            "help": "print ok",
            "script": ["python -c \"print('okokok')\""],
        },
        # Generates CFG_FILE and declares it as the command's output.
        {
            "name": "create",
            "help": "make a file",
            "script": ["python -m spacy init config " + CFG_FILE],
            "outputs": [CFG_FILE],
        },
    ],
}

# SAMPLE_PROJECT serialized with srsly.yaml_dumps; the project_dir fixture
# writes this text to "project.yml".
SAMPLE_PROJECT_TEXT = srsly.yaml_dumps(SAMPLE_PROJECT)


@pytest.fixture
def project_dir():
    """Yield a temporary directory containing the sample `project.yml`."""
    with make_tempdir() as pdir:
        project_file = pdir / "project.yml"
        project_file.write_text(SAMPLE_PROJECT_TEXT)
        yield pdir


def test_project_document(project_dir):
    """`project document` writes a README that includes the description."""
    readme_path = project_dir / "README.md"
    assert not readme_path.exists(), "README already exists"

    cli_args = ["project", "document", str(project_dir), "-o", str(readme_path)]
    result = CliRunner().invoke(app, cli_args)

    assert result.exit_code == 0
    assert readme_path.is_file()
    assert SAMPLE_PROJECT["description"] in readme_path.read_text("utf-8")


def test_project_assets(project_dir):
    """`project assets` fetches the regular asset, and the extra with `--extra`."""
    asset_dir = project_dir / "assets"
    assert not asset_dir.exists(), "Assets dir is already present"

    # (additional CLI flags, file expected afterwards, failure message)
    steps = [
        ([], "spacy-readme.md", "Assets not downloaded"),
        # check that extras work
        (["--extra"], "citation.cff", "Extras not downloaded"),
    ]
    for flags, filename, failure_message in steps:
        result = CliRunner().invoke(
            app, ["project", "assets", *flags, str(project_dir)]
        )
        assert result.exit_code == 0
        assert (asset_dir / filename).is_file(), failure_message


def test_project_run(project_dir):
    """`project run` executes commands; `--dry` must not create any output."""

    def run(*args):
        # Each invocation gets its own runner.
        return CliRunner().invoke(app, ["project", "run", *args, str(project_dir)])

    test_file = project_dir / CFG_FILE

    # make sure dry run works
    dry_result = run("--dry", "create")
    assert dry_result.exit_code == 0
    assert not test_file.is_file()

    # The real run creates the config file.
    create_result = run("create")
    assert create_result.exit_code == 0
    assert test_file.is_file()

    # Output of the executed script shows up in stdout.
    ok_result = run("ok")
    assert ok_result.exit_code == 0
    assert "okokok" in ok_result.stdout


@pytest.mark.skipif(not has_git(), reason="git not installed")
@pytest.mark.parametrize(
    "options",
    [
        "",
        # "--sparse",
        "--branch v3",
        "--repo https://github.com/explosion/projects --branch v3",
    ],
)
def test_project_clone(options):
    """`project clone` produces a project directory containing a README.

    `options` is a whitespace-separated string of extra CLI flags; the empty
    string means no extra flags.
    """
    target = "benchmarks/ner_conll03"
    with make_tempdir() as workspace:
        out = workspace / "project"
        # Splitting an empty string yields an empty list, so no special case
        # is needed for the "no options" parameter.
        cli_args = ["project", "clone", target, *options.split(), str(out)]
        result = CliRunner().invoke(app, cli_args)
        assert result.exit_code == 0
        assert (out / "README.md").is_file()


def test_project_push_pull(project_dir):
    """A pushed command output can be restored with `project pull`."""
    remote = "xyz"

    def project_cli(*args):
        # Run `spacy project <args...> <project_dir>` with a fresh runner.
        return CliRunner().invoke(app, ["project", *args, str(project_dir)])

    with make_tempdir() as remote_dir:
        # Rewrite project.yml so that it declares a directory-backed remote.
        proj = {**SAMPLE_PROJECT, "remotes": {remote: str(remote_dir)}}
        (project_dir / "project.yml").write_text(srsly.yaml_dumps(proj))

        test_file = project_dir / CFG_FILE

        # Create the output file.
        assert project_cli("run", "create").exit_code == 0
        assert test_file.is_file()

        # Push it to the remote, then delete the local copy.
        assert project_cli("push", remote).exit_code == 0
        test_file.unlink()
        assert not test_file.exists()

        # Pulling has to bring the file back.
        assert project_cli("pull", remote).exit_code == 0
        assert test_file.is_file()


def test_find_function_valid():
    """`find-function` locates registered functions, with or without `-r`."""
    # (function name, extra CLI args, fragments expected in stdout)
    cases = [
        # example of architecture in main code base
        ("spacy.TextCatBOW.v3", ["-r", "architectures"], ["textcat.py"]),
        ("spacy.TextCatBOW.v3", [], ["textcat.py"]),
        # example of architecture in spacy-legacy
        ("spacy.TextCatBOW.v1", [], ["spacy_legacy", "textcat.py"]),
    ]
    for function, extra_args, fragments in cases:
        result = CliRunner().invoke(app, ["find-function", function, *extra_args])
        assert f"Found registered function '{function}'" in result.stdout
        for fragment in fragments:
            assert fragment in result.stdout


def test_find_function_invalid():
    """`find-function` reports unknown registries and unknown functions."""
    # invalid registry
    registry = "foobar"
    cli_args = ["find-function", "spacy.TextCatBOW.v3", "--registry", registry]
    result = CliRunner().invoke(app, cli_args)
    assert f"Unknown function registry: '{registry}'" in result.stdout

    # invalid function
    function = "spacy.TextCatBOW.v666"
    cli_args = ["find-function", function]
    result = CliRunner().invoke(app, cli_args)
    assert f"Couldn't find registered function: '{function}'" in result.stdout


# Token-level annotations for two three-token example sentences. Only the
# words and lemmas differ between the two; everything else is shared.
example_words_1 = ["I", "like", "cats"]
example_words_2 = ["I", "like", "dogs"]
example_lemmas_1 = ["I", "like", "cat"]
example_lemmas_2 = ["I", "like", "dog"]
example_tags = ["PRP", "VBP", "NNS"]
example_morphs = [
    "Case=Nom|Number=Sing|Person=1|PronType=Prs",
    "Tense=Pres|VerbForm=Fin",
    "Number=Plur",
]
example_deps = ["nsubj", "ROOT", "dobj"]
example_pos = ["PRON", "VERB", "NOUN"]
example_ents = ["O", "O", "I-ANIMAL"]
# (start, end, label) triples; the trainable-config test turns these into Spans.
example_spans = [(2, 3, "ANIMAL")]

# "spans" and "cats" are stripped or applied separately by the tests below
# because they are not valid kwargs for instantiating a Doc.
TRAIN_EXAMPLE_1 = {
    "words": example_words_1,
    "lemmas": example_lemmas_1,
    "tags": example_tags,
    "morphs": example_morphs,
    "deps": example_deps,
    "heads": [1, 1, 1],
    "pos": example_pos,
    "ents": example_ents,
    "spans": example_spans,
    "cats": {"CAT": 1.0, "DOG": 0.0},
}
TRAIN_EXAMPLE_2 = {
    "words": example_words_2,
    "lemmas": example_lemmas_2,
    "tags": example_tags,
    "morphs": example_morphs,
    "deps": example_deps,
    "heads": [1, 1, 1],
    "pos": example_pos,
    "ents": example_ents,
    "spans": example_spans,
    "cats": {"CAT": 0.0, "DOG": 1.0},
}


@pytest.mark.slow
@pytest.mark.parametrize(
    "component,examples",
    [
        ("tagger", [TRAIN_EXAMPLE_1, TRAIN_EXAMPLE_2]),
        ("morphologizer", [TRAIN_EXAMPLE_1, TRAIN_EXAMPLE_2]),
        ("trainable_lemmatizer", [TRAIN_EXAMPLE_1, TRAIN_EXAMPLE_2]),
        ("parser", [TRAIN_EXAMPLE_1] * 30),
        ("ner", [TRAIN_EXAMPLE_1, TRAIN_EXAMPLE_2]),
        ("spancat", [TRAIN_EXAMPLE_1, TRAIN_EXAMPLE_2]),
        ("textcat", [TRAIN_EXAMPLE_1, TRAIN_EXAMPLE_2]),
    ],
)
def test_init_config_trainable(component, examples, en_vocab):
    """A config generated by `init config` for one component can be trained.

    The docs built from `examples` serve as both the train and the dev
    corpus. The test passes when `init config` and `train` both exit with
    code 0 and a `model-last` directory exists in the output directory.
    """
    train_docs = []
    for example in examples:
        if component == "textcat":
            # Categories are set on the doc after construction.
            doc = Doc(en_vocab, words=example["words"])
            doc.cats = example["cats"]
        elif component == "spancat":
            # Span annotations are stored under the "sc" key.
            doc = Doc(en_vocab, words=example["words"])
            doc.spans["sc"] = [
                Span(doc, start, end, label) for start, end, label in example["spans"]
            ]
        else:
            # cats, spans are not valid kwargs for instantiating a Doc
            doc_kwargs = {
                key: value
                for key, value in example.items()
                if key not in ("cats", "spans")
            }
            doc = Doc(en_vocab, **doc_kwargs)
        train_docs.append(doc)

    with make_tempdir() as d_in:
        config_path = f"{d_in}/config.cfg"
        train_path = f"{d_in}/train.spacy"
        dev_path = f"{d_in}/dev.spacy"
        output_path = f"{d_in}/model"

        # The same documents are written as training and development data.
        DocBin(docs=train_docs).to_disk(d_in / "train.spacy")
        DocBin(docs=train_docs).to_disk(d_in / "dev.spacy")

        init_args = ["init", "config", config_path, "--lang", "en"]
        init_args += ["--pipeline", component]
        init_config_result = CliRunner().invoke(app, init_args)
        assert init_config_result.exit_code == 0

        train_args = ["train", config_path]
        train_args += ["--paths.train", train_path]
        train_args += ["--paths.dev", dev_path]
        train_args += ["--output", output_path]
        train_result = CliRunner().invoke(app, train_args)
        assert train_result.exit_code == 0
        assert (d_in / "model" / "model-last").exists()


@pytest.mark.slow
@pytest.mark.parametrize(
    "component,examples",
    [("tagger,parser,morphologizer", [TRAIN_EXAMPLE_1, TRAIN_EXAMPLE_2] * 15)],
)
def test_init_config_trainable_multiple(component, examples, en_vocab):
    """A config generated for several components at once can be trained.

    `component` is passed unchanged as the `--pipeline` value. The docs built
    from `examples` serve as both the train and the dev corpus.
    """
    # "cats" and "spans" are dropped before the remaining entries are passed
    # to Doc as keyword arguments.
    train_docs = [
        Doc(
            en_vocab,
            **{
                key: value
                for key, value in example.items()
                if key not in ("cats", "spans")
            },
        )
        for example in examples
    ]

    with make_tempdir() as d_in:
        config_path = f"{d_in}/config.cfg"
        train_path = f"{d_in}/train.spacy"
        dev_path = f"{d_in}/dev.spacy"
        output_path = f"{d_in}/model"

        DocBin(docs=train_docs).to_disk(d_in / "train.spacy")
        DocBin(docs=train_docs).to_disk(d_in / "dev.spacy")

        init_args = ["init", "config", config_path, "--lang", "en"]
        init_args += ["--pipeline", component]
        init_config_result = CliRunner().invoke(app, init_args)
        assert init_config_result.exit_code == 0

        train_args = ["train", config_path]
        train_args += ["--paths.train", train_path]
        train_args += ["--paths.dev", dev_path]
        train_args += ["--output", output_path]
        train_result = CliRunner().invoke(app, train_args)
        assert train_result.exit_code == 0
        assert (d_in / "model" / "model-last").exists()