From 0df7d449789e00419b20ca768767ad87f6920c4d Mon Sep 17 00:00:00 2001
From: Richard Liaw
Date: Mon, 22 Jun 2020 17:59:08 -0700
Subject: [PATCH] move-files

---
 spacy/cli/ray_param_server.py  | 73 ----------------------------
 spacy/cli/ray_utils.py         | 81 -------------------------------
 spacy/cli/train_from_config.py | 87 ++++------------------------------
 3 files changed, 9 insertions(+), 232 deletions(-)

diff --git a/spacy/cli/ray_param_server.py b/spacy/cli/ray_param_server.py
index b0dec4b00..e69de29bb 100644
--- a/spacy/cli/ray_param_server.py
+++ b/spacy/cli/ray_param_server.py
@@ -1,73 +0,0 @@
-"""Parameter Server distributed training with Ray."""
-import threading
-import ray
-from wasabi import msg
-from .. import util
-from spacy.cli.ray_utils import create_optimizer
-
-class OptimizerWorker:
-    def __init__(self, config_path, world_size):
-        self.optimizer = create_optimizer(config_path)
-        self.new_weights = None
-        self.barrier = threading.Barrier(world_size)
-        self.lock = threading.Lock()
-        self.waiting = 0
-        self.weights_dict = {}
-        self.grad_dict = {}
-        self.world_size = world_size
-
-    def call(self, key, weights, gradient, *, lr_scale=1.0):
-        self.lock.acquire()
-
-        if self.waiting < self.world_size - 1:
-            if self.waiting == 0:
-                self.grad_dict[key] = gradient.copy()
-                self.weights_dict[key] = weights.copy()
-            else:
-                self.grad_dict[key] += gradient
-            self.waiting = self.barrier.n_waiting + 1
-            self.lock.release()
-            self.barrier.wait()
-        else:
-            self.grad_dict[key] += gradient
-            self.lock.release()
-            self.grad_dict[key] /= self.world_size
-            new_weights, new_grads = self.optimizer(
-                key, self.weights_dict[key], self.grad_dict[key], lr_scale=lr_scale)
-            self.weights_dict[key] = new_weights
-            self.grad_dict[key] = new_grads
-            self.waiting = 0
-            self.barrier.wait()
-        return self.weights_dict[key], self.grad_dict[key]
-
-    def fetch(self):
-        return self.optimizer
-
-    def step_schedules(self):
-        self.optimizer.step_schedules()
-
-class RayOptimizer:
-    local_optimizer = None
-
-    def __init__(self, config_path, use_gpu, world_size):
-        RemoteOptimizer = ray.remote(OptimizerWorker)
-        options = {"max_concurrency": world_size}
-        if use_gpu >= 0:
-            options["num_gpus"] = 0.1
-        RemoteOptimizer = RemoteOptimizer.options(**options)
-        self.optimizer = RemoteOptimizer.remote(config_path, world_size)
-        self.sync()
-
-    def sync(self):
-        self.local_optimizer = ray.get(self.optimizer.fetch.remote())
-
-    def __call__(self, *args, **kwargs):
-        weights, grads = ray.get(self.optimizer.call.remote(*args, **kwargs))
-        return weights.copy(), grads.copy()
-
-    def __getattr__(self, name):
-        return getattr(self.local_optimizer, name)
-
-    def step_schedules(self):
-        self.optimizer.step_schedules.remote()
-        self.sync()
diff --git a/spacy/cli/ray_utils.py b/spacy/cli/ray_utils.py
index 11c81b73f..e69de29bb 100644
--- a/spacy/cli/ray_utils.py
+++ b/spacy/cli/ray_utils.py
@@ -1,81 +0,0 @@
-"""Allreduce distributed training with Ray."""
-
-import random
-import ray
-import numpy
-from wasabi import msg
-from .. import util
-
-cp = None
-nccl = None
-
-from typing import Dict, Optional, Union, Tuple, List, cast
-from thinc.types import FloatsXd
-
-
-def create_optimizer(config_path):
-    msg.info(f"Loading config from: {config_path}")
-    config = util.load_config(config_path, create_objects=False)
-    util.fix_random_seed(config["training"]["seed"])
-    config = util.load_config(config_path, create_objects=True)
-    training = config["training"]
-    return training["optimizer"]
-
-class RayWorker:
-    def __init__(self, rank, world_size):
-        global nccl
-        from cupy.cuda import nccl
-        self.rank = rank
-        self.world_size = world_size
-        self.unique_id = nccl.get_unique_id()
-
-    def initialize(self, head_id):
-        self.communicator = nccl.NcclCommunicator(self.world_size, head_id, self.rank)
-
-    def get_unique_id(self):
-        return self.unique_id
-
-    def execute(self, fn):
-        return fn(self)
-
-class AllreduceOptimizer:
-    def __init__(self, config_path, communicator):
-        global cp
-        import cupy as cp
-        global nccl
-        from cupy.cuda import nccl
-        self.optimizer = create_optimizer(config_path)
-        self.communicator = communicator
-        self.weights_synced = set()
-
-    def allreduce(self, tensor):
-        self.communicator.allReduce(
-            tensor.data.ptr,
-            tensor.data.ptr,
-            tensor.size,
-            nccl.NCCL_FLOAT32,
-            nccl.NCCL_SUM, # TODO: is this a sum?
-            cp.cuda.Stream.null.ptr
-        )
-        return tensor
-
-    def __call__(
-        self,
-        key: Tuple[int, str],
-        weights: FloatsXd,
-        gradient: FloatsXd,
-        *,
-        lr_scale: float = 1.0,
-    ):
-        if key not in self.weights_synced:
-            self.weights_synced.add(key)
-            weights = self.allreduce(weights) / self.communicator.size()
-
-
-        gradient = self.allreduce(gradient) / self.communicator.size()
-        flat_weights, gradient = self.optimizer(key, weights, gradient, lr_scale=lr_scale)
-        return flat_weights, gradient
-
-
-    def __getattr__(self, name):
-        return getattr(self.optimizer, name)
diff --git a/spacy/cli/train_from_config.py b/spacy/cli/train_from_config.py
index d89d60fae..74820a268 100644
--- a/spacy/cli/train_from_config.py
+++ b/spacy/cli/train_from_config.py
@@ -197,71 +197,15 @@ def train_cli(
     )
 
     if num_workers and num_workers > 1:
-        import ray
-        ray.init()
-        if strategy == "ps":
-            from spacy.cli.ray_param_server import RayOptimizer
-            remote_train = ray.remote(setup_and_train)
-            if use_gpu >= 0:
-                msg.info("Enabling GPU with Ray")
-                remote_train = remote_train.options(num_gpus=0.9)
-
-            train_args["remote_optimizer"] = RayOptimizer(
-                config_path, use_gpu=use_gpu, world_size=num_workers)
-            ray.get([remote_train.remote(
-                use_gpu,
-                train_args,
-                rank=rank,
-                total_workers=num_workers) for rank in range(num_workers)])
-        elif strategy == "allreduce" and use_gpu >= 0:
-            from spacy.cli.ray_utils import RayWorker, AllreduceOptimizer
-            msg.info("Enabling GPU with Ray")
-            RemoteRayWorker = ray.remote(RayWorker).options(num_gpus=1)
-
-            workers = [RemoteRayWorker.remote(rank, num_workers) for rank in range(num_workers)]
-            head_id = ray.get(workers[0].get_unique_id.remote())
-            ray.get([w.initialize.remote(head_id) for w in workers])
-            def train_fn(worker):
-                optimizer = AllreduceOptimizer(config_path, worker.communicator)
-                train_args["remote_optimizer"] = optimizer
-                return setup_and_train(True, train_args, worker.rank, worker.world_size)
-
-            ray.get([w.execute.remote(train_fn) for w in workers])
-        elif strategy == "debug":
-            remote_train = ray.remote(setup_and_train)
-            if use_gpu >= 0:
-                msg.info("Enabling GPU with Ray")
-                remote_train = remote_train.options(num_gpus=0.9)
-            ray.get([remote_train.remote(
-                use_gpu,
-                train_args,
-                rank=rank,
-                total_workers=num_workers) for rank in range(num_workers)])
-        else:
-            raise NotImplementedError
-
+        try:
+            from ray_spacy import distributed_setup_and_train
+        except ImportError:
+            msg.fail("Need to install ray_spacy to use distributed training!", exits=1)
+        distributed_setup_and_train(use_gpu, num_workers, strategy, train_args)
     else:
         setup_and_train(use_gpu, train_args)
 
-world_rank = None
-world_size = None
-
-def setup_and_train(use_gpu, train_args, rank=None, total_workers=None):
-    if rank is not None:
-        global world_rank
-        world_rank = rank
-        global world_size
-        world_size = total_workers
-        if use_gpu >= 0:
-            use_gpu = 0
-    if use_gpu >= 0:
-        msg.info(f"Using GPU: {use_gpu}")
-        util.use_gpu(use_gpu)
-    else:
-        msg.info("Using CPU")
-    train(**train_args)
-
 def train(
     config_path,
     data_paths,
@@ -270,6 +214,7 @@
     tag_map=None,
     weights_data=None,
     omit_extra_lookups=False,
+    disable_tqdm=False,
     remote_optimizer=None
 ):
     msg.info(f"Loading config from: {config_path}")
@@ -285,6 +230,8 @@
     msg.info("Creating nlp from config")
     nlp = util.load_model_from_config(nlp_config)
     optimizer = training["optimizer"]
+    # TODO: is there a cleaner way of doing this, instead of creating
+    # the optimizer twice? are there any problems when doing this?
     if remote_optimizer:
         optimizer = remote_optimizer
     limit = training["limit"]
@@ -402,10 +349,7 @@
     msg.info(f"Training. Initial learn rate: {optimizer.learn_rate}")
     print_row = setup_printer(training, nlp)
 
-    tqdm_args = dict(total=training["eval_frequency"], leave=False)
-    global world_rank
-    if world_rank is not None:
-        tqdm_args["disable"] = bool(world_rank != 0)
+    tqdm_args = dict(total=training["eval_frequency"], leave=False, disable=disable_tqdm)
     try:
         progress = tqdm.tqdm(**tqdm_args)
         for batch, info, is_best_checkpoint in training_step_iterator:
@@ -461,19 +405,6 @@ def create_train_batches(nlp, corpus, cfg):
             raise ValueError(Errors.E988)
         random.shuffle(train_examples)
 
-        # # TODO: with large batches, this can be bad.
-        # if world_size is not None:
-        #     # Taken from https://github.com/pytorch/pytorch/blob/master/torch/utils/data/distributed.py
-        #     num_samples = int(math.ceil(len(train_examples) * 1.0 / world_size))
-        #     total_size = num_samples * world_size # expected to overflow
-        #     train_examples += train_examples[:(total_size - len(train_examples))]
-        #     assert len(train_examples) == total_size
-
-        #     # subsample
-        #     train_examples = train_examples[world_rank:total_size:world_size]
-        #     assert len(train_examples) == num_samples
-        #     print(f"Reset epoch: Only using {num_samples} out of {total_size} samples")
-
         batches = util.minibatch_by_words(
             train_examples,
             size=cfg["batch_size"],
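
Note on the new code path: the ray_spacy package imported above is external to this patch, so the body of distributed_setup_and_train is not shown here. Below is a minimal sketch of what that entry point might look like, assuming the package simply relocates the deleted RayOptimizer, RayWorker, AllreduceOptimizer, and setup_and_train helpers. Only the signature distributed_setup_and_train(use_gpu, num_workers, strategy, train_args) comes from the patch; the submodule names and the train_args["config_path"] lookup are assumptions, not the real implementation.

# Hypothetical ray_spacy/__init__.py -- reconstructed from the code this patch deletes.
import ray
from wasabi import msg

# Assumed relocations of the deleted classes/functions:
from .param_server import RayOptimizer                 # was spacy/cli/ray_param_server.py
from .allreduce import RayWorker, AllreduceOptimizer   # was spacy/cli/ray_utils.py
from .train import setup_and_train                     # was removed from train_from_config.py


def distributed_setup_and_train(use_gpu, num_workers, strategy, train_args):
    """Dispatch to one of the distributed strategies removed from spaCy's CLI."""
    ray.init()
    # Assumption: train_args carries config_path, as it did for train(**train_args).
    config_path = train_args["config_path"]
    if strategy == "ps":
        # Parameter-server style: all workers push gradients to one remote optimizer.
        remote_train = ray.remote(setup_and_train)
        if use_gpu >= 0:
            msg.info("Enabling GPU with Ray")
            remote_train = remote_train.options(num_gpus=0.9)
        train_args["remote_optimizer"] = RayOptimizer(
            config_path, use_gpu=use_gpu, world_size=num_workers)
        ray.get([
            remote_train.remote(use_gpu, train_args, rank=rank, total_workers=num_workers)
            for rank in range(num_workers)
        ])
    elif strategy == "allreduce" and use_gpu >= 0:
        # NCCL allreduce: each worker owns a GPU and averages gradients directly.
        msg.info("Enabling GPU with Ray")
        RemoteRayWorker = ray.remote(RayWorker).options(num_gpus=1)
        workers = [RemoteRayWorker.remote(rank, num_workers) for rank in range(num_workers)]
        head_id = ray.get(workers[0].get_unique_id.remote())
        ray.get([w.initialize.remote(head_id) for w in workers])

        def train_fn(worker):
            train_args["remote_optimizer"] = AllreduceOptimizer(config_path, worker.communicator)
            return setup_and_train(True, train_args, worker.rank, worker.world_size)

        ray.get([w.execute.remote(train_fn) for w in workers])
    else:
        raise NotImplementedError(f"Unknown distributed strategy: {strategy}")

The dispatch on the strategy string mirrors the "ps"/"allreduce" branches that this patch removes from train_cli; spaCy itself now only needs to know the single entry point and can keep the Ray dependency optional.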