Commit
more fixes
parthraut committed Dec 13, 2024
1 parent 2f41596 commit 63f7fe9
Showing 2 changed files with 15 additions and 11 deletions.
14 changes: 8 additions & 6 deletions zeus/optimizer/power_limit.py
@@ -204,7 +204,7 @@ class GlobalPowerLimitOptimizer(Callback):
This optimizer uses the JIT profiling log to determine the optimal power limit.
## Usage with distributed data parallelism
The global power limit optimizer expects one process to control each GPU used for training.
For instance, `torchrun` will automatically spawn one process for each GPU on the node.
Correspondingly, the [`ZeusMonitor`][zeus.monitor.energy.ZeusMonitor] instance passed in
@@ -213,8 +213,8 @@ class GlobalPowerLimitOptimizer(Callback):
called `torch.cuda.set_device` early on, so `torch.cuda.current_device` will give you the GPU index.
`GlobalPowerLimitOptimizer` will internally do an AllReduce across all GPUs to aggregate
time and energy measurements, and then select the globally optimal power limit.
```python
monitor = ZeusMonitor(gpu_indices=[local_rank])  # pass in local rank to gpu_indices.
plo = GlobalPowerLimitOptimizer(monitor)
```
@@ -419,9 +419,11 @@ def on_step_begin(self) -> None:
        self.measurements.append(
            PowerLimitMeasurement(
                power_limit=self.state.current_power_limit // 1000,
-               energy=sum(all_reduce(
-                   list(measurement.gpu_energy.values()), operation="sum"
-               )),
+               energy=sum(
+                   all_reduce(
+                       list(measurement.gpu_energy.values()), operation="sum"
+                   )
+               ),
                time=max(all_reduce([measurement.time], operation="max")),
            )
        )
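For context, here is a minimal sketch of the distributed usage pattern the docstring above describes, assuming a script launched with `torchrun` (one process per GPU). Only `ZeusMonitor(gpu_indices=[local_rank])`, `GlobalPowerLimitOptimizer(monitor)`, and `on_step_begin` come from this diff; the process-group setup and the training loop are illustrative placeholders, and the import paths are inferred from the file paths shown here.

```python
import os

import torch
import torch.distributed as dist

from zeus.monitor.energy import ZeusMonitor
from zeus.optimizer.power_limit import GlobalPowerLimitOptimizer

# torchrun sets LOCAL_RANK for each process it spawns (one per GPU).
dist.init_process_group(backend="nccl")
local_rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_rank)

# Each process monitors only its own GPU; GlobalPowerLimitOptimizer AllReduces
# the per-GPU time/energy measurements to choose one global power limit.
monitor = ZeusMonitor(gpu_indices=[local_rank])
plo = GlobalPowerLimitOptimizer(monitor)

num_steps = 100  # placeholder; substitute your real training loop
for step in range(num_steps):
    plo.on_step_begin()  # profiling bookkeeping (incl. the AllReduce above) runs here
    # ... forward / backward / optimizer.step() ...
```

Other `Callback` hooks (e.g. epoch boundaries) are omitted; this only illustrates how the per-process monitor and the global AllReduce fit together.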
12 changes: 7 additions & 5 deletions zeus/utils/framework.py
@@ -108,13 +108,17 @@ def all_reduce(
    object: list[int] | list[float], operation: Literal["sum", "max"]
) -> list[int] | list[float]:
    """Reduce objects from all replicas through the specified operation.
-   If the current execution is not distributed, the object is returned as is."""
+   If the current execution is not distributed, the object is returned as is.
+   """
    if torch_is_available(ensure_cuda=False):
        torch = MODULE_CACHE["torch"]

        # if torch.distributed is not available or not initialized, return the object as is
-       if not torch.distributed.is_available() or not torch.distributed.is_initialized():
+       if (
+           not torch.distributed.is_available()
+           or not torch.distributed.is_initialized()
+       ):
            return object

        # wrap object in a tensor if it is not already
@@ -123,10 +127,8 @@ def all_reduce(

        # determine operation
        if operation == "sum":
-           torch_func = torch.sum
            torch_op = torch.distributed.ReduceOp.SUM
        elif operation == "max":
-           torch_func = torch.max
            torch_op = torch.distributed.ReduceOp.MAX
        else:
            raise ValueError(f"all_reduce unsupported operation: {operation}")
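To make the reduction semantics after this change concrete, below is a minimal, self-contained sketch of the same logic, assuming an already-initialized `gloo` process group (with NCCL the tensor would need to live on the GPU). The function name and the `float64` dtype are illustrative choices, not taken from `zeus/utils/framework.py`.

```python
from typing import Literal

import torch
import torch.distributed as dist


def all_reduce_sketch(
    values: list[int] | list[float], operation: Literal["sum", "max"]
) -> list[int] | list[float]:
    """Element-wise reduce `values` across replicas; pass through when not distributed."""
    # Non-distributed runs (e.g. a single-GPU job): return the input unchanged.
    if not dist.is_available() or not dist.is_initialized():
        return values

    # Only the collective op is needed now; any local reduction (sum()/max()
    # over the returned list) is left to the caller, as in power_limit.py above.
    if operation == "sum":
        op = dist.ReduceOp.SUM
    elif operation == "max":
        op = dist.ReduceOp.MAX
    else:
        raise ValueError(f"all_reduce unsupported operation: {operation}")

    tensor = torch.tensor(values, dtype=torch.float64)  # wrap the list in a tensor
    dist.all_reduce(tensor, op=op)  # in-place, element-wise across all ranks
    return tensor.tolist()
```

Callers such as `on_step_begin` above then take `sum(...)` of the reduced energy list and `max(...)` of the reduced time list to obtain the global values.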
