Mirror of https://github.com/explosion/spaCy.git (synced 2025-02-28 17:40:39 +03:00)

Commit 0df7d44978 ("move-files"), parent 8bbf8c78bf. The commit moves the Ray-based distributed-training helpers out of spacy/cli: the in-tree parameter-server and NCCL allreduce modules are deleted, and train_cli now delegates distributed runs to an external ray_spacy package.
Deleted file: spacy/cli/ray_param_server.py (@@ -1,73 +0,0 @@)

```python
"""Parameter Server distributed training with Ray."""
import threading
import ray
from wasabi import msg
from .. import util
from spacy.cli.ray_utils import create_optimizer

class OptimizerWorker:
    def __init__(self, config_path, world_size):
        self.optimizer = create_optimizer(config_path)
        self.new_weights = None
        self.barrier = threading.Barrier(world_size)
        self.lock = threading.Lock()
        self.waiting = 0
        self.weights_dict = {}
        self.grad_dict = {}
        self.world_size = world_size

    def call(self, key, weights, gradient, *, lr_scale=1.0):
        self.lock.acquire()

        if self.waiting < self.world_size - 1:
            if self.waiting == 0:
                self.grad_dict[key] = gradient.copy()
                self.weights_dict[key] = weights.copy()
            else:
                self.grad_dict[key] += gradient
            self.waiting = self.barrier.n_waiting + 1
            self.lock.release()
            self.barrier.wait()
        else:
            self.grad_dict[key] += gradient
            self.lock.release()
            self.grad_dict[key] /= self.world_size
            new_weights, new_grads = self.optimizer(
                key, self.weights_dict[key], self.grad_dict[key], lr_scale=lr_scale)
            self.weights_dict[key] = new_weights
            self.grad_dict[key] = new_grads
            self.waiting = 0
            self.barrier.wait()
        return self.weights_dict[key], self.grad_dict[key]

    def fetch(self):
        return self.optimizer

    def step_schedules(self):
        self.optimizer.step_schedules()

class RayOptimizer:
    local_optimizer = None

    def __init__(self, config_path, use_gpu, world_size):
        RemoteOptimizer = ray.remote(OptimizerWorker)
        options = {"max_concurrency": world_size}
        if use_gpu >= 0:
            options["num_gpus"] = 0.1
        RemoteOptimizer = RemoteOptimizer.options(**options)
        self.optimizer = RemoteOptimizer.remote(config_path, world_size)
        self.sync()

    def sync(self):
        self.local_optimizer = ray.get(self.optimizer.fetch.remote())

    def __call__(self, *args, **kwargs):
        weights, grads = ray.get(self.optimizer.call.remote(*args, **kwargs))
        return weights.copy(), grads.copy()

    def __getattr__(self, name):
        return getattr(self.local_optimizer, name)

    def step_schedules(self):
        self.optimizer.step_schedules.remote()
        self.sync()
```
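The synchronization in `OptimizerWorker.call()` above is the subtle part: every worker calls it with its local gradient, the first `world_size - 1` callers only accumulate into `grad_dict` and block on the barrier, and the last caller averages the accumulated gradient, runs the wrapped Thinc optimizer once, and then releases everyone so they all read back the same updated weights. The self-contained toy below (plain threads and NumPy, no Ray or spaCy involved, with a hypothetical `sgd` step standing in for the Thinc optimizer) mimics that accumulate-then-barrier pattern:

```python
import threading

import numpy as np

WORLD_SIZE = 3
barrier = threading.Barrier(WORLD_SIZE)
lock = threading.Lock()
state = {"grad": None, "waiting": 0, "weights": np.zeros(4)}


def sgd(weights, grad, lr=0.1):
    # Stand-in for the wrapped Thinc optimizer call.
    return weights - lr * grad


def call(gradient):
    """Accumulate gradients; the last caller applies one update for everyone."""
    with lock:
        state["grad"] = gradient.copy() if state["grad"] is None else state["grad"] + gradient
        state["waiting"] += 1
        last = state["waiting"] == WORLD_SIZE
    if last:
        state["weights"] = sgd(state["weights"], state["grad"] / WORLD_SIZE)
        state["grad"], state["waiting"] = None, 0
    barrier.wait()          # everyone leaves with the same state["weights"]
    return state["weights"]


threads = [
    threading.Thread(target=call, args=(np.full(4, rank + 1.0),))
    for rank in range(WORLD_SIZE)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(state["weights"])     # average gradient is 2.0, so every entry is -0.2
```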
Deleted file: spacy/cli/ray_utils.py (@@ -1,81 +0,0 @@)

```python
"""Allreduce distributed training with Ray."""

import random
import ray
import numpy
from wasabi import msg
from .. import util

cp = None
nccl = None

from typing import Dict, Optional, Union, Tuple, List, cast
from thinc.types import FloatsXd


def create_optimizer(config_path):
    msg.info(f"Loading config from: {config_path}")
    config = util.load_config(config_path, create_objects=False)
    util.fix_random_seed(config["training"]["seed"])
    config = util.load_config(config_path, create_objects=True)
    training = config["training"]
    return training["optimizer"]

class RayWorker:
    def __init__(self, rank, world_size):
        global nccl
        from cupy.cuda import nccl
        self.rank = rank
        self.world_size = world_size
        self.unique_id = nccl.get_unique_id()

    def initialize(self, head_id):
        self.communicator = nccl.NcclCommunicator(self.world_size, head_id, self.rank)

    def get_unique_id(self):
        return self.unique_id

    def execute(self, fn):
        return fn(self)

class AllreduceOptimizer:
    def __init__(self, config_path, communicator):
        global cp
        import cupy as cp
        global nccl
        from cupy.cuda import nccl
        self.optimizer = create_optimizer(config_path)
        self.communicator = communicator
        self.weights_synced = set()

    def allreduce(self, tensor):
        self.communicator.allReduce(
            tensor.data.ptr,
            tensor.data.ptr,
            tensor.size,
            nccl.NCCL_FLOAT32,
            nccl.NCCL_SUM,  # TODO: is this a sum?
            cp.cuda.Stream.null.ptr
        )
        return tensor

    def __call__(
        self,
        key: Tuple[int, str],
        weights: FloatsXd,
        gradient: FloatsXd,
        *,
        lr_scale: float = 1.0,
    ):
        if key not in self.weights_synced:
            self.weights_synced.add(key)
            weights = self.allreduce(weights) / self.communicator.size()


        gradient = self.allreduce(gradient) / self.communicator.size()
        flat_weights, gradient = self.optimizer(key, weights, gradient, lr_scale=lr_scale)
        return flat_weights, gradient


    def __getattr__(self, name):
        return getattr(self.optimizer, name)
```
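`AllreduceOptimizer.__call__()` keeps the replicas consistent without a central server: the first time a parameter `key` is seen, the weights are all-reduced and averaged once so every replica starts from the same values; after that only the gradients are all-reduced and averaged, and each replica applies the identical Thinc update locally. The tiny NumPy simulation below (no Ray, NCCL, or GPU required; the 0.1 learning rate is just an example value) checks that invariant:

```python
import numpy as np

world_size = 4
rng = np.random.default_rng(0)

# Each replica starts with slightly different weights and its own gradient.
weights = [rng.normal(size=5) for _ in range(world_size)]
grads = [rng.normal(size=5) for _ in range(world_size)]

# One-time sync (first call for this key): allreduce-average the weights.
synced = sum(weights) / world_size
# Every step: allreduce-average the gradients, then apply the same local update.
avg_grad = sum(grads) / world_size
updated = [synced - 0.1 * avg_grad for _ in range(world_size)]

# All replicas end up identical, so weights never need to be shipped again.
assert all(np.allclose(updated[0], w) for w in updated[1:])
print("replicas identical:", np.allclose(updated[0], updated[-1]))
```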
Modified: the spaCy train CLI (the remaining hunks below are all from this file)

```diff
@@ -197,71 +197,15 @@ def train_cli(
     )
 
     if num_workers and num_workers > 1:
-        import ray
-        ray.init()
-        if strategy == "ps":
-            from spacy.cli.ray_param_server import RayOptimizer
-            remote_train = ray.remote(setup_and_train)
-            if use_gpu >= 0:
-                msg.info("Enabling GPU with Ray")
-                remote_train = remote_train.options(num_gpus=0.9)
-
-            train_args["remote_optimizer"] = RayOptimizer(
-                config_path, use_gpu=use_gpu, world_size=num_workers)
-            ray.get([remote_train.remote(
-                use_gpu,
-                train_args,
-                rank=rank,
-                total_workers=num_workers) for rank in range(num_workers)])
-        elif strategy == "allreduce" and use_gpu >= 0:
-            from spacy.cli.ray_utils import RayWorker, AllreduceOptimizer
-            msg.info("Enabling GPU with Ray")
-            RemoteRayWorker = ray.remote(RayWorker).options(num_gpus=1)
-
-            workers = [RemoteRayWorker.remote(rank, num_workers) for rank in range(num_workers)]
-            head_id = ray.get(workers[0].get_unique_id.remote())
-            ray.get([w.initialize.remote(head_id) for w in workers])
-            def train_fn(worker):
-                optimizer = AllreduceOptimizer(config_path, worker.communicator)
-                train_args["remote_optimizer"] = optimizer
-                return setup_and_train(True, train_args, worker.rank, worker.world_size)
-
-            ray.get([w.execute.remote(train_fn) for w in workers])
-        elif strategy == "debug":
-            remote_train = ray.remote(setup_and_train)
-            if use_gpu >= 0:
-                msg.info("Enabling GPU with Ray")
-                remote_train = remote_train.options(num_gpus=0.9)
-            ray.get([remote_train.remote(
-                use_gpu,
-                train_args,
-                rank=rank,
-                total_workers=num_workers) for rank in range(num_workers)])
-        else:
-            raise NotImplementedError
-
-
+        try:
+            from ray_spacy import distributed_setup_and_train
+        except ImportError:
+            msg.fail("Need to install ray_spacy to use distributed training!", exits=1)
+        distributed_setup_and_train(use_gpu, num_workers, strategy, train_args)
     else:
         setup_and_train(use_gpu, train_args)
 
-world_rank = None
-world_size = None
-
-def setup_and_train(use_gpu, train_args, rank=None, total_workers=None):
-    if rank is not None:
-        global world_rank
-        world_rank = rank
-        global world_size
-        world_size = total_workers
-        if use_gpu >= 0:
-            use_gpu = 0
-    if use_gpu >= 0:
-        msg.info(f"Using GPU: {use_gpu}")
-        util.use_gpu(use_gpu)
-    else:
-        msg.info("Using CPU")
-    train(**train_args)
 
 def train(
     config_path,
     data_paths,
```
```diff
@@ -270,6 +214,7 @@ def train(
     tag_map=None,
     weights_data=None,
     omit_extra_lookups=False,
+    disable_tqdm=False,
     remote_optimizer=None
 ):
     msg.info(f"Loading config from: {config_path}")
```
```diff
@@ -285,6 +230,8 @@ def train(
     msg.info("Creating nlp from config")
     nlp = util.load_model_from_config(nlp_config)
     optimizer = training["optimizer"]
+    # TODO: is there a cleaner way of doing this, instead of creating
+    # the optimizer twice? are there any problems when doing this?
     if remote_optimizer:
         optimizer = remote_optimizer
     limit = training["limit"]
```
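The hook itself is deliberately small: `train()` just swaps the config-built optimizer for whatever was passed as `remote_optimizer`, so any object that is callable as `optimizer(key, weights, gradient, lr_scale=...)`, returns `(weights, gradient)`, implements `step_schedules()`, and exposes attributes such as `learn_rate` can stand in, which is exactly the surface the deleted `RayOptimizer` and `AllreduceOptimizer` implement. As a minimal sketch of that duck-typed interface (the `LoggingOptimizerProxy` name and the choice of Thinc's `Adam` are illustrative, not part of this diff):

```python
import numpy as np
from thinc.api import Adam  # assumes Thinc 8 is installed


class LoggingOptimizerProxy:
    """Anything shaped like this can be passed as remote_optimizer."""

    def __init__(self, optimizer):
        self.optimizer = optimizer
        self.calls = 0

    def __call__(self, key, weights, gradient, *, lr_scale=1.0):
        self.calls += 1
        return self.optimizer(key, weights, gradient, lr_scale=lr_scale)

    def step_schedules(self):
        self.optimizer.step_schedules()

    def __getattr__(self, name):
        # Forward everything else (e.g. learn_rate) to the real optimizer.
        return getattr(self.optimizer, name)


proxy = LoggingOptimizerProxy(Adam(0.001))
W = np.ones((2, 3), dtype="float32")
dW = np.full((2, 3), 0.5, dtype="float32")
W, dW = proxy((0, "W"), W, dW)          # the per-parameter call train() makes
print(proxy.calls, proxy.learn_rate)    # e.g. train_args["remote_optimizer"] = proxy
```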
```diff
@@ -402,10 +349,7 @@ def train(
     msg.info(f"Training. Initial learn rate: {optimizer.learn_rate}")
     print_row = setup_printer(training, nlp)
 
-    tqdm_args = dict(total=training["eval_frequency"], leave=False)
-    global world_rank
-    if world_rank is not None:
-        tqdm_args["disable"] = bool(world_rank != 0)
+    tqdm_args = dict(total=training["eval_frequency"], leave=False, disable=disable_tqdm)
     try:
         progress = tqdm.tqdm(**tqdm_args)
         for batch, info, is_best_checkpoint in training_step_iterator:
```
```diff
@@ -461,19 +405,6 @@ def create_train_batches(nlp, corpus, cfg):
         raise ValueError(Errors.E988)
     random.shuffle(train_examples)
 
-    # # TODO: with large batches, this can be bad.
-    # if world_size is not None:
-    #     # Taken from https://github.com/pytorch/pytorch/blob/master/torch/utils/data/distributed.py
-    #     num_samples = int(math.ceil(len(train_examples) * 1.0 / world_size))
-    #     total_size = num_samples * world_size  # expected to overflow
-    #     train_examples += train_examples[:(total_size - len(train_examples))]
-    #     assert len(train_examples) == total_size
-
-    #     # subsample
-    #     train_examples = train_examples[world_rank:total_size:world_size]
-    #     assert len(train_examples) == num_samples
-    #     print(f"Reset epoch: Only using {num_samples} out of {total_size} samples")
-
     batches = util.minibatch_by_words(
         train_examples,
         size=cfg["batch_size"],
```
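The commented-out block that this hunk deletes sketched per-worker data sharding in the style of PyTorch's `DistributedSampler`: pad the shuffled examples so the count divides evenly by `world_size`, then give each rank an interleaved slice. A small self-contained version of that scheme (illustrative only; it was never enabled in the deleted code) looks like this:

```python
import math


def shard(train_examples, world_rank, world_size):
    num_samples = int(math.ceil(len(train_examples) / world_size))
    total_size = num_samples * world_size
    # Pad by wrapping around so every worker gets the same number of samples.
    train_examples = train_examples + train_examples[: total_size - len(train_examples)]
    return train_examples[world_rank:total_size:world_size]


examples = list(range(10))          # plain integers stand in for Example objects
shards = [shard(examples, rank, 3) for rank in range(3)]
print(shards)                       # [[0, 3, 6, 9], [1, 4, 7, 0], [2, 5, 8, 1]]
assert all(len(s) == 4 for s in shards)
```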