Skip to content

Commit

Permalink
move horovod imports and create new profiling module
Browse files Browse the repository at this point in the history
  • Loading branch information
jarlsondre committed Oct 16, 2024
1 parent 7505e88 commit 246b916
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 21 deletions.
3 changes: 1 addition & 2 deletions src/itwinai/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ def generate_communication_plot(
) -> None:
"""Generate stacked plot showing computation vs. communication fraction. Stores it
Args:
log_dir: The directory where the csv logs are stored. Defauls to
``profiling_logs``.
Expand All @@ -38,7 +37,7 @@ def generate_communication_plot(
"""
import matplotlib.pyplot as plt

from itwinai.torch.communication_plot import (
from itwinai.torch.profiling.communication_plot import (
create_combined_comm_overhead_df,
create_stacked_plot,
get_comp_fraction_full_array,
Expand Down
39 changes: 21 additions & 18 deletions src/itwinai/torch/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import os
from typing import Any, Iterable, List, Literal, Optional, Tuple, Union

import horovod.torch as hvd
import torch
import torch.distributed as dist
import torch.nn as nn
Expand Down Expand Up @@ -578,6 +577,7 @@ def init(self) -> None:
already initialized.
"""
import deepspeed
self.deepspeed = deepspeed
if not distributed_resources_available():
raise RuntimeError(
"Trying to run distributed on insufficient resources.")
Expand All @@ -592,7 +592,7 @@ def init(self) -> None:
)

# https://deepspeed.readthedocs.io/en/latest/initialize.html#training-initialization
deepspeed.init_distributed(dist_backend=self.backend)
self.deepspeed.init_distributed(dist_backend=self.backend)
self.is_initialized = True

self.set_device()
Expand All @@ -608,9 +608,8 @@ def distributed(
raise UninitializedStrategyError(
"Strategy has not been initialized. Use the init method."
)
import deepspeed

distrib_model, optimizer, _, lr_scheduler = deepspeed.initialize(
distrib_model, optimizer, _, lr_scheduler = self.deepspeed.initialize(
model=model,
model_parameters=model_parameters,
optimizer=optimizer,
Expand Down Expand Up @@ -752,7 +751,11 @@ def init(self) -> None:
"Trying to run distributed on insufficient resources.")
if self.is_initialized:
raise DistributedStrategyError("Strategy was already initialized")
hvd.init()

import horovod.torch as hvd
self.hvd = hvd

self.hvd.init()
self.is_initialized = True

self.set_device()
Expand All @@ -772,16 +775,16 @@ def distributed(
# Scale learning rate
# https://github.com/horovod/horovod/issues/1653#issuecomment-574764452
lr_scaler = 1
if optim_kwargs.get('op') == hvd.Adasum:
lr_scaler = hvd.local_size()
elif optim_kwargs.get('op') == hvd.Average:
lr_scaler = hvd.size()
if optim_kwargs.get('op') == self.hvd.Adasum:
lr_scaler = self.hvd.local_size()
elif optim_kwargs.get('op') == self.hvd.Average:
lr_scaler = self.hvd.size()
for g in optimizer.param_groups:
g['lr'] *= lr_scaler

self._broadcast_params(model, optimizer)

distOptimizer = hvd.DistributedOptimizer(
distOptimizer = self.hvd.DistributedOptimizer(
optimizer,
named_parameters=model.named_parameters(),
**optim_kwargs
Expand All @@ -799,8 +802,8 @@ def _broadcast_params(
optimizer (optim.Optimizer): Optimizer that is to be broadcasted
across processes.
"""
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=-0)
self.hvd.broadcast_parameters(model.state_dict(), root_rank=0)
self.hvd.broadcast_optimizer_state(optimizer, root_rank=-0)

def global_world_size(self) -> int:
"""Returns the total number of processes (global world size).
Expand All @@ -811,7 +814,7 @@ def global_world_size(self) -> int:
if not self.is_initialized:
raise UninitializedStrategyError(
"Strategy has not been initialized. Use the init method.")
return hvd.size()
return self.hvd.size()

def local_world_size(self) -> int:
"""Returns the local number of workers available per node,
Expand All @@ -823,7 +826,7 @@ def local_world_size(self) -> int:
if not self.is_initialized:
raise UninitializedStrategyError(
"Strategy has not been initialized. Use the init method.")
return hvd.local_size()
return self.hvd.local_size()

def global_rank(self) -> int:
"""Returns the global rank of the current process, where
Expand All @@ -835,7 +838,7 @@ def global_rank(self) -> int:
if not self.is_initialized:
raise UninitializedStrategyError(
"Strategy has not been initialized. Use the init method.")
return hvd.rank()
return self.hvd.rank()

def local_rank(self) -> int:
"""Returns the local rank of the current process.
Expand All @@ -846,14 +849,14 @@ def local_rank(self) -> int:
if not self.is_initialized:
raise UninitializedStrategyError(
"Strategy has not been initialized. Use the init method.")
return hvd.local_rank()
return self.hvd.local_rank()

def clean_up(self) -> None:
"""Shuts Horovod down."""
if not self.is_initialized:
raise UninitializedStrategyError(
"Strategy has not been initialized. Use the init method.")
hvd.shutdown()
self.hvd.shutdown()

def allgather_obj(self, obj: Any) -> list[Any]:
"""All-gathers scalar objects across all workers to a
Expand All @@ -869,7 +872,7 @@ def allgather_obj(self, obj: Any) -> list[Any]:
raise UninitializedStrategyError(
"Strategy has not been initialized. Use the init method."
)
return hvd.allgather_object(obj)
return self.hvd.allgather_object(obj)

def gather_obj(self, obj: Any, dst_rank: int = 0) -> list[Any]:
"""The same as ``allgather_obj``, as gather is not supported
Expand Down
Binary file added use-cases/eurac/plots/comm_plot.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion use-cases/eurac/trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
)
from itwinai.torch.trainer import TorchTrainer
from itwinai.torch.type import Metric
from itwinai.torch.profiler import profile_torch_trainer
from itwinai.torch.profiling.profiler import profile_torch_trainer

class RNNDistributedTrainer(TorchTrainer):
"""Trainer class for RNN model using pytorch.
Expand Down

0 comments on commit 246b916

Please sign in to comment.