From c865d833dc4027f5523a6ae910bb92a8066e24d2 Mon Sep 17 00:00:00 2001
From: Richard Liaw
Date: Thu, 18 Jun 2020 00:29:09 -0700
Subject: [PATCH] more-train-fixes

---
 spacy/cli/ray_param_server.py  | 23 ++++++++++++-----------
 spacy/cli/ray_utils.py         |  5 ++++-
 spacy/cli/train_from_config.py |  5 +++--
 spacy/util.py                  |  8 +++++++-
 4 files changed, 26 insertions(+), 15 deletions(-)

diff --git a/spacy/cli/ray_param_server.py b/spacy/cli/ray_param_server.py
index 6039341c7..b0dec4b00 100644
--- a/spacy/cli/ray_param_server.py
+++ b/spacy/cli/ray_param_server.py
@@ -21,24 +21,24 @@ class OptimizerWorker:
         if self.waiting < self.world_size - 1:
             if self.waiting == 0:
-                self.gradient[key] = gradient.copy()
+                self.grad_dict[key] = gradient.copy()
                 self.weights_dict[key] = weights.copy()
             else:
-                self.gradient[key] += gradient
+                self.grad_dict[key] += gradient
             self.waiting = self.barrier.n_waiting + 1
             self.lock.release()
             self.barrier.wait()
         else:
-            self.gradient[key] += gradient
+            self.grad_dict[key] += gradient
             self.lock.release()
-            self.gradient[key] /= self.world_size
+            self.grad_dict[key] /= self.world_size
             new_weights, new_grads = self.optimizer(
-                key, self.weights_dict[key], self.gradient[key], lr_scale=lr_scale)
+                key, self.weights_dict[key], self.grad_dict[key], lr_scale=lr_scale)
             self.weights_dict[key] = new_weights
-            self.gradient[key] = new_grads
+            self.grad_dict[key] = new_grads
             self.waiting = 0
             self.barrier.wait()
-        return self.weights_dict[key], self.gradient[key]
+        return self.weights_dict[key], self.grad_dict[key]
 
     def fetch(self):
         return self.optimizer
 
@@ -49,12 +49,13 @@ class OptimizerWorker:
 class RayOptimizer:
     local_optimizer = None
 
-    def __init__(self, config_path, use_gpu, rank):
+    def __init__(self, config_path, use_gpu, world_size):
         RemoteOptimizer = ray.remote(OptimizerWorker)
+        options = {"max_concurrency": world_size}
         if use_gpu >= 0:
-            RemoteOptimizer = RemoteOptimizer.options(num_gpus=0.1)
-        self.optimizer = RemoteOptimizer.remote(config_path)
-        self.rank = rank
+            options["num_gpus"] = 0.1
+        RemoteOptimizer = RemoteOptimizer.options(**options)
+        self.optimizer = RemoteOptimizer.remote(config_path, world_size)
         self.sync()
 
     def sync(self):
diff --git a/spacy/cli/ray_utils.py b/spacy/cli/ray_utils.py
index 6752289f6..11c81b73f 100644
--- a/spacy/cli/ray_utils.py
+++ b/spacy/cli/ray_utils.py
@@ -1,6 +1,8 @@
 """Allreduce distributed training with Ray."""
+import random
 import ray
+import numpy
 from wasabi import msg
 
 from .. import util
 
@@ -10,10 +12,11 @@ nccl = None
 from typing import Dict, Optional, Union, Tuple, List, cast
 from thinc.types import FloatsXd
 
+
 def create_optimizer(config_path):
     msg.info(f"Loading config from: {config_path}")
     config = util.load_config(config_path, create_objects=False)
-    util.fix_random_seed(config["training"]["seed"]) # Fix this.
+    util.fix_random_seed(config["training"]["seed"])
     config = util.load_config(config_path, create_objects=True)
     training = config["training"]
     return training["optimizer"]
diff --git a/spacy/cli/train_from_config.py b/spacy/cli/train_from_config.py
index 9eceeecf6..4f205a1b5 100644
--- a/spacy/cli/train_from_config.py
+++ b/spacy/cli/train_from_config.py
@@ -198,7 +198,7 @@ def train_cli(
     if num_workers and num_workers > 1:
         import ray
-        ray.init(address="auto")
+        ray.init()
         if strategy == "ps":
             from spacy.cli.ray_param_server import RayOptimizer
 
         remote_train = ray.remote(setup_and_train)
@@ -206,7 +206,8 @@
             msg.info("Enabling GPU with Ray")
             remote_train = remote_train.options(num_gpus=0.9)
 
-        train_args["remote_optimizer"] = RayOptimizer(config_path, use_gpu=use_gpu)
+        train_args["remote_optimizer"] = RayOptimizer(
+            config_path, use_gpu=use_gpu, world_size=num_workers)
         ray.get([remote_train.remote(
             use_gpu,
             train_args,
diff --git a/spacy/util.py b/spacy/util.py
index d2d87bef9..004ef19e1 100644
--- a/spacy/util.py
+++ b/spacy/util.py
@@ -898,11 +898,17 @@ def escape_html(text):
 def use_gpu(gpu_id):
     return require_gpu(gpu_id)
 
+def gpu_is_available():
+    try:
+        cupy.cuda.runtime.getDeviceCount()
+        return True
+    except cupy.cuda.runtime.CUDARuntimeError:
+        return False
 
 def fix_random_seed(seed=0):
     random.seed(seed)
     numpy.random.seed(seed)
-    if cupy is not None:
+    if cupy is not None and gpu_is_available():
         cupy.random.seed(seed)
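
For anyone reviewing the parameter-server changes without the rest of ray_param_server.py in view: each training worker calls into the shared OptimizerWorker actor with its local gradient for a parameter key, the actor accumulates gradients under a lock, the last worker to arrive averages them and runs the optimizer once, and a barrier then releases every worker with the same updated weights. Below is a minimal single-process sketch of that accumulate/average/step cycle. It uses plain threads instead of a Ray actor, a made-up ToyParamServer class, and vanilla SGD in place of the configured thinc optimizer, so it illustrates the synchronization pattern rather than the real code path:

    import threading

    import numpy


    class ToyParamServer:
        """Toy stand-in for OptimizerWorker: accumulate one gradient per
        worker, then apply a single averaged SGD step for everyone."""

        def __init__(self, world_size, lr=0.1):
            self.world_size = world_size
            self.lr = lr
            self.lock = threading.Lock()
            # The barrier's action runs in exactly one thread, after all
            # workers have arrived and before any of them is released.
            self.barrier = threading.Barrier(world_size, action=self._step)
            self.weights = numpy.zeros(3)
            self.grad = numpy.zeros(3)

        def _step(self):
            self.grad /= self.world_size         # average the gradients
            self.weights -= self.lr * self.grad  # one shared SGD step
            self.grad[:] = 0.0                   # reset for the next round

        def push_and_pull(self, gradient):
            with self.lock:
                self.grad += gradient  # accumulate this worker's share
            self.barrier.wait()        # block until every worker has pushed
            return self.weights.copy()


    ps = ToyParamServer(world_size=2)
    results = []
    workers = [
        threading.Thread(target=lambda g: results.append(ps.push_and_pull(g)),
                         args=(numpy.full(3, g),))
        for g in (1.0, 3.0)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(results)  # both workers get the same weights: [-0.2, -0.2, -0.2]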
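
The max_concurrency option is the load-bearing part of the RayOptimizer change: the actor's update method blocks on a barrier until all world_size workers have checked in, and a Ray actor executes one method call at a time by default, so without max_concurrency=world_size the first caller would occupy the actor and deadlock everyone else. The fractional num_gpus=0.1 reserves a slice of a GPU, letting the optimizer actor co-locate with training workers that hold the rest. (Separately, ray.init() with no arguments starts a local Ray runtime, whereas the removed ray.init(address="auto") attached to an already-running cluster.) Here is a sketch of the same options-dict pattern against a trivial stand-in actor; Counter and bump are invented for the example:

    import ray

    ray.init()


    class Counter:
        """Stand-in for OptimizerWorker; any class can become an actor."""

        def __init__(self, start):
            self.value = start

        def bump(self):
            self.value += 1
            return self.value


    world_size = 2
    use_gpu = -1  # < 0 means CPU-only, mirroring the CLI convention

    RemoteCounter = ray.remote(Counter)
    # Same shape as RayOptimizer.__init__: always allow world_size
    # concurrent method calls; only reserve a GPU slice when requested.
    options = {"max_concurrency": world_size}
    if use_gpu >= 0:
        options["num_gpus"] = 0.1
    RemoteCounter = RemoteCounter.options(**options)

    counter = RemoteCounter.remote(0)
    print(ray.get(counter.bump.remote()))  # -> 1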
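
The util.py hunk reads a little oddly in isolation because the guarded cupy import sits outside the diff context: util.py binds cupy to None when it cannot be imported, which is what the existing "if cupy is not None" check relies on. Assembled, the new logic looks like the sketch below. The point of the probe is that CuPy can be installed on a machine with no visible CUDA device, in which case cupy.random.seed(seed) would previously have raised; cupy.cuda.runtime.getDeviceCount() raising CUDARuntimeError is what tells the two situations apart. The explicit None guard inside gpu_is_available is an addition in this sketch; the patch leaves that check to the caller:

    import random

    import numpy

    try:
        import cupy
    except ImportError:
        cupy = None


    def gpu_is_available():
        # Sketch-only guard; the patch checks cupy at the call site.
        if cupy is None:
            return False
        try:
            cupy.cuda.runtime.getDeviceCount()
            return True
        except cupy.cuda.runtime.CUDARuntimeError:
            return False


    def fix_random_seed(seed=0):
        # Seed every RNG source a training run touches.
        random.seed(seed)
        numpy.random.seed(seed)
        if cupy is not None and gpu_is_available():
            cupy.random.seed(seed)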