Address issues with source with component names and replacing listeners (#12701)

When sourcing a component, the object from the original pipeline is added to the new pipeline as the same object. This creates a situation where there are several attributes that cannot be in sync between the original pipeline and the new pipeline at the same time for this one object:

* component.name
* component.listener_map / component.listening_components for tok2vec and transformer

When running replace_listeners on a component, the config is not updated correctly if the state of the component is incorrect for the current pipeline (in particular changes that should be applied from model.attrs["replace_listener_cfg"] as used in spacy-transformers) due to the fact that:

* find_listeners relies on component.name to set the name in the listener_map
* replace_listeners relies on listener_map to determine how to modify the configs

In addition, there are several places where pipeline components are modified and the listener map and/or internal component names aren't currently updated.

In cases where there is a component shared by two pipelines that cannot be in sync, this PR chooses to prioritize the most recently modified or initialized pipeline. There is no actual solution with the current source behavior that will make both pipelines usable, so the current pipeline is updated whenever components are added/renamed/removed or the pipeline is initialized for training.
This commit is contained in:
Adriane Boyd 2023-06-27 10:47:07 +02:00
parent 0b190c5d42
commit 1f663b7c33
3 changed files with 105 additions and 33 deletions

View File

@ -716,6 +716,11 @@ class Language:
) )
) )
pipe = source.get_pipe(source_name) pipe = source.get_pipe(source_name)
# There is no actual solution here. Either the component has the right
# name for the source pipeline or the component has the right name for
# the current pipeline. This prioritizes the current pipeline.
if hasattr(pipe, "name"):
pipe.name = name
# Make sure the source config is interpolated so we don't end up with # Make sure the source config is interpolated so we don't end up with
# orphaned variables in our final config # orphaned variables in our final config
source_config = source.config.interpolate() source_config = source.config.interpolate()
@ -793,6 +798,7 @@ class Language:
pipe_index = self._get_pipe_index(before, after, first, last) pipe_index = self._get_pipe_index(before, after, first, last)
self._pipe_meta[name] = self.get_factory_meta(factory_name) self._pipe_meta[name] = self.get_factory_meta(factory_name)
self._components.insert(pipe_index, (name, pipe_component)) self._components.insert(pipe_index, (name, pipe_component))
self._link_components()
return pipe_component return pipe_component
def _get_pipe_index( def _get_pipe_index(
@ -928,6 +934,7 @@ class Language:
if old_name in self._config["initialize"]["components"]: if old_name in self._config["initialize"]["components"]:
init_cfg = self._config["initialize"]["components"].pop(old_name) init_cfg = self._config["initialize"]["components"].pop(old_name)
self._config["initialize"]["components"][new_name] = init_cfg self._config["initialize"]["components"][new_name] = init_cfg
self._link_components()
def remove_pipe(self, name: str) -> Tuple[str, PipeCallable]: def remove_pipe(self, name: str) -> Tuple[str, PipeCallable]:
"""Remove a component from the pipeline. """Remove a component from the pipeline.
@ -951,6 +958,7 @@ class Language:
# Make sure the name is also removed from the set of disabled components # Make sure the name is also removed from the set of disabled components
if name in self.disabled: if name in self.disabled:
self._disabled.remove(name) self._disabled.remove(name)
self._link_components()
return removed return removed
def disable_pipe(self, name: str) -> None: def disable_pipe(self, name: str) -> None:
@ -1673,8 +1681,16 @@ class Language:
# The problem is we need to do it during deserialization...And the # The problem is we need to do it during deserialization...And the
# components don't receive the pipeline then. So this does have to be # components don't receive the pipeline then. So this does have to be
# here :( # here :(
# First, fix up all the internal component names in case they have
# gotten out of sync due to sourcing components from different
# pipelines, since find_listeners uses proc2.name for the listener
# map.
for name, proc in self.pipeline:
if hasattr(proc, "name"):
proc.name = name
for i, (name1, proc1) in enumerate(self.pipeline): for i, (name1, proc1) in enumerate(self.pipeline):
if isinstance(proc1, ty.ListenedToComponent): if isinstance(proc1, ty.ListenedToComponent):
proc1.listener_map = {}
for name2, proc2 in self.pipeline[i + 1 :]: for name2, proc2 in self.pipeline[i + 1 :]:
proc1.find_listeners(proc2) proc1.find_listeners(proc2)
@ -1808,6 +1824,7 @@ class Language:
raw_config=raw_config, raw_config=raw_config,
) )
else: else:
assert "source" in pipe_cfg
# We need the sourced components to reference the same # We need the sourced components to reference the same
# vocab without modifying the current vocab state **AND** # vocab without modifying the current vocab state **AND**
# we still want to load the source model vectors to perform # we still want to load the source model vectors to perform
@ -1827,6 +1844,10 @@ class Language:
source_name = pipe_cfg.get("component", pipe_name) source_name = pipe_cfg.get("component", pipe_name)
listeners_replaced = False listeners_replaced = False
if "replace_listeners" in pipe_cfg: if "replace_listeners" in pipe_cfg:
# Make sure that the listened-to component has the
# state of the source pipeline listener map so that the
# replace_listeners method below works as intended.
source_nlps[model]._link_components()
for name, proc in source_nlps[model].pipeline: for name, proc in source_nlps[model].pipeline:
if source_name in getattr(proc, "listening_components", []): if source_name in getattr(proc, "listening_components", []):
source_nlps[model].replace_listeners( source_nlps[model].replace_listeners(
@ -1838,6 +1859,8 @@ class Language:
nlp.add_pipe( nlp.add_pipe(
source_name, source=source_nlps[model], name=pipe_name source_name, source=source_nlps[model], name=pipe_name
) )
# At this point after nlp.add_pipe, the listener map
# corresponds to the new pipeline.
if model not in source_nlp_vectors_hashes: if model not in source_nlp_vectors_hashes:
source_nlp_vectors_hashes[model] = hash( source_nlp_vectors_hashes[model] = hash(
source_nlps[model].vocab.vectors.to_bytes( source_nlps[model].vocab.vectors.to_bytes(
@ -1892,27 +1915,6 @@ class Language:
raise ValueError( raise ValueError(
Errors.E942.format(name="pipeline_creation", value=type(nlp)) Errors.E942.format(name="pipeline_creation", value=type(nlp))
) )
# Detect components with listeners that are not frozen consistently
for name, proc in nlp.pipeline:
if isinstance(proc, ty.ListenedToComponent):
# Remove listeners not in the pipeline
listener_names = proc.listening_components
unused_listener_names = [
ll for ll in listener_names if ll not in nlp.pipe_names
]
for listener_name in unused_listener_names:
for listener in proc.listener_map.get(listener_name, []):
proc.remove_listener(listener, listener_name)
for listener_name in proc.listening_components:
# e.g. tok2vec/transformer
# If it's a component sourced from another pipeline, we check if
# the tok2vec listeners should be replaced with standalone tok2vec
# models (e.g. so component can be frozen without its performance
# degrading when other components/tok2vec are updated)
paths = sourced.get(listener_name, {}).get("replace_listeners", [])
if paths:
nlp.replace_listeners(name, listener_name, paths)
return nlp return nlp
def replace_listeners( def replace_listeners(

View File

@ -188,8 +188,7 @@ def test_tok2vec_listener(with_vectors):
for tag in t[1]["tags"]: for tag in t[1]["tags"]:
tagger.add_label(tag) tagger.add_label(tag)
# Check that the Tok2Vec component finds it listeners # Check that the Tok2Vec component finds its listeners
assert tok2vec.listeners == []
optimizer = nlp.initialize(lambda: train_examples) optimizer = nlp.initialize(lambda: train_examples)
assert tok2vec.listeners == [tagger_tok2vec] assert tok2vec.listeners == [tagger_tok2vec]
@ -217,7 +216,6 @@ def test_tok2vec_listener_callback():
assert nlp.pipe_names == ["tok2vec", "tagger"] assert nlp.pipe_names == ["tok2vec", "tagger"]
tagger = nlp.get_pipe("tagger") tagger = nlp.get_pipe("tagger")
tok2vec = nlp.get_pipe("tok2vec") tok2vec = nlp.get_pipe("tok2vec")
nlp._link_components()
docs = [nlp.make_doc("A random sentence")] docs = [nlp.make_doc("A random sentence")]
tok2vec.model.initialize(X=docs) tok2vec.model.initialize(X=docs)
gold_array = [[1.0 for tag in ["V", "Z"]] for word in docs] gold_array = [[1.0 for tag in ["V", "Z"]] for word in docs]
@ -426,29 +424,46 @@ def test_replace_listeners_from_config():
nlp.to_disk(dir_path) nlp.to_disk(dir_path)
base_model = str(dir_path) base_model = str(dir_path)
new_config = { new_config = {
"nlp": {"lang": "en", "pipeline": ["tok2vec", "tagger", "ner"]}, "nlp": {
"lang": "en",
"pipeline": ["tok2vec", "tagger2", "ner3", "tagger4"],
},
"components": { "components": {
"tok2vec": {"source": base_model}, "tok2vec": {"source": base_model},
"tagger": { "tagger2": {
"source": base_model, "source": base_model,
"component": "tagger",
"replace_listeners": ["model.tok2vec"], "replace_listeners": ["model.tok2vec"],
}, },
"ner": {"source": base_model}, "ner3": {
"source": base_model,
"component": "ner",
},
"tagger4": {
"source": base_model,
"component": "tagger",
},
}, },
} }
new_nlp = util.load_model_from_config(new_config, auto_fill=True) new_nlp = util.load_model_from_config(new_config, auto_fill=True)
new_nlp.initialize(lambda: examples) new_nlp.initialize(lambda: examples)
tok2vec = new_nlp.get_pipe("tok2vec") tok2vec = new_nlp.get_pipe("tok2vec")
tagger = new_nlp.get_pipe("tagger") tagger = new_nlp.get_pipe("tagger2")
ner = new_nlp.get_pipe("ner") ner = new_nlp.get_pipe("ner3")
assert tok2vec.listening_components == ["ner"] assert "ner" not in new_nlp.pipe_names
assert "tagger" not in new_nlp.pipe_names
assert tok2vec.listening_components == ["ner3", "tagger4"]
assert any(isinstance(node, Tok2VecListener) for node in ner.model.walk()) assert any(isinstance(node, Tok2VecListener) for node in ner.model.walk())
assert not any(isinstance(node, Tok2VecListener) for node in tagger.model.walk()) assert not any(isinstance(node, Tok2VecListener) for node in tagger.model.walk())
t2v_cfg = new_nlp.config["components"]["tok2vec"]["model"] t2v_cfg = new_nlp.config["components"]["tok2vec"]["model"]
assert t2v_cfg["@architectures"] == "spacy.Tok2Vec.v2" assert t2v_cfg["@architectures"] == "spacy.Tok2Vec.v2"
assert new_nlp.config["components"]["tagger"]["model"]["tok2vec"] == t2v_cfg assert new_nlp.config["components"]["tagger2"]["model"]["tok2vec"] == t2v_cfg
assert ( assert (
new_nlp.config["components"]["ner"]["model"]["tok2vec"]["@architectures"] new_nlp.config["components"]["ner3"]["model"]["tok2vec"]["@architectures"]
== "spacy.Tok2VecListener.v1"
)
assert (
new_nlp.config["components"]["tagger4"]["model"]["tok2vec"]["@architectures"]
== "spacy.Tok2VecListener.v1" == "spacy.Tok2VecListener.v1"
) )
@ -540,3 +555,57 @@ def test_tok2vec_listeners_textcat():
assert cats1["imperative"] < 0.9 assert cats1["imperative"] < 0.9
assert [t.tag_ for t in docs[0]] == ["V", "J", "N"] assert [t.tag_ for t in docs[0]] == ["V", "J", "N"]
assert [t.tag_ for t in docs[1]] == ["N", "V", "J", "N"] assert [t.tag_ for t in docs[1]] == ["N", "V", "J", "N"]
def test_tok2vec_listener_source_link_name():
"""The component's internal name and the tok2vec listener map correspond
to the most recently modified pipeline.
"""
orig_config = Config().from_str(cfg_string_multi)
nlp1 = util.load_model_from_config(orig_config, auto_fill=True, validate=True)
assert nlp1.get_pipe("tok2vec").listening_components == ["tagger", "ner"]
nlp2 = English()
nlp2.add_pipe("tok2vec", source=nlp1)
nlp2.add_pipe("tagger", name="tagger2", source=nlp1)
# there is no way to have the component have the right name for both
# pipelines, right now the most recently modified pipeline is prioritized
assert nlp1.get_pipe("tagger").name == nlp2.get_pipe("tagger2").name == "tagger2"
# there is no way to have the tok2vec have the right listener map for both
# pipelines, right now the most recently modified pipeline is prioritized
assert nlp2.get_pipe("tok2vec").listening_components == ["tagger2"]
nlp2.add_pipe("ner", name="ner3", source=nlp1)
assert nlp2.get_pipe("tok2vec").listening_components == ["tagger2", "ner3"]
nlp2.remove_pipe("ner3")
assert nlp2.get_pipe("tok2vec").listening_components == ["tagger2"]
nlp2.remove_pipe("tagger2")
assert nlp2.get_pipe("tok2vec").listening_components == []
# at this point the tok2vec component corresponds to nlp2
assert nlp1.get_pipe("tok2vec").listening_components == []
# modifying the nlp1 pipeline syncs the tok2vec listener map back to nlp1
nlp1.add_pipe("sentencizer")
assert nlp1.get_pipe("tok2vec").listening_components == ["tagger", "ner"]
# modifying nlp2 syncs it back to nlp2
nlp2.add_pipe("sentencizer")
assert nlp1.get_pipe("tok2vec").listening_components == []
def test_tok2vec_listener_source_replace_listeners():
orig_config = Config().from_str(cfg_string_multi)
nlp1 = util.load_model_from_config(orig_config, auto_fill=True, validate=True)
assert nlp1.get_pipe("tok2vec").listening_components == ["tagger", "ner"]
nlp1.replace_listeners("tok2vec", "tagger", ["model.tok2vec"])
assert nlp1.get_pipe("tok2vec").listening_components == ["ner"]
nlp2 = English()
nlp2.add_pipe("tok2vec", source=nlp1)
assert nlp2.get_pipe("tok2vec").listening_components == []
nlp2.add_pipe("tagger", source=nlp1)
assert nlp2.get_pipe("tok2vec").listening_components == []
nlp2.add_pipe("ner", name="ner2", source=nlp1)
assert nlp2.get_pipe("tok2vec").listening_components == ["ner2"]

View File

@ -67,7 +67,8 @@ def init_nlp(config: Config, *, use_gpu: int = -1) -> "Language":
with nlp.select_pipes(enable=resume_components): with nlp.select_pipes(enable=resume_components):
logger.info("Resuming training for: %s", resume_components) logger.info("Resuming training for: %s", resume_components)
nlp.resume_training(sgd=optimizer) nlp.resume_training(sgd=optimizer)
# Make sure that listeners are defined before initializing further # Make sure that internal component names are synced and listeners are
# defined before initializing further
nlp._link_components() nlp._link_components()
with nlp.select_pipes(disable=[*frozen_components, *resume_components]): with nlp.select_pipes(disable=[*frozen_components, *resume_components]):
if T["max_epochs"] == -1: if T["max_epochs"] == -1: