import ctypes import os from pathlib import Path import pytest from pydantic import ValidationError from thinc.api import ( Config, ConfigValidationError, CupyOps, MPSOps, NumpyOps, Optimizer, get_current_ops, set_current_ops, ) from thinc.compat import has_cupy_gpu, has_torch_mps_gpu from spacy import prefer_gpu, require_cpu, require_gpu, util from spacy.about import __version__ as spacy_version from spacy.lang.en import English from spacy.lang.nl import Dutch from spacy.language import DEFAULT_CONFIG_PATH from spacy.ml._precomputable_affine import ( PrecomputableAffine, _backprop_precomputable_affine_padding, ) from spacy.schemas import ConfigSchemaTraining, TokenPattern, TokenPatternSchema from spacy.training.batchers import minibatch_by_words from spacy.util import ( SimpleFrozenList, dot_to_object, find_available_port, import_file, to_ternary_int, ) from .util import get_random_doc, make_tempdir @pytest.fixture def is_admin(): """Determine if the tests are run as admin or not.""" try: admin = os.getuid() == 0 except AttributeError: admin = ctypes.windll.shell32.IsUserAnAdmin() != 0 return admin @pytest.mark.issue(6207) def test_issue6207(en_tokenizer): doc = en_tokenizer("zero one two three four five six") # Make spans s1 = doc[:4] s2 = doc[3:6] # overlaps with s1 s3 = doc[5:7] # overlaps with s2, not s1 result = util.filter_spans((s1, s2, s3)) assert s1 in result assert s2 not in result assert s3 in result @pytest.mark.issue(6258) def test_issue6258(): """Test that the non-empty constraint pattern field is respected""" # These one is valid TokenPatternSchema(pattern=[TokenPattern()]) # But an empty pattern list should fail to validate # based on the schema's constraint with pytest.raises(ValidationError): TokenPatternSchema(pattern=[]) @pytest.mark.parametrize("text", ["hello/world", "hello world"]) def test_util_ensure_path_succeeds(text): path = util.ensure_path(text) assert isinstance(path, Path) @pytest.mark.parametrize( "package,result", [("numpy", True), ("sfkodskfosdkfpsdpofkspdof", False)] ) def test_util_is_package(package, result): """Test that an installed package via pip is recognised by util.is_package.""" assert util.is_package(package) is result @pytest.mark.parametrize("package", ["thinc"]) def test_util_get_package_path(package): """Test that a Path object is returned for a package name.""" path = util.get_package_path(package) assert isinstance(path, Path) def test_PrecomputableAffine(nO=4, nI=5, nF=3, nP=2): model = PrecomputableAffine(nO=nO, nI=nI, nF=nF, nP=nP).initialize() assert model.get_param("W").shape == (nF, nO, nP, nI) tensor = model.ops.alloc((10, nI)) Y, get_dX = model.begin_update(tensor) assert Y.shape == (tensor.shape[0] + 1, nF, nO, nP) dY = model.ops.alloc((15, nO, nP)) ids = model.ops.alloc((15, nF)) ids[1, 2] = -1 dY[1] = 1 assert not model.has_grad("pad") d_pad = _backprop_precomputable_affine_padding(model, dY, ids) assert d_pad[0, 2, 0, 0] == 1.0 ids.fill(0.0) dY.fill(0.0) dY[0] = 0 ids[1, 2] = 0 ids[1, 1] = -1 ids[1, 0] = -1 dY[1] = 1 ids[2, 0] = -1 dY[2] = 5 d_pad = _backprop_precomputable_affine_padding(model, dY, ids) assert d_pad[0, 0, 0, 0] == 6 assert d_pad[0, 1, 0, 0] == 1 assert d_pad[0, 2, 0, 0] == 0 def test_prefer_gpu(): current_ops = get_current_ops() if has_cupy_gpu: assert prefer_gpu() assert isinstance(get_current_ops(), CupyOps) elif has_torch_mps_gpu: assert prefer_gpu() assert isinstance(get_current_ops(), MPSOps) else: assert not prefer_gpu() set_current_ops(current_ops) def test_require_gpu(): current_ops = get_current_ops() if has_cupy_gpu: require_gpu() assert isinstance(get_current_ops(), CupyOps) elif has_torch_mps_gpu: require_gpu() assert isinstance(get_current_ops(), MPSOps) set_current_ops(current_ops) def test_require_cpu(): current_ops = get_current_ops() require_cpu() assert isinstance(get_current_ops(), NumpyOps) try: import cupy # noqa: F401 require_gpu() assert isinstance(get_current_ops(), CupyOps) except ImportError: pass require_cpu() assert isinstance(get_current_ops(), NumpyOps) set_current_ops(current_ops) def test_ascii_filenames(): """Test that all filenames in the project are ASCII. See: https://twitter.com/_inesmontani/status/1177941471632211968 """ root = Path(__file__).parent.parent for path in root.glob("**/*"): assert all(ord(c) < 128 for c in path.name), path.name def test_load_model_blank_shortcut(): """Test that using a model name like "blank:en" works as a shortcut for spacy.blank("en"). """ nlp = util.load_model("blank:en") assert nlp.lang == "en" assert nlp.pipeline == [] # ImportError for loading an unsupported language with pytest.raises(ImportError): util.load_model("blank:zxx") # ImportError for requesting an invalid language code that isn't registered with pytest.raises(ImportError): util.load_model("blank:fjsfijsdof") @pytest.mark.parametrize( "version,constraint,compatible", [ (spacy_version, spacy_version, True), (spacy_version, f">={spacy_version}", True), ("3.0.0", "2.0.0", False), ("3.2.1", ">=2.0.0", True), ("2.2.10a1", ">=1.0.0,<2.1.1", False), ("3.0.0.dev3", ">=1.2.3,<4.5.6", True), ("n/a", ">=1.2.3,<4.5.6", None), ("1.2.3", "n/a", None), ("n/a", "n/a", None), ], ) def test_is_compatible_version(version, constraint, compatible): assert util.is_compatible_version(version, constraint) is compatible @pytest.mark.parametrize( "constraint,expected", [ ("3.0.0", False), ("==3.0.0", False), (">=2.3.0", True), (">2.0.0", True), ("<=2.0.0", True), (">2.0.0,<3.0.0", False), (">=2.0.0,<3.0.0", False), ("!=1.1,>=1.0,~=1.0", True), ("n/a", None), ], ) def test_is_unconstrained_version(constraint, expected): assert util.is_unconstrained_version(constraint) is expected @pytest.mark.parametrize( "a1,a2,b1,b2,is_match", [ ("3.0.0", "3.0", "3.0.1", "3.0", True), ("3.1.0", "3.1", "3.2.1", "3.2", False), ("xxx", None, "1.2.3.dev0", "1.2", False), ], ) def test_minor_version(a1, a2, b1, b2, is_match): assert util.get_minor_version(a1) == a2 assert util.get_minor_version(b1) == b2 assert util.is_minor_version_match(a1, b1) is is_match assert util.is_minor_version_match(a2, b2) is is_match @pytest.mark.parametrize( "dot_notation,expected", [ ( {"token.pos": True, "token._.xyz": True}, {"token": {"pos": True, "_": {"xyz": True}}}, ), ( {"training.batch_size": 128, "training.optimizer.learn_rate": 0.01}, {"training": {"batch_size": 128, "optimizer": {"learn_rate": 0.01}}}, ), ], ) def test_dot_to_dict(dot_notation, expected): result = util.dot_to_dict(dot_notation) assert result == expected assert util.dict_to_dot(result) == dot_notation def test_set_dot_to_object(): config = {"foo": {"bar": 1, "baz": {"x": "y"}}, "test": {"a": {"b": "c"}}} with pytest.raises(KeyError): util.set_dot_to_object(config, "foo.bar.baz", 100) with pytest.raises(KeyError): util.set_dot_to_object(config, "hello.world", 100) with pytest.raises(KeyError): util.set_dot_to_object(config, "test.a.b.c", 100) util.set_dot_to_object(config, "foo.bar", 100) assert config["foo"]["bar"] == 100 util.set_dot_to_object(config, "foo.baz.x", {"hello": "world"}) assert config["foo"]["baz"]["x"]["hello"] == "world" assert config["test"]["a"]["b"] == "c" util.set_dot_to_object(config, "foo", 123) assert config["foo"] == 123 util.set_dot_to_object(config, "test", "hello") assert dict(config) == {"foo": 123, "test": "hello"} @pytest.mark.parametrize( "doc_sizes, expected_batches", [ ([400, 400, 199], [3]), ([400, 400, 199, 3], [4]), ([400, 400, 199, 3, 200], [3, 2]), ([400, 400, 199, 3, 1], [5]), ([400, 400, 199, 3, 1, 1500], [5]), # 1500 will be discarded ([400, 400, 199, 3, 1, 200], [3, 3]), ([400, 400, 199, 3, 1, 999], [3, 3]), ([400, 400, 199, 3, 1, 999, 999], [3, 2, 1, 1]), ([1, 2, 999], [3]), ([1, 2, 999, 1], [4]), ([1, 200, 999, 1], [2, 2]), ([1, 999, 200, 1], [2, 2]), ], ) def test_util_minibatch(doc_sizes, expected_batches): docs = [get_random_doc(doc_size) for doc_size in doc_sizes] tol = 0.2 batch_size = 1000 batches = list( minibatch_by_words(docs, size=batch_size, tolerance=tol, discard_oversize=True) ) assert [len(batch) for batch in batches] == expected_batches max_size = batch_size + batch_size * tol for batch in batches: assert sum([len(doc) for doc in batch]) < max_size @pytest.mark.parametrize( "doc_sizes, expected_batches", [ ([400, 4000, 199], [1, 2]), ([400, 400, 199, 3000, 200], [1, 4]), ([400, 400, 199, 3, 1, 1500], [1, 5]), ([400, 400, 199, 3000, 2000, 200, 200], [1, 1, 3, 2]), ([1, 2, 9999], [1, 2]), ([2000, 1, 2000, 1, 1, 1, 2000], [1, 1, 1, 4]), ], ) def test_util_minibatch_oversize(doc_sizes, expected_batches): """Test that oversized documents are returned in their own batch""" docs = [get_random_doc(doc_size) for doc_size in doc_sizes] tol = 0.2 batch_size = 1000 batches = list( minibatch_by_words(docs, size=batch_size, tolerance=tol, discard_oversize=False) ) assert [len(batch) for batch in batches] == expected_batches def test_util_dot_section(): cfg_string = """ [nlp] lang = "en" pipeline = ["textcat"] [components] [components.textcat] factory = "textcat" [components.textcat.model] @architectures = "spacy.TextCatBOW.v2" exclusive_classes = true ngram_size = 1 no_output_layer = false """ nlp_config = Config().from_str(cfg_string) en_nlp = util.load_model_from_config(nlp_config, auto_fill=True) default_config = Config().from_disk(DEFAULT_CONFIG_PATH) default_config["nlp"]["lang"] = "nl" nl_nlp = util.load_model_from_config(default_config, auto_fill=True) # Test that creation went OK assert isinstance(en_nlp, English) assert isinstance(nl_nlp, Dutch) assert nl_nlp.pipe_names == [] assert en_nlp.pipe_names == ["textcat"] # not exclusive_classes assert en_nlp.get_pipe("textcat").model.attrs["multi_label"] is False # Test that default values got overwritten assert en_nlp.config["nlp"]["pipeline"] == ["textcat"] assert nl_nlp.config["nlp"]["pipeline"] == [] # default value [] # Test proper functioning of 'dot_to_object' with pytest.raises(KeyError): dot_to_object(en_nlp.config, "nlp.pipeline.tagger") with pytest.raises(KeyError): dot_to_object(en_nlp.config, "nlp.unknownattribute") T = util.registry.resolve(nl_nlp.config["training"], schema=ConfigSchemaTraining) assert isinstance(dot_to_object({"training": T}, "training.optimizer"), Optimizer) def test_simple_frozen_list(): t = SimpleFrozenList(["foo", "bar"]) assert t == ["foo", "bar"] assert t.index("bar") == 1 # okay method with pytest.raises(NotImplementedError): t.append("baz") with pytest.raises(NotImplementedError): t.sort() with pytest.raises(NotImplementedError): t.extend(["baz"]) with pytest.raises(NotImplementedError): t.pop() t = SimpleFrozenList(["foo", "bar"], error="Error!") with pytest.raises(NotImplementedError): t.append("baz") def test_resolve_dot_names(): config = { "training": {"optimizer": {"@optimizers": "Adam.v1"}}, "foo": {"bar": "training.optimizer", "baz": "training.xyz"}, } result = util.resolve_dot_names(config, ["training.optimizer"]) assert isinstance(result[0], Optimizer) with pytest.raises(ConfigValidationError) as e: util.resolve_dot_names(config, ["training.xyz", "training.optimizer"]) errors = e.value.errors assert len(errors) == 1 assert errors[0]["loc"] == ["training", "xyz"] def test_import_code(): code_str = """ from spacy import Language class DummyComponent: def __init__(self, vocab, name): pass def initialize(self, get_examples, *, nlp, dummy_param: int): pass @Language.factory( "dummy_component", ) def make_dummy_component( nlp: Language, name: str ): return DummyComponent(nlp.vocab, name) """ with make_tempdir() as temp_dir: code_path = os.path.join(temp_dir, "code.py") with open(code_path, "w") as fileh: fileh.write(code_str) import_file("python_code", code_path) config = {"initialize": {"components": {"dummy_component": {"dummy_param": 1}}}} nlp = English.from_config(config) nlp.add_pipe("dummy_component") nlp.initialize() def test_to_ternary_int(): assert to_ternary_int(True) == 1 assert to_ternary_int(None) == 0 assert to_ternary_int(False) == -1 assert to_ternary_int(1) == 1 assert to_ternary_int(1.0) == 1 assert to_ternary_int(0) == 0 assert to_ternary_int(0.0) == 0 assert to_ternary_int(-1) == -1 assert to_ternary_int(5) == -1 assert to_ternary_int(-10) == -1 assert to_ternary_int("string") == -1 assert to_ternary_int([0, "string"]) == -1 def test_find_available_port(): host = "0.0.0.0" port = 5000 assert find_available_port(port, host) == port, "Port 5000 isn't free" from wsgiref.simple_server import demo_app, make_server with make_server(host, port, demo_app) as httpd: with pytest.warns(UserWarning, match="already in use"): found_port = find_available_port(port, host, auto_select=True) assert found_port == port + 1, "Didn't find next port"