
Commit

Merge branch 'master' into gma/add_autotp_workflow
loadams authored Feb 15, 2024
2 parents 092d007 + 2d0a6bc commit 4560381
Showing 43 changed files with 574 additions and 249 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/nv-accelerate-v100.yml
@@ -29,7 +29,7 @@ jobs:

- name: Install pytorch
run: |
pip install -U --cache-dir $TORCH_CACHE torch==2.1.2 torchvision==0.16.2 --index-url https://download.pytorch.org/whl/cu118
pip install -U --cache-dir $TORCH_CACHE torch torchvision --index-url https://download.pytorch.org/whl/cu118
python -c "import torch; print('torch:', torch.__version__, torch)"
python -c "import torch; print('CUDA available:', torch.cuda.is_available())"
2 changes: 1 addition & 1 deletion .github/workflows/nv-inference.yml
@@ -57,6 +57,6 @@ jobs:
run: |
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
cd tests
pytest $PYTEST_OPTS -m 'seq_inference' unit/ --torch_ver="1.13" --cuda_ver="11.6"
#pytest $PYTEST_OPTS -m 'seq_inference' unit/ --torch_ver="1.13" --cuda_ver="11.6"
pytest $PYTEST_OPTS -m 'inference_ops' unit/ --torch_ver="1.13" --cuda_ver="11.6"
pytest $PYTEST_OPTS --forked -n 4 -m 'inference' unit/ --torch_ver="1.13" --cuda_ver="11.6"
7 changes: 7 additions & 0 deletions .gitignore
@@ -53,3 +53,10 @@ docs/code-docs/build
## Testing data
# Saved checkpoints for testing
tests/unit/saved_checkpoint/

# HIP files created during AMD compilation
*_hip.cpp
*_hip.h
*.hip
*.cuh
*hip_layers.h
12 changes: 12 additions & 0 deletions accelerator/abstract_accelerator.py
@@ -17,6 +17,18 @@ def __init__(self):
def is_synchronized_device(self):
...

@abc.abstractmethod
def use_host_timers(self):
...

@abc.abstractmethod
def resolves_data_dependency(self):
...

@abc.abstractmethod
def handles_memory_backpressure(self):
...

# Device APIs
@abc.abstractmethod
def device_name(self, device_index):
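
The three new abstract methods are capability queries that every accelerator backend must answer. Below is a minimal sketch of a hypothetical out-of-tree backend; the import path follows the in-tree layout but everything except the three method names is an assumption, the inline comments read the method names at face value, and the return values simply mirror the in-tree pattern of deferring to `is_synchronized_device()`.

```python
# Hypothetical sketch only: a backend satisfying the newly added capability
# queries. A real subclass must also implement the rest of the abstract API.
from deepspeed.accelerator.abstract_accelerator import DeepSpeedAccelerator


class MyAccelerator(DeepSpeedAccelerator):  # illustrative name
    def is_synchronized_device(self):
        return False

    def use_host_timers(self):
        # Whether host-side timers are sufficient for timing on this device.
        return self.is_synchronized_device()

    def resolves_data_dependency(self):
        # Whether the device runtime orders dependent work without host syncs.
        return self.is_synchronized_device()

    def handles_memory_backpressure(self):
        # Whether the device runtime manages memory pressure on its own.
        return self.is_synchronized_device()
```
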
9 changes: 9 additions & 0 deletions accelerator/cpu_accelerator.py
@@ -21,6 +21,15 @@ def __init__(self):
def is_synchronized_device(self):
return True

def use_host_timers(self):
return self.is_synchronized_device()

def resolves_data_dependency(self):
return self.is_synchronized_device()

def handles_memory_backpressure(self):
return self.is_synchronized_device()

# Device APIs
def device_name(self, device_index=None):
return 'cpu'
9 changes: 9 additions & 0 deletions accelerator/cuda_accelerator.py
@@ -42,6 +42,15 @@ def _init_pynvml(self):
def is_synchronized_device(self):
return False

def use_host_timers(self):
return self.is_synchronized_device()

def resolves_data_dependency(self):
return self.is_synchronized_device()

def handles_memory_backpressure(self):
return self.is_synchronized_device()

# Device APIs
def device_name(self, device_index=None):
if device_index is None:
11 changes: 10 additions & 1 deletion accelerator/hpu_accelerator.py
@@ -30,6 +30,15 @@ def __init__(self):
def is_synchronized_device(self):
return False

def use_host_timers(self):
return False

def resolves_data_dependency(self):
return True

def handles_memory_backpressure(self):
return True

def device_name(self, device_index=None):
if device_index is None:
return 'hpu'
@@ -147,7 +156,7 @@ def is_fp16_supported(self):
def supported_dtypes(self):
supported_dtypes = [torch.float, torch.bfloat16]
if self.is_fp16_supported():
supported_dtypes.append(torch.bfloat16)
supported_dtypes.append(torch.half)
return supported_dtypes

# Misc
9 changes: 9 additions & 0 deletions accelerator/mps_accelerator.py
@@ -24,6 +24,15 @@ def __init__(self):
def is_synchronized_device(self):
return False

def use_host_timers(self):
return self.is_synchronized_device()

def resolves_data_dependency(self):
return self.is_synchronized_device()

def handles_memory_backpressure(self):
return self.is_synchronized_device()

# Device APIs
def device_name(self, device_index=None):
if device_index is None:
9 changes: 9 additions & 0 deletions accelerator/npu_accelerator.py
@@ -28,6 +28,15 @@ def __init__(self):
def is_synchronized_device(self):
return False

def use_host_timers(self):
return self.is_synchronized_device()

def resolves_data_dependency(self):
return self.is_synchronized_device()

def handles_memory_backpressure(self):
return self.is_synchronized_device()

# Device APIs
def device_name(self, device_index=None):
if device_index is None:
9 changes: 9 additions & 0 deletions accelerator/xpu_accelerator.py
@@ -19,6 +19,15 @@ def __init__(self):
def is_synchronized_device(self):
return False

def use_host_timers(self):
return self.is_synchronized_device()

def resolves_data_dependency(self):
return self.is_synchronized_device()

def handles_memory_backpressure(self):
return self.is_synchronized_device()

# Device APIs
def device_name(self, device_index=None):
if device_index == None:
4 changes: 4 additions & 0 deletions csrc/aio/common/deepspeed_aio_common.cpp
@@ -268,6 +268,10 @@ void report_file_error(const char* filename, const std::string file_op, const in
int open_file(const char* filename, const bool read_op)
{
const int flags = read_op ? (O_RDONLY | O_DIRECT) : (O_WRONLY | O_CREAT | O_DIRECT);
#if defined(__ENABLE_CANN__)
int* flags_ptr = (int*)&flags;
*flags_ptr = read_op ? (O_RDONLY) : (O_WRONLY | O_CREAT);
#endif
const int mode = 0600;
const auto fd = open(filename, flags, mode);
if (fd == -1) {
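
For clarity, the flag selection above re-expressed as a small, illustrative Python helper (not part of DeepSpeed): under the CANN build the O_DIRECT flag is dropped, which is what the pointer cast accomplishes in the C++ code. Note that os.O_DIRECT is Linux-only.

```python
import os


def open_file(filename: str, read_op: bool, enable_cann: bool = False) -> int:
    # Illustrative only. `enable_cann` stands in for the __ENABLE_CANN__
    # compile-time guard; when set, O_DIRECT is dropped from the open flags.
    if enable_cann:
        flags = os.O_RDONLY if read_op else (os.O_WRONLY | os.O_CREAT)
    else:
        flags = (os.O_RDONLY | os.O_DIRECT) if read_op else (os.O_WRONLY | os.O_CREAT | os.O_DIRECT)
    return os.open(filename, flags, mode=0o600)
```
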
12 changes: 8 additions & 4 deletions deepspeed/__init__.py
@@ -12,10 +12,14 @@
from torch.optim.lr_scheduler import _LRScheduler
from packaging import version as pkg_version

try:
import triton # noqa: F401 # type: ignore
HAS_TRITON = True
except ImportError:
# Skip Triton import for AMD due to pytorch-triton-rocm module breaking device API in DeepSpeed
if not (hasattr(torch.version, 'hip') and torch.version.hip is not None):
try:
import triton # noqa: F401 # type: ignore
HAS_TRITON = True
except ImportError:
HAS_TRITON = False
else:
HAS_TRITON = False

from . import ops
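
With this change, `deepspeed.HAS_TRITON` is False on ROCm/HIP builds even if a `triton` package happens to be importable. A small usage sketch (the helper name is made up for illustration):

```python
import deepspeed


def pick_attention_backend() -> str:
    # Illustrative helper: gate Triton-backed kernels on DeepSpeed's own flag,
    # which now accounts for ROCm/HIP builds, not just import success.
    return "triton" if deepspeed.HAS_TRITON else "fallback"
```
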
22 changes: 16 additions & 6 deletions deepspeed/inference/v2/allocator.py
@@ -5,17 +5,27 @@

from functools import reduce
from typing import Iterable

from collections import defaultdict
import torch

from deepspeed.accelerator import get_accelerator


def empty_from(tensor: torch.Tensor, shape: Iterable[int]) -> torch.Tensor:
shape_size = reduce(lambda x, y: x * y, shape)
if shape_size == 0:
raise ValueError("Cannot create empty tensor with size 0")
return tensor.flatten()[:shape_size].view(shape)
class Allocator:
cache = defaultdict(dict)

def empty_from(tensor: torch.Tensor, shape: Iterable[int]) -> torch.Tensor:
try:
return Allocator.cache[tensor][shape]
except KeyError:
shape_size = reduce(lambda x, y: x * y, shape)
if shape_size == 0:
raise ValueError("Cannot create empty tensor with size 0")
Allocator.cache[tensor][shape] = tensor.flatten()[:shape_size].view(shape)
return Allocator.cache[tensor][shape]


empty_from = Allocator.empty_from


def on_device(method) -> torch.Tensor:
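
A usage sketch under assumed shapes: `empty_from` now memoizes the reshaped view per (tensor, shape), so repeated calls with the same backing buffer and shape return the cached view instead of re-slicing. Because the shape is used as a dictionary key, a hashable shape such as a tuple is assumed.

```python
import torch
from deepspeed.inference.v2.allocator import empty_from

# Backing buffer; CPU is illustrative, inference v2 normally targets an accelerator.
buffer = torch.empty(1 << 20)

a = empty_from(buffer, (128, 256))  # first call populates Allocator.cache
b = empty_from(buffer, (128, 256))  # second call is served from the cache

assert a.data_ptr() == buffer.data_ptr()  # views alias the original storage
assert a is b                             # the cached view object is reused
```
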
89 changes: 67 additions & 22 deletions deepspeed/runtime/bf16_optimizer.py
@@ -18,7 +18,7 @@
align_dense_tensors, all_gather_dp_groups, bwc_tensor_model_parallel_rank,
is_model_parallel_parameter, see_memory_usage, graph_process)

from deepspeed.utils import link_hp_params, fragment_address
from deepspeed.utils import link_hp_params, lazy_init_hp_params_optimizer_state, fragment_address
from deepspeed.checkpoint import enable_universal_checkpoint
from deepspeed.checkpoint.constants import (DS_VERSION, PARTITION_COUNT, BASE_OPTIMIZER_STATE,
SINGLE_PARTITION_OF_FP32_GROUPS, CLIP_GRAD, GROUP_PADDINGS,
@@ -39,7 +39,8 @@ def __init__(self,
dp_process_group=None,
timers=None,
grad_acc_dtype=None,
graph_harvesting=False):
graph_harvesting=False,
immediate_grad_update=False):
super().__init__()
see_memory_usage('begin bf16_optimizer', force=True)
self.timers = timers
@@ -50,6 +51,7 @@ def __init__(self,
assert grad_acc_dtype in [torch.float32, torch.bfloat16
], f"BF16Optimizer: Unsupported gradient accumulation data type: {grad_acc_dtype}"
self.grad_acc_dtype = grad_acc_dtype
self.immediate_grad_update = immediate_grad_update

self.clip_grad = clip_grad
self.norm_type = norm_type
@@ -163,8 +165,12 @@ def _setup_for_real_optimizer(self):
self.initialize_optimizer_states()
see_memory_usage('end initialize_optimizer', force=True)

if self.immediate_grad_update:
self.create_grad_acc_hooks()

# Need optimizer states initialized before linking lp to optimizer state
self._link_all_hp_params()
self._hp_optimizer_states_linked = False
self._enable_universal_checkpoint()
self._param_slice_mappings = self._create_param_mapping()

@@ -199,9 +205,15 @@ def _link_all_hp_params(self):
param_group_index=i,
partition_start=partition_id * partition_size,
partition_size=partition_size,
partition_optimizer_state=self.optimizer.state[flat_hp_partition],
dp_group=self.real_dp_process_group[i])

def _lazy_init_hp_params_optimizer_state(self):
if not self._hp_optimizer_states_linked:
for i, _ in enumerate(self.optimizer.param_groups):
lazy_init_hp_params_optimizer_state(self.bf16_groups[i], self.fp32_groups_flat_partition[i],
self.optimizer.state)
self._hp_optimizer_states_linked = True

def initialize_optimizer_states(self):
"""Take an optimizer step with zero-valued gradients to allocate internal
optimizer state.
@@ -215,8 +227,6 @@ def initialize_optimizer_states(self):
param_partition.grad = grad_partition.to(
param_partition.dtype) if grad_partition.dtype != param_partition.dtype else grad_partition

self.optimizer.step()

if self.grad_acc_dtype is not torch.float32:
for param_partition in self.fp32_groups_flat_partition:
param_partition.grad = None
@@ -263,6 +273,9 @@ def step(self, closure=None):

self.optimizer.step()

# We need to link optimizer state after the first step() call
self._lazy_init_hp_params_optimizer_state()

self.update_lp_params()

self.clear_hp_grads()
@@ -283,27 +296,37 @@ def backward(self, loss, update_hp_grads=True, clear_lp_grads=False, **bwd_kwarg
self.update_hp_grads(clear_lp_grads=clear_lp_grads)

@torch.no_grad()
def update_hp_grads(self, clear_lp_grads=False):
def _update_hp_grad(self, lp, group_idx, param_idx, clear_lp_grads):
if lp.grad is None:
return

def _update_hp_grads_func(clear_lp_grads=False):
for i, group in enumerate(self.bf16_groups):
for j, lp in enumerate(group):
if lp.grad is None:
continue
hp_grad = self.fp32_groups_gradients[i][j]
assert hp_grad is not None, \
f'high precision param has no gradient, lp param_id = {id(lp)} group_info = [{i}][{j}]'
hp_grad.data.add_(lp.grad.data.to(hp_grad.dtype).view(hp_grad.shape))
lp._hp_grad = hp_grad
self.fp32_groups_has_gradients[i][j] = True
# clear gradients
if clear_lp_grads:
lp.grad._zero()
hp_grad = self.fp32_groups_gradients[group_idx][param_idx]
assert hp_grad is not None, \
f'high precision param has no gradient, lp param_id = {id(lp)} group_info = [{group_idx}][{param_idx}]'

hp_grad.data.add_(lp.grad.data.to(hp_grad.dtype).view(hp_grad.shape))
lp._hp_grad = hp_grad
self.fp32_groups_has_gradients[group_idx][param_idx] = True

# clear gradients
if clear_lp_grads:
lp.grad._zero()

@torch.no_grad()
def _update_hp_grads_func(self, clear_lp_grads=False):
for i, group in enumerate(self.bf16_groups):
for j, lp in enumerate(group):
self._update_hp_grad(lp, i, j, clear_lp_grads)

@torch.no_grad()
def update_hp_grads(self, clear_lp_grads=False):
if self.immediate_grad_update:
return

if self.graph_harvesting:
graph_process(False, _update_hp_grads_func, clear_lp_grads)
graph_process(False, self._update_hp_grads_func, clear_lp_grads)
else:
_update_hp_grads_func(clear_lp_grads)
self._update_hp_grads_func(clear_lp_grads)
#cpu op
for i, group in enumerate(self.bf16_groups):
for j, lp in enumerate(group):
@@ -441,6 +464,28 @@ def _load_hp_checkpoint_state(self, checkpoint_dir):
lp.load_hp_checkpoint_state(os.path.join(checkpoint_dir, self.param_names[lp]), tp_rank,
tp_world_size)

def accumulate_hp_grads_and_remove_lp(self, lp_param, group_idx, param_idx):
assert self.immediate_grad_update
self._update_hp_grad(lp_param, group_idx, param_idx, clear_lp_grads=False)

def create_grad_acc_hooks(self):
self.grad_accs = []
for i, param_group in enumerate(self.bf16_groups):
for j, param in enumerate(param_group):
if param.requires_grad:

def wrapper(param, i, j):
param_tmp = param.expand_as(param)
grad_acc = param_tmp.grad_fn.next_functions[0][0]

def accumulate_hp_grads_and_remove_lp(*notneeded):
self.accumulate_hp_grads_and_remove_lp(param, i, j)

grad_acc.register_hook(accumulate_hp_grads_and_remove_lp)
self.grad_accs.append(grad_acc)

wrapper(param, i, j)


def _get_padded_tensor(src_tensor, size):
if src_tensor.numel() >= size:
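
The new `create_grad_acc_hooks` relies on the common PyTorch trick of reaching a parameter's gradient-accumulation node through `param.expand_as(param).grad_fn.next_functions[0][0]` and registering a hook on it, so the high-precision gradient can be updated as soon as each low-precision gradient is ready. A stripped-down, standalone sketch of that pattern (toy parameter, illustrative hook body):

```python
import torch

p = torch.nn.Parameter(torch.randn(4))

# expand_as creates a graph node whose input function is the parameter's
# AccumulateGrad node; hooks on it fire once p's gradient has been accumulated.
p_tmp = p.expand_as(p)
grad_acc = p_tmp.grad_fn.next_functions[0][0]


def on_grad_ready(*_unused):
    # In the optimizer, this is where the bf16 grad is folded into the fp32
    # partition (accumulate_hp_grads_and_remove_lp).
    print("grad ready, norm =", p.grad.norm().item())


grad_acc.register_hook(on_grad_ready)

(p * 2).sum().backward()  # triggers the hook during the backward pass
```
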
2 changes: 1 addition & 1 deletion deepspeed/runtime/compiler.py
@@ -13,7 +13,7 @@


def is_compile_supported():
return hasattr(torch, "compile")
return hasattr(torch, "compiler")


def disable(func):
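
A small usage sketch, assuming a plain `nn.Module`: the helper now keys off `torch.compiler`, the namespace newer PyTorch releases expose alongside `torch.compile`.

```python
import torch
from deepspeed.runtime.compiler import is_compile_supported

model = torch.nn.Linear(8, 8)
if is_compile_supported():
    # Only attempted on PyTorch builds that expose torch.compiler.
    model = torch.compile(model)
```
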
