spaCy/spacy/tests/test_misc.py
2023-07-19 16:37:31 +02:00

460 lines
14 KiB
Python

import ctypes
import os
from pathlib import Path
import pytest
from pydantic import ValidationError
from thinc.api import (
Config,
ConfigValidationError,
CupyOps,
MPSOps,
NumpyOps,
Optimizer,
get_current_ops,
set_current_ops,
)
from thinc.compat import has_cupy_gpu, has_torch_mps_gpu
from spacy import prefer_gpu, require_cpu, require_gpu, util
from spacy.about import __version__ as spacy_version
from spacy.lang.en import English
from spacy.lang.nl import Dutch
from spacy.language import DEFAULT_CONFIG_PATH
from spacy.schemas import ConfigSchemaTraining, TokenPattern, TokenPatternSchema
from spacy.training.batchers import minibatch_by_words
from spacy.util import (
SimpleFrozenList,
dot_to_object,
find_available_port,
import_file,
to_ternary_int,
)
from .util import get_random_doc, make_tempdir
@pytest.fixture
def is_admin():
    """Tell whether the test process is running with administrator rights.

    When ``os.getuid`` exists, the user id is compared against 0. Otherwise
    the answer comes from ``ctypes.windll.shell32.IsUserAnAdmin()``.
    """
    if hasattr(os, "getuid"):
        return os.getuid() == 0
    # No os.getuid on this platform: ask the shell32 API instead.
    return ctypes.windll.shell32.IsUserAnAdmin() != 0
@pytest.mark.issue(6207)
def test_issue6207(en_tokenizer):
    """Check util.filter_spans on three spans that overlap in a chain.

    The first and last span are expected to survive; the middle one, which
    overlaps both neighbours, is expected to be dropped.
    """
    doc = en_tokenizer("zero one two three four five six")
    first = doc[:4]
    middle = doc[3:6]  # shares token 3 with `first`
    last = doc[5:7]  # shares token 5 with `middle`, disjoint from `first`
    kept = util.filter_spans((first, middle, last))
    assert first in kept
    assert middle not in kept
    assert last in kept
@pytest.mark.issue(6258)
def test_issue6258():
    """Check that TokenPatternSchema rejects an empty ``pattern`` list."""
    # A pattern holding a single (default) token validates without error.
    single_token_pattern = [TokenPattern()]
    TokenPatternSchema(pattern=single_token_pattern)
    # An empty pattern list must be refused by the schema.
    empty_pattern = []
    with pytest.raises(ValidationError):
        TokenPatternSchema(pattern=empty_pattern)
@pytest.mark.parametrize("text", ["hello/world", "hello world"])
def test_util_ensure_path_succeeds(text):
    """Check that util.ensure_path turns a string into a ``Path``."""
    converted = util.ensure_path(text)
    assert isinstance(converted, Path)
@pytest.mark.parametrize(
    "package,result",
    [
        ("numpy", True),
        ("sfkodskfosdkfpsdpofkspdof", False),
    ],
)
def test_util_is_package(package, result):
    """Check util.is_package for a name expected to be found and a nonsense name."""
    found = util.is_package(package)
    assert found is result
@pytest.mark.parametrize("package", ["thinc"])
def test_util_get_package_path(package):
    """Check that util.get_package_path returns a ``Path`` for a package name."""
    location = util.get_package_path(package)
    assert isinstance(location, Path)
def test_prefer_gpu():
    """Check that prefer_gpu() activates the backend matching the hardware flags.

    With ``has_cupy_gpu`` the active ops must become ``CupyOps``; with
    ``has_torch_mps_gpu`` they must become ``MPSOps``; otherwise prefer_gpu()
    must return a falsy value.
    """
    previous_ops = get_current_ops()
    try:
        if has_cupy_gpu:
            assert prefer_gpu()
            assert isinstance(get_current_ops(), CupyOps)
        elif has_torch_mps_gpu:
            assert prefer_gpu()
            assert isinstance(get_current_ops(), MPSOps)
        else:
            assert not prefer_gpu()
    finally:
        # Restore the ops even when an assertion fails, so that a failure
        # here cannot leave a different backend active for later tests.
        set_current_ops(previous_ops)
def test_require_gpu():
    """Check that require_gpu() activates the backend matching the hardware flags.

    With ``has_cupy_gpu`` the active ops must become ``CupyOps``; with
    ``has_torch_mps_gpu`` they must become ``MPSOps``. When neither flag is
    set, nothing is exercised.
    """
    previous_ops = get_current_ops()
    try:
        if has_cupy_gpu:
            require_gpu()
            assert isinstance(get_current_ops(), CupyOps)
        elif has_torch_mps_gpu:
            require_gpu()
            assert isinstance(get_current_ops(), MPSOps)
    finally:
        # Restore the ops even when an assertion fails, so that a failure
        # here cannot leave a different backend active for later tests.
        set_current_ops(previous_ops)
def test_require_cpu():
    """Check that require_cpu() switches the active ops to ``NumpyOps``.

    The switch is verified twice: once from the starting state, and once
    after require_gpu() has been attempted (only when cupy is importable).
    """
    previous_ops = get_current_ops()
    try:
        require_cpu()
        assert isinstance(get_current_ops(), NumpyOps)
        try:
            import cupy  # noqa: F401

            require_gpu()
            assert isinstance(get_current_ops(), CupyOps)
        except ImportError:
            # cupy is not installed: skip the GPU round trip.
            pass

        require_cpu()
        assert isinstance(get_current_ops(), NumpyOps)
    finally:
        # Restore the ops even when an assertion fails, so that a failure
        # here cannot leave a different backend active for later tests.
        set_current_ops(previous_ops)
def test_ascii_filenames():
    """Test that all filenames in the project are ASCII.
    See: https://twitter.com/_inesmontani/status/1177941471632211968
    """
    # Two levels up from this file; everything below that directory is scanned.
    project_root = Path(__file__).parent.parent
    for entry in project_root.glob("**/*"):
        filename = entry.name
        non_ascii = [char for char in filename if ord(char) >= 128]
        assert not non_ascii, filename
def test_load_model_blank_shortcut():
    """Check the "blank:<lang>" shortcut accepted by util.load_model.

    "blank:en" must yield an English pipeline with no components, while
    "blank:zxx" and "blank:fjsfijsdof" must raise ImportError.
    """
    blank_en = util.load_model("blank:en")
    assert blank_en.lang == "en"
    assert blank_en.pipeline == []
    # First an unsupported language, then an invalid code that isn't registered.
    for rejected_name in ("blank:zxx", "blank:fjsfijsdof"):
        with pytest.raises(ImportError):
            util.load_model(rejected_name)
@pytest.mark.parametrize(
    "version,constraint,compatible",
    [
        (spacy_version, spacy_version, True),
        (spacy_version, f">={spacy_version}", True),
        ("3.0.0", "2.0.0", False),
        ("3.2.1", ">=2.0.0", True),
        ("2.2.10a1", ">=1.0.0,<2.1.1", False),
        ("3.0.0.dev3", ">=1.2.3,<4.5.6", True),
        ("n/a", ">=1.2.3,<4.5.6", None),
        ("1.2.3", "n/a", None),
        ("n/a", "n/a", None),
    ],
)
def test_is_compatible_version(version, constraint, compatible):
    """Check util.is_compatible_version, including the ``None`` result for "n/a" inputs."""
    verdict = util.is_compatible_version(version, constraint)
    assert verdict is compatible
@pytest.mark.parametrize(
    "constraint,expected",
    [
        ("3.0.0", False),
        ("==3.0.0", False),
        (">=2.3.0", True),
        (">2.0.0", True),
        ("<=2.0.0", True),
        (">2.0.0,<3.0.0", False),
        (">=2.0.0,<3.0.0", False),
        ("!=1.1,>=1.0,~=1.0", True),
        ("n/a", None),
    ],
)
def test_is_unconstrained_version(constraint, expected):
    """Check util.is_unconstrained_version for a range of specifier strings."""
    verdict = util.is_unconstrained_version(constraint)
    assert verdict is expected
@pytest.mark.parametrize(
    "a1,a2,b1,b2,is_match",
    [
        ("3.0.0", "3.0", "3.0.1", "3.0", True),
        ("3.1.0", "3.1", "3.2.1", "3.2", False),
        ("xxx", None, "1.2.3.dev0", "1.2", False),
    ],
)
def test_minor_version(a1, a2, b1, b2, is_match):
    """Check util.get_minor_version and util.is_minor_version_match.

    ``a2``/``b2`` are the expected minor versions of ``a1``/``b1``; matching
    is checked both on the full versions and on the minor versions.
    """
    for full_version, minor_version in ((a1, a2), (b1, b2)):
        assert util.get_minor_version(full_version) == minor_version
    for left, right in ((a1, b1), (a2, b2)):
        assert util.is_minor_version_match(left, right) is is_match
@pytest.mark.parametrize(
    "dot_notation,expected",
    [
        (
            {"token.pos": True, "token._.xyz": True},
            {"token": {"pos": True, "_": {"xyz": True}}},
        ),
        (
            {"training.batch_size": 128, "training.optimizer.learn_rate": 0.01},
            {"training": {"batch_size": 128, "optimizer": {"learn_rate": 0.01}}},
        ),
        (
            {"attribute_ruler.scorer.@scorers": "spacy.tagger_scorer.v1"},
            {"attribute_ruler": {"scorer": {"@scorers": "spacy.tagger_scorer.v1"}}},
        ),
    ],
)
def test_dot_to_dict(dot_notation, expected):
    """Check that util.dot_to_dict and util.dict_to_dot round-trip each other."""
    nested = util.dot_to_dict(dot_notation)
    assert nested == expected
    flattened = util.dict_to_dot(nested)
    assert flattened == dot_notation
@pytest.mark.parametrize(
    "dot_notation,expected",
    [
        (
            {"token.pos": True, "token._.xyz": True},
            {"token": {"pos": True, "_": {"xyz": True}}},
        ),
        (
            {"training.batch_size": 128, "training.optimizer.learn_rate": 0.01},
            {"training": {"batch_size": 128, "optimizer": {"learn_rate": 0.01}}},
        ),
        (
            {"attribute_ruler.scorer": {"@scorers": "spacy.tagger_scorer.v1"}},
            {"attribute_ruler": {"scorer": {"@scorers": "spacy.tagger_scorer.v1"}}},
        ),
    ],
)
def test_dot_to_dict_overrides(dot_notation, expected):
    """Check the dot_to_dict / dict_to_dot round trip with ``for_overrides=True``.

    In the last case the expected flat form keeps the ``{"@scorers": ...}``
    block as a dict value rather than flattening it into the key.
    """
    nested = util.dot_to_dict(dot_notation)
    assert nested == expected
    flattened = util.dict_to_dot(nested, for_overrides=True)
    assert flattened == dot_notation
def test_set_dot_to_object():
    """Check util.set_dot_to_object for rejected paths and in-place updates."""
    config = {"foo": {"bar": 1, "baz": {"x": "y"}}, "test": {"a": {"b": "c"}}}
    # Each of these paths must be refused with a KeyError.
    for rejected_path in ("foo.bar.baz", "hello.world", "test.a.b.c"):
        with pytest.raises(KeyError):
            util.set_dot_to_object(config, rejected_path, 100)
    # Overwrite a leaf value.
    util.set_dot_to_object(config, "foo.bar", 100)
    assert config["foo"]["bar"] == 100
    # Replace a leaf with a dict; the unrelated "test" section stays untouched.
    util.set_dot_to_object(config, "foo.baz.x", {"hello": "world"})
    assert config["foo"]["baz"]["x"]["hello"] == "world"
    assert config["test"]["a"]["b"] == "c"
    # Replace whole top-level sections with plain values.
    util.set_dot_to_object(config, "foo", 123)
    assert config["foo"] == 123
    util.set_dot_to_object(config, "test", "hello")
    assert dict(config) == {"foo": 123, "test": "hello"}
@pytest.mark.parametrize(
    "doc_sizes, expected_batches",
    [
        ([400, 400, 199], [3]),
        ([400, 400, 199, 3], [4]),
        ([400, 400, 199, 3, 200], [3, 2]),
        ([400, 400, 199, 3, 1], [5]),
        ([400, 400, 199, 3, 1, 1500], [5]),  # 1500 will be discarded
        ([400, 400, 199, 3, 1, 200], [3, 3]),
        ([400, 400, 199, 3, 1, 999], [3, 3]),
        ([400, 400, 199, 3, 1, 999, 999], [3, 2, 1, 1]),
        ([1, 2, 999], [3]),
        ([1, 2, 999, 1], [4]),
        ([1, 200, 999, 1], [2, 2]),
        ([1, 999, 200, 1], [2, 2]),
    ],
)
def test_util_minibatch(doc_sizes, expected_batches):
    """Check minibatch_by_words with ``discard_oversize=True``.

    ``doc_sizes`` gives the length of each generated doc and
    ``expected_batches`` the number of docs expected in each batch. Every
    batch must also stay below ``size + size * tolerance`` words.
    """
    tolerance = 0.2
    target_size = 1000
    docs = [get_random_doc(n_words) for n_words in doc_sizes]
    batches = list(
        minibatch_by_words(
            docs, size=target_size, tolerance=tolerance, discard_oversize=True
        )
    )
    docs_per_batch = [len(batch) for batch in batches]
    assert docs_per_batch == expected_batches
    upper_bound = target_size + target_size * tolerance
    for batch in batches:
        words_in_batch = sum(len(doc) for doc in batch)
        assert words_in_batch < upper_bound
@pytest.mark.parametrize(
    "doc_sizes, expected_batches",
    [
        ([400, 4000, 199], [1, 2]),
        ([400, 400, 199, 3000, 200], [1, 4]),
        ([400, 400, 199, 3, 1, 1500], [1, 5]),
        ([400, 400, 199, 3000, 2000, 200, 200], [1, 1, 3, 2]),
        ([1, 2, 9999], [1, 2]),
        ([2000, 1, 2000, 1, 1, 1, 2000], [1, 1, 1, 4]),
    ],
)
def test_util_minibatch_oversize(doc_sizes, expected_batches):
    """Check minibatch_by_words with ``discard_oversize=False``.

    Oversized documents are expected to come back in a batch of their own
    instead of being dropped.
    """
    tolerance = 0.2
    target_size = 1000
    docs = [get_random_doc(n_words) for n_words in doc_sizes]
    batches = list(
        minibatch_by_words(
            docs, size=target_size, tolerance=tolerance, discard_oversize=False
        )
    )
    docs_per_batch = [len(batch) for batch in batches]
    assert docs_per_batch == expected_batches
def test_util_dot_section():
    """Check config-driven pipeline creation and dot_to_object lookups.

    An English pipeline is built from an inline textcat config and a Dutch
    one from the default config with the language switched to "nl". Both
    are then inspected through their resolved configs.
    """
    cfg_string = """
[nlp]
lang = "en"
pipeline = ["textcat"]
[components]
[components.textcat]
factory = "textcat"
[components.textcat.model]
@architectures = "spacy.TextCatBOW.v2"
exclusive_classes = true
ngram_size = 1
no_output_layer = false
"""
    english_config = Config().from_str(cfg_string)
    en_nlp = util.load_model_from_config(english_config, auto_fill=True)

    dutch_config = Config().from_disk(DEFAULT_CONFIG_PATH)
    dutch_config["nlp"]["lang"] = "nl"
    nl_nlp = util.load_model_from_config(dutch_config, auto_fill=True)

    # Both pipelines were created with the right language class and components.
    assert isinstance(en_nlp, English)
    assert isinstance(nl_nlp, Dutch)
    assert nl_nlp.pipe_names == []
    assert en_nlp.pipe_names == ["textcat"]
    # exclusive_classes is true in the config above; multi_label must be False.
    textcat = en_nlp.get_pipe("textcat")
    assert textcat.model.attrs["multi_label"] is False
    # The English config overrides the default pipeline; the Dutch one keeps [].
    assert en_nlp.config["nlp"]["pipeline"] == ["textcat"]
    assert nl_nlp.config["nlp"]["pipeline"] == []  # default value []
    # dot_to_object must raise KeyError for paths that do not resolve.
    for unresolvable in ("nlp.pipeline.tagger", "nlp.unknownattribute"):
        with pytest.raises(KeyError):
            dot_to_object(en_nlp.config, unresolvable)
    # A resolved training section exposes a real Optimizer under "optimizer".
    T = util.registry.resolve(nl_nlp.config["training"], schema=ConfigSchemaTraining)
    optimizer = dot_to_object({"training": T}, "training.optimizer")
    assert isinstance(optimizer, Optimizer)
def test_simple_frozen_list():
    """Check that SimpleFrozenList allows reads but rejects mutation.

    Equality and ``index`` work; ``append``, ``sort``, ``extend`` and ``pop``
    must raise NotImplementedError, also when a custom ``error`` is passed.
    """
    frozen = SimpleFrozenList(["foo", "bar"])
    assert frozen == ["foo", "bar"]
    assert frozen.index("bar") == 1  # okay method
    mutating_calls = (
        ("append", ("baz",)),
        ("sort", ()),
        ("extend", (["baz"],)),
        ("pop", ()),
    )
    for method_name, call_args in mutating_calls:
        with pytest.raises(NotImplementedError):
            getattr(frozen, method_name)(*call_args)
    frozen_with_message = SimpleFrozenList(["foo", "bar"], error="Error!")
    with pytest.raises(NotImplementedError):
        frozen_with_message.append("baz")
def test_resolve_dot_names():
    """Check util.resolve_dot_names for a valid and an invalid dot name.

    "training.optimizer" must resolve to an Optimizer. Asking for
    "training.xyz" as well must raise a ConfigValidationError carrying
    exactly one error, located at ["training", "xyz"].
    """
    config = {
        "training": {"optimizer": {"@optimizers": "Adam.v1"}},
        "foo": {"bar": "training.optimizer", "baz": "training.xyz"},
    }
    resolved = util.resolve_dot_names(config, ["training.optimizer"])
    assert isinstance(resolved[0], Optimizer)
    with pytest.raises(ConfigValidationError) as exc_info:
        util.resolve_dot_names(config, ["training.xyz", "training.optimizer"])
    reported = exc_info.value.errors
    assert len(reported) == 1
    assert reported[0]["loc"] == ["training", "xyz"]
def test_import_code():
    """Check that a component factory defined in an imported file is usable.

    A small module registering the "dummy_component" factory is written to a
    temporary directory and loaded with import_file. A pipeline built from a
    config that supplies ``dummy_param`` in the initialize section must then
    be able to add and initialize that component.
    """
    # The embedded source must carry its own indentation: it is written to
    # disk verbatim and has to be valid Python for import_file to load it.
    code_str = """
from spacy import Language
class DummyComponent:
    def __init__(self, vocab, name):
        pass
    def initialize(self, get_examples, *, nlp, dummy_param: int):
        pass
@Language.factory(
    "dummy_component",
)
def make_dummy_component(
    nlp: Language, name: str
):
    return DummyComponent(nlp.vocab, name)
"""
    with make_tempdir() as temp_dir:
        code_path = os.path.join(temp_dir, "code.py")
        with open(code_path, "w", encoding="utf8") as fileh:
            fileh.write(code_str)
        import_file("python_code", code_path)
    config = {"initialize": {"components": {"dummy_component": {"dummy_param": 1}}}}
    nlp = English.from_config(config)
    nlp.add_pipe("dummy_component")
    nlp.initialize()
def test_to_ternary_int():
    """Check to_ternary_int over booleans, None, numbers and other values.

    Values equal to 1 (including True) map to 1, values equal to 0 and None
    map to 0, and everything else listed here maps to -1.
    """
    cases = [
        (True, 1),
        (None, 0),
        (False, -1),
        (1, 1),
        (1.0, 1),
        (0, 0),
        (0.0, 0),
        (-1, -1),
        (5, -1),
        (-10, -1),
        ("string", -1),
        ([0, "string"], -1),
    ]
    for value, ternary in cases:
        assert to_ternary_int(value) == ternary
def test_find_available_port():
    """Check find_available_port with a free and an occupied port.

    Port 5000 must be returned while nothing listens on it. Once a WSGI
    server is bound to it, ``auto_select=True`` must emit a UserWarning and
    return port 5001.
    """
    from wsgiref.simple_server import demo_app, make_server

    host = "0.0.0.0"
    port = 5000
    free_port = find_available_port(port, host)
    assert free_port == port, "Port 5000 isn't free"
    # Occupy the port for the duration of the second lookup.
    with make_server(host, port, demo_app):
        with pytest.warns(UserWarning, match="already in use"):
            found_port = find_available_port(port, host, auto_select=True)
        assert found_port == port + 1, "Didn't find next port"