some-allreduce

This commit is contained in:
Richard Liaw 2020-06-16 12:17:01 -07:00
parent ef2af90f54
commit 8cc49c5a03
2 changed files with 98 additions and 17 deletions

View File

@ -2,16 +2,23 @@ import ray
from wasabi import msg
from .. import util
cp = None
nccl = None
from typing import Dict, Optional, Union, Tuple, List, cast
from thinc.types import FloatsXd
def _create_optimizer(config_path):
msg.info(f"Loading config from: {config_path}")
config = util.load_config(config_path, create_objects=False)
util.fix_random_seed(config["training"]["seed"]) # Fix this.
config = util.load_config(config_path, create_objects=True)
training = config["training"]
return training["optimizer"]
class OptimizerWorker:
def __init__(self, config_path):
msg.info(f"Loading config from: {config_path}")
config = util.load_config(config_path, create_objects=False)
util.fix_random_seed(config["training"]["seed"])
config = util.load_config(config_path, create_objects=True)
training = config["training"]
optimizer = training["optimizer"]
self.optimizer = optimizer
self.optimizer = _create_optimizer(config_path)
self.weights_dict = {}
def call(self, key, weights, gradient, *, lr_scale=1.0):
@ -51,3 +58,57 @@ class RayOptimizer:
def step_schedules(self):
self.optimizer.step_schedules.remote()
self.sync()
class RayWorker:
def __init__(self, rank, world_size):
global nccl
from cupy.cuda import nccl
self.rank = rank
self.world_size = world_size
self.unique_id = nccl.get_unique_id()
def initialize(self, head_id):
self.communicator = nccl.NcclCommunicator(self.world_size, head_id, self.rank)
def get_unique_id(self):
return self.unique_id
def execute(self, fn):
return fn(self)
class AllreduceOptimizer:
def __init__(self, config_path, communicator):
global cp
import cupy as cp
global nccl
from cupy.cuda import nccl
self.optimizer = _create_optimizer(config_path)
self.communicator = communicator
def allreduce(self, tensor):
self.communicator.allReduce(
tensor.data.ptr,
tensor.data.ptr,
tensor.size,
nccl.NCCL_FLOAT32,
nccl.NCCL_SUM, # TODO: is this a sum?
cp.cuda.Stream.null.ptr
)
return tensor
def __call__(
self,
key: Tuple[int, str],
weights: FloatsXd,
gradient: FloatsXd,
*,
lr_scale: float = 1.0,
):
# weights = self.allreduce(weights)
gradient = self.allreduce(gradient)
flat_weights, gradient = self.optimizer(key, weights, gradient, lr_scale=lr_scale)
return flat_weights, gradient
def __getattr__(self, name):
return getattr(self.optimizer, name)

View File

@ -128,6 +128,7 @@ class ConfigSchema(BaseModel):
verbose=("Display more information for debugging purposes", "flag", "VV", bool),
use_gpu=("Use GPU", "option", "g", int),
num_workers=("Parallel Workers", "option", "j", int),
strategy=("Distributed training strategy", "option", "strat", str),
tag_map_path=("Location of JSON-formatted tag map", "option", "tm", Path),
omit_extra_lookups=("Don't include extra lookups in model", "flag", "OEL", bool),
# fmt: on
@ -142,6 +143,7 @@ def train_cli(
verbose=False,
use_gpu=-1,
num_workers=1,
strategy="ps",
tag_map_path=None,
omit_extra_lookups=False,
):
@ -198,17 +200,35 @@ def train_cli(
from spacy.cli.ray_utils import RayOptimizer
import ray
ray.init()
remote_train = ray.remote(setup_and_train)
if use_gpu >= 0:
msg.info("Enabling GPU with Ray")
remote_train = remote_train.options(num_gpus=0.9)
if strategy == "ps":
remote_train = ray.remote(setup_and_train)
if use_gpu >= 0:
msg.info("Enabling GPU with Ray")
remote_train = remote_train.options(num_gpus=0.9)
train_args["remote_optimizer"] = RayOptimizer(config_path, use_gpu=use_gpu)
ray.get([remote_train.remote(
use_gpu,
train_args,
rank=rank,
total_workers=num_workers) for rank in range(num_workers)])
elif strategy == "allreduce" and use_gpu >= 0:
from spacy.cli.ray_utils import RayWorker, AllreduceOptimizer
msg.info("Enabling GPU with Ray")
RemoteRayWorker = ray.remote(RayWorker).options(num_gpus=1)
workers = [RemoteRayWorker.remote(rank, num_workers) for rank in range(num_workers)]
head_id = ray.get(workers[0].get_unique_id.remote())
ray.get([w.initialize.remote(head_id) for w in workers])
def train_fn(worker):
optimizer = AllreduceOptimizer(config_path, worker.communicator)
train_args["remote_optimizer"] = optimizer
return setup_and_train(True, train_args, worker.rank, worker.world_size)
ray.get([w.execute.remote(train_fn) for w in workers])
else:
raise NotImplementedError
train_args["remote_optimizer"] = RayOptimizer(config_path, use_gpu=use_gpu)
ray.get([remote_train.remote(
use_gpu,
train_args,
rank=rank,
total_workers=num_workers) for rank in range(num_workers)])
else:
setup_and_train(use_gpu, train_args)