Merge branch 'master' into loadams/shuffle-true-dataloader
loadams authored Jan 27, 2025
2 parents 9464a68 + 1640f6d commit ba634fb
Showing 22 changed files with 178 additions and 115 deletions.
2 changes: 1 addition & 1 deletion accelerator/real_accelerator.py
@@ -178,7 +178,7 @@ def get_accelerator():
if accelerator_name is None:
# borrow this log from PR#5084
if accel_logger is not None:
accel_logger.warn(
accel_logger.warning(
"Setting accelerator to CPU. If you have GPU or other accelerator, we were unable to detect it.")
# cpu added as catch-all when accelerator detection fails
accelerator_name = "cpu"
1 change: 1 addition & 0 deletions build_win.bat
@@ -7,6 +7,7 @@ set DS_BUILD_AIO=0
set DS_BUILD_CUTLASS_OPS=0
set DS_BUILD_EVOFORMER_ATTN=0
set DS_BUILD_FP_QUANTIZER=0
set DS_BUILD_GDS=0
set DS_BUILD_RAGGED_DEVICE_OPS=0
set DS_BUILD_SPARSE_ATTN=0

9 changes: 9 additions & 0 deletions deepspeed/inference/config.py
@@ -174,6 +174,15 @@ class DeepSpeedInferenceConfig(DeepSpeedConfigModel):
values for :any:`DeepSpeedMoEConfig`.
"""

keep_module_on_host: bool = False
"""
When loading checkpoints into model parameters, they are moved to the device. In very large models
this can fill the device and cause OOM. Setting this flag to true keeps checkpoints on the
host instead of moving them directly to the device (for example, to allow quantizing checkpoint
data before moving it to the device).
Set only for models with injection policies and auto TP.
"""

quant: QuantizationConfig = {}
"""
NOTE: only works for int8 dtype.
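To illustrate the new `keep_module_on_host` option added above, here is a minimal sketch of how it might be passed to `deepspeed.init_inference` on the auto-TP path. The model name, tp_size, and dtype are placeholder assumptions, not part of this commit; check the accepted kwargs against your installed DeepSpeed version.

```python
import torch
import deepspeed
from transformers import AutoModelForCausalLM

# Placeholder model; any Hugging Face model supported by auto TP would do.
model = AutoModelForCausalLM.from_pretrained("gpt2", torch_dtype=torch.float16)

# keep_module_on_host=True leaves the sharded weights on the host instead of
# moving them to the accelerator right away (auto TP, no kernel injection).
engine = deepspeed.init_inference(
    model,
    tensor_parallel={"tp_size": 2},
    dtype=torch.float16,
    replace_with_kernel_inject=False,
    keep_module_on_host=True,
)
model = engine.module
```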
2 changes: 1 addition & 1 deletion deepspeed/inference/engine.py
@@ -169,7 +169,7 @@ def __init__(self, model, config):
is_meta_device = hasattr(self.module, "device") and self.module.device.type == 'meta'
if is_meta_device:
self.module.to_empty(device=device)
else:
elif not config.keep_module_on_host:
self.module.to(device)

if config.tensor_parallel.tp_size > 1:
33 changes: 22 additions & 11 deletions deepspeed/module_inject/auto_tp.py
@@ -17,14 +17,14 @@
from deepspeed.module_inject.tp_shard import get_shard_size, get_shard_size_list


def move(tensor, device):
def move(tensor, device, copy=True):
if tensor.is_meta:
return torch.empty_like(tensor, device=device)
else:
# Using a new tensor helps free memory (after a split, for example); this was previously done by calling clone().
# Using copy=True instead of clone() also covers the cpu --> cpu case.
# Otherwise to() will not create a new copy for a view of the full tensor, and the full tensor will not be de-referenced.
return tensor.to(device, copy=True)
return tensor.to(device, copy=copy)


class ReplaceWithTensorSlicing:
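The copy semantics that the updated `move()` above relies on can be shown with a small standalone sketch of `Tensor.to()` behavior; this is an illustration, not part of the diff:

```python
import torch

t = torch.ones(4)                        # already on CPU

same = t.to("cpu")                       # no copy: the same storage is returned
copied = t.to("cpu", copy=True)          # forced copy: new storage

print(same.data_ptr() == t.data_ptr())   # True
print(copied.data_ptr() == t.data_ptr()) # False

# For a view of a larger tensor (e.g. one shard from a split), copying lets the
# full original buffer be freed once the view is dropped.
shard = torch.ones(1000).split(10)[0]        # view into a 1000-element buffer
owned_shard = shard.to("cpu", copy=True)     # owns its own 10-element storage
```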
@@ -189,7 +189,14 @@ def load(module, state_dict, prefix, mp_group=None):

class AutoTP():

def __init__(self, module, all_reduce_linears, prefix, state_dict, linear_layer_setting, orig_layer_impl):
def __init__(self,
module,
all_reduce_linears,
prefix,
state_dict,
linear_layer_setting,
orig_layer_impl,
keep_module_on_host=False):
self.module = module
self.all_reduce_linears = all_reduce_linears
self.prefix = prefix
@@ -201,6 +208,7 @@ def __init__(self, module, all_reduce_linears, prefix, state_dict, linear_layer_
self.orig_layer_impl = orig_layer_impl
self.linear_policies = None
self.conv_linear_layer = False
self.keep_module_on_host = keep_module_on_host

def in_module_list(module, module_list):
for item in module_list:
@@ -331,6 +339,10 @@ def set_tensor_parallel_config(self, mp_size, mp_group):
def _replace(self, child, name, conv_linear_layer):
if getattr(child, "replaced", False) == True:
return
device_name = 'cpu' if self.keep_module_on_host else get_accelerator().current_device_name()
# keep_module_on_host keeps the module on the host. Checkpoints are loaded to the host first (in some
# cases even directly from disk, to avoid filling host memory), so there is no need to create a new copy.
return_new_copy = not self.keep_module_on_host
weight_shape = child.weight.shape
mp_replace = ReplaceWithTensorSlicing(mp_group=self.mp_group)
# For TP layer skip, e.g., MoE gate, deepseek low rank layer skip
@@ -368,18 +380,17 @@ def _replace(self, child, name, conv_linear_layer):
data = child.weight.data.split(get_shard_size_list(
weight_shape[0] if self.conv_linear_layer else weight_shape[1], self.mp_size, name),
dim=1)
data_dc = move(data[mp_replace.gpu_index], get_accelerator().current_device_name()).detach()
data_dc = move(data[mp_replace.gpu_index], device_name, return_new_copy).detach()
del data

setattr(child, "replaced", True)
if name == "lm_head" or name == 'embed_out':
return LmHeadLinearAllreduce(
torch.nn.parameter.Parameter(data_dc, requires_grad=False), dist.get_rank(), dist.get_world_size(),
child.bias if child.bias is None else torch.nn.parameter.Parameter(
move(child.bias,
get_accelerator().current_device_name())), self.mp_group)
move(child.bias, device_name, return_new_copy)), self.mp_group)
return LinearAllreduce(torch.nn.parameter.Parameter(data_dc, requires_grad=False), child.bias if child.bias is None else \
torch.nn.parameter.Parameter(move(child.bias, get_accelerator().current_device_name())), self.mp_group)
torch.nn.parameter.Parameter(move(child.bias, device_name, return_new_copy)), self.mp_group)
else:

# if conv_linear_layer [weight_shape[1], weight_shape[0] // mp_size]
@@ -392,22 +403,22 @@ def _replace(self, child, name, conv_linear_layer):
#The copy is a regular copy, The shape of dst and src is the same
data_dc = move(
prepare_tp_fused_qkvw(self.module, child.weight.data, self.mp_size, mp_replace.gpu_index),
get_accelerator().current_device_name())
device_name, return_new_copy)

bias_data_dc = None if child.bias is None else move(
prepare_tp_fused_qkvw(self.module, child.bias.data, self.mp_size, mp_replace.gpu_index),
get_accelerator().current_device_name())
device_name, return_new_copy)
else:
data = child.weight.data.split(get_shard_size_list(weight_shape[0], self.mp_size, name),
dim=1 if self.conv_linear_layer else 0)
data_dc = move(data[mp_replace.gpu_index], get_accelerator().current_device_name()).detach()
data_dc = move(data[mp_replace.gpu_index], device_name, return_new_copy).detach()
del data

if child.bias is not None:
bias_data = child.bias.data.split(get_shard_size_list(
weight_shape[1] if self.conv_linear_layer else weight_shape[0], self.mp_size, name),
dim=0)
bias_data = move(bias_data[mp_replace.gpu_index], get_accelerator().current_device_name())
bias_data = move(bias_data[mp_replace.gpu_index], device_name, return_new_copy)
bias_data_dc = torch.nn.parameter.Parameter(bias_data, requires_grad=False)
del bias_data
else:
21 changes: 21 additions & 0 deletions deepspeed/module_inject/layers.py
@@ -91,6 +91,13 @@ def forward(self, input):
output += self.bias
return output

def extra_repr(self):
out_features, in_features = self.weight.shape if self.weight is not None else (None, None)
dtype = self.weight.dtype if self.weight is not None else None
extra_repr_str = "in_features={}, out_features={}, bias={}, dtype={}".format(
in_features, out_features, self.bias is not None, dtype)
return extra_repr_str


class LmHeadLinearAllreduce(nn.Module):

@@ -120,6 +127,13 @@ def forward(self, input):
output += self.bias
return output

def extra_repr(self):
out_features, in_features = self.weight.shape if self.weight is not None else (None, None)
dtype = self.weight.dtype if self.weight is not None else None
extra_repr_str = "in_features={}, out_features={}, bias={}, dtype={}".format(
in_features, out_features, self.bias is not None, dtype)
return extra_repr_str


class LinearLayer(nn.Module):

@@ -144,6 +158,13 @@ def forward(self, input):
output += self.bias
return output

def extra_repr(self):
out_features, in_features = self.weight.shape
dtype = self.weight.dtype
extra_repr_str = "in_features={}, out_features={}, bias={}, dtype={}".format(
in_features, out_features, self.bias is not None, dtype)
return extra_repr_str


class Normalize(nn.Module):

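The `extra_repr` methods added above make these layers report their shapes and dtype when the model is printed. As a rough, generic sketch of the mechanism using a plain `nn.Module` (not the DeepSpeed classes themselves):

```python
import torch
import torch.nn as nn

class TinyLinear(nn.Module):
    def __init__(self, in_features, out_features):
        super().__init__()
        self.weight = nn.Parameter(torch.empty(out_features, in_features))
        self.bias = None

    def extra_repr(self):
        # Same pattern as the diff: derive features from the weight shape.
        out_features, in_features = self.weight.shape
        return "in_features={}, out_features={}, bias={}, dtype={}".format(
            in_features, out_features, self.bias is not None, self.weight.dtype)

print(TinyLinear(16, 32))
# TinyLinear(in_features=16, out_features=32, bias=False, dtype=torch.float32)
```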
3 changes: 2 additions & 1 deletion deepspeed/module_inject/replace_module.py
@@ -268,7 +268,8 @@ def replace_wo_policy(module, all_reduce_linears, prefix="", state_dict=None):
#mp_replace = ReplaceWithTensorSlicing(mp_group=config.tensor_parallel.tp_group)

# 1. Create AutoTP object
_autotp = AutoTP(module, all_reduce_linears, prefix, state_dict, linear_layer_setting, orig_layer_impl)
_autotp = AutoTP(module, all_reduce_linears, prefix, state_dict, linear_layer_setting, orig_layer_impl,
config.keep_module_on_host)

# 2. Set the tensor parallelism config
_autotp.set_tensor_parallel_config(config.tensor_parallel.tp_size, config.tensor_parallel.tp_group)
2 changes: 1 addition & 1 deletion deepspeed/runtime/base_optimizer.py
@@ -28,7 +28,7 @@ def load_hp_checkpoint_state_from_checkpoint_dir(self, lp_groups_name: str, chec

tp_rank = bwc_tensor_model_parallel_rank(mpu=self.mpu)
if self.mpu is None:
logger.warn("MPU is not provided, setting tp size to 1 in checkpoint loading.")
logger.warning("MPU is not provided, setting tp size to 1 in checkpoint loading.")
tp_world_size = 1
else:
tp_world_size = self.mpu.get_slice_parallel_world_size() if hasattr(self.mpu, "get_slice_parallel_world_size") \
2 changes: 1 addition & 1 deletion deepspeed/runtime/comm/compressed.py
@@ -96,7 +96,7 @@ def compressed_allreduce(self, buffer_m: torch.tensor, worker_error, server_erro

compensated_server_m.add_(server_error)

server_scale = torch.norm(compensated_server_m) / np.sqrt(compensated_server_m.numel())
server_scale = torch.linalg.norm(compensated_server_m) / np.sqrt(compensated_server_m.numel())

server_error.set_(compensated_server_m -
server_scale * compensated_server_m.sign().add_(1).bool().float().add_(-0.5).mul_(2.0))
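The switch above from `torch.norm` to the `torch.linalg` variants (and to `torch.linalg.vector_norm` in the 1-bit LAMB and ZeRO stage 3 changes further down) keeps the computed values the same for these uses; `torch.norm` is simply the legacy API that PyTorch documents as deprecated in favor of `torch.linalg`. A small sanity-check sketch:

```python
import torch

x = torch.randn(3, 5)

a = torch.norm(x)                 # legacy API
b = torch.linalg.norm(x)          # Frobenius norm for a 2-D input
c = torch.linalg.vector_norm(x)   # 2-norm of the flattened input

# The Frobenius norm equals the 2-norm of the flattened tensor, so all agree.
print(torch.allclose(a, b), torch.allclose(a, c))  # True True
```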
2 changes: 1 addition & 1 deletion deepspeed/runtime/comm/hccl.py
@@ -83,7 +83,7 @@ def compressed_allreduce(self, buffer_m: torch.tensor, worker_error, server_erro

compensated_server_m.add_(server_error)

server_scale = torch.norm(compensated_server_m) / np.sqrt(compensated_server_m.numel())
server_scale = torch.linalg.norm(compensated_server_m) / np.sqrt(compensated_server_m.numel())

server_error.set_(compensated_server_m -
server_scale * compensated_server_m.sign().add_(1).bool().float().add_(-0.5).mul_(2.0))
14 changes: 6 additions & 8 deletions deepspeed/runtime/engine.py
@@ -799,10 +799,8 @@ def zero_load_from_fp32_weights(self):
def zero_elastic_checkpoint(self):
return self._config.zero_config.elastic_checkpoint

def zero_has_nvme_offload(self):
if not hasattr(self.optimizer, "swap_optimizer"):
return False
return self.optimizer.swap_optimizer or self.optimizer.params_in_nvme_and_cpu
def zero_nvme_offload_optimizer(self):
return getattr(self.optimizer, "swap_optimizer", False)

def zero_max_live_parameters(self):
return self._config.zero_config.max_live_parameters
@@ -2865,7 +2863,7 @@ def load_checkpoint(self,
if not success:
self.optimizer._restore_from_bit16_weights()

if self.zero_has_nvme_offload():
if self.zero_nvme_offload_optimizer():
from shutil import copytree, disk_usage
offload_dir = self.optimizer.optimizer_swapper.swap_folder
offload_ckpt_dir = os.path.join(load_dir, tag, "offloaded_tensors")
@@ -3120,7 +3118,7 @@ def _get_all_zero_checkpoints(self, load_dir, tag):
if bf16_mode is not self.bfloat16_enabled():
checkpoint_bit16 = BFLOAT16 if bf16_mode else FP16
engine_bit16 = BFLOAT16 if self.bfloat16_enabled() else FP16
logger.warn(f'Loading {checkpoint_bit16} zero checkpoints into {engine_bit16} training engine')
logger.warning(f'Loading {checkpoint_bit16} zero checkpoints into {engine_bit16} training engine')
return self._get_all_zero_checkpoint_state_dicts(zero_ckpt_names)

return None
@@ -3205,7 +3203,7 @@ def save_checkpoint(self, save_dir, tag=None, client_state={}, save_latest=True,
self._create_zero_checkpoint_files(save_dir, tag)
self._save_zero_checkpoint(save_dir, tag)

if self.zero_has_nvme_offload():
if self.zero_nvme_offload_optimizer():
from shutil import copytree, disk_usage
offload_dir = self.optimizer.optimizer_swapper.swap_folder
offload_ckpt_dir = os.path.join(save_dir, tag, "offloaded_tensors")
@@ -3276,7 +3274,7 @@ def _save_moe_checkpoint(self, save_dir, tag, client_state={}, exclude_frozen_pa

local_expert_id = None
if not m:
logger.warn(f'No expert found in key {key}.')
logger.warning(f'No expert found in key {key}.')
else:
local_expert_id = m.group(1)

2 changes: 1 addition & 1 deletion deepspeed/runtime/fp16/onebit/lamb.py
@@ -177,7 +177,7 @@ def step(self, closure=None, grads=None):
# This is used to reduce compression error during compression stage.
momentum_scales = []
for group in self.param_groups:
momentum_scales.append([(torch.linalg.norm(self.state[p]['exp_avg']) /
momentum_scales.append([(torch.linalg.vector_norm(self.state[p]['exp_avg']) /
np.sqrt(torch.numel(self.state[p]['exp_avg']))).item()
for p in group['params']])
united_scale = sum([sum(x) for x in momentum_scales]) / sum([len(x) for x in momentum_scales])
2 changes: 1 addition & 1 deletion deepspeed/runtime/lr_schedules.py
@@ -508,7 +508,7 @@ def _initialize_lr(self, optimizer, cycle_min_lr, cycle_max_lr, decay_lr_rate, l
def _initialize_momentum(self, optimizer, cycle_min_mom, cycle_max_mom, decay_mom_rate, last_batch_iteration):
if 'betas' not in optimizer.defaults:
optimizer_name = type(optimizer).__name__
logger.warn(
logger.warning(
f"cycle_momentum is disabled because optimizer {optimizer_name} does not support momentum, no betas attribute in defaults"
)
self.cycle_momentum = False
5 changes: 5 additions & 0 deletions deepspeed/runtime/swap_tensor/optimizer_utils.py
@@ -153,6 +153,11 @@ def __init__(self, swap_config, aio_config, base_folder, optimizer, largest_nume
'timer_names',
]

def purge_state(self):
for swap_info in self.swap_params_info.values():
swap_info.tensors = [swap_info.tensors[0]]
swap_info.has_state_tensors = False

def swappable_tensor(self, param=None, numel=None):
assert param is not None or numel is not None, "Either param or numel must be provided"
if param is not None:
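The new `purge_state` helper above centralizes the swap-state reset that the stage 3 checkpoint-loading paths below now call. A standalone sketch of what it does, using a hypothetical `SwapInfo` stand-in rather than the real swap bookkeeping objects:

```python
class SwapInfo:
    """Hypothetical stand-in: tensors[0] is the parameter, the rest is optimizer state."""
    def __init__(self, tensors):
        self.tensors = tensors
        self.has_state_tensors = True

swap_params_info = {0: SwapInfo(["param", "exp_avg", "exp_avg_sq"])}

def purge_state(swap_params_info):
    # Keep only the parameter tensor; drop swapped optimizer state so it can be
    # re-initialized from the checkpoint instead of the freshly created model.
    for swap_info in swap_params_info.values():
        swap_info.tensors = [swap_info.tensors[0]]
        swap_info.has_state_tensors = False

purge_state(swap_params_info)
print(swap_params_info[0].tensors)  # ['param']
```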
27 changes: 7 additions & 20 deletions deepspeed/runtime/zero/stage3.py
@@ -546,15 +546,10 @@ def _setup_for_real_optimizer(self):
self.grad_partitions_flat_buffer = get_accelerator().pin_memory(self.grad_partitions_flat_buffer)

offset = 0
max_partition_numel = 0
for param in all_params:
self.__param_id_to_grad_partition[param.ds_id] = self.grad_partitions_flat_buffer.narrow(
0, offset, param.partition_numel())
offset += param.partition_numel()
max_partition_numel = max(max_partition_numel, param.partition_numel())
if self.offload_optimizer:
self.pinned_grad_buffer: Tensor = get_accelerator().pin_memory(
torch.empty(max_partition_numel, device=self.device))

def _link_all_hp_params(self):
for p in self.module.parameters():
@@ -1510,13 +1505,9 @@ def partition_grads(self, params_to_release: List[Parameter], grad_partitions: L
offload_fp32_gradients[i].append(grad_buffer.float())
offload_fp32_offsets[i].append(dest_offset)
else:
buffer_numel = grad_buffer.numel()
fp32_grad_tensor = self.fp32_partitioned_groups_flat[i].grad.narrow(
0, dest_offset, buffer_numel)
self.pinned_grad_buffer[:buffer_numel].copy_(
grad_buffer.to(dtype=torch.float32, non_blocking=True))
get_accelerator().synchronize()
fp32_grad_tensor.copy_(self.pinned_grad_buffer[:buffer_numel], non_blocking=True)
0, dest_offset, grad_buffer.numel())
fp32_grad_tensor.copy_(grad_buffer.float())

# free the gradient
if not get_accelerator().is_synchronized_device():
@@ -2101,7 +2092,7 @@ def step(self, closure=None):
return

norm_groups = self._get_norm_groups()
scaled_global_grad_norm = torch.linalg.norm(torch.stack(norm_groups))
scaled_global_grad_norm = torch.linalg.vector_norm(torch.stack(norm_groups))

# Stash unscaled gradient norm
self._global_grad_norm = scaled_global_grad_norm / self.loss_scale
@@ -2661,11 +2652,9 @@ def _rigid_load_state_dict(self, state_dict, load_optimizer_states=True):
self.optimizer.load_state_dict(state_dict[OPTIMIZER_STATE_DICT])
self._clear_fp32_optimizer_param_groups()

if self.swap_optimizer or self.params_in_nvme_and_cpu:
if self.swap_optimizer:
# Purge the swapped optimizer state, it was initialized to the freshly created model and not the checkpoint
for swap_info in self.optimizer_swapper.swap_params_info.values():
swap_info.tensors = [swap_info.tensors[0]]
swap_info.has_state_tensors = False
self.optimizer_swapper.purge_state()

if self.swap_optimizer:
# Touch all parameters to synchronize all buffers
@@ -2782,11 +2771,9 @@ def load_hp_checkpoint_state_from_checkpoint_dir_stage3(self, checkpoint_dir, pa
else:
optim_sd[OPTIMIZER_STATE_DICT]['state'][0][key] = key_tensor

if self.swap_optimizer or self.params_in_nvme_and_cpu:
if self.swap_optimizer:
# Purge the swapped optimizer state, it was initialized to the freshly created model and not the checkpoint
for swap_info in self.optimizer_swapper.swap_params_info.values():
swap_info.tensors = [swap_info.tensors[0]]
swap_info.has_state_tensors = False
self.optimizer_swapper.purge_state()

if self.swap_optimizer:
# Touch all parameters to synchronize all buffers