From e431609341d52724f6db2f987f20344f6bcd35b7 Mon Sep 17 00:00:00 2001
From: Neelesh Dodda
Date: Fri, 15 Jan 2021 16:45:41 -0800
Subject: [PATCH] Revert "Use SMP rank and size when applicable (#411)" (#424)

This reverts commit 07a3fd931b61256746cd59a454f1753cda215d09.

Co-authored-by: Nihal Harish
---
 smdebug/core/utils.py           | 103 +++++++++++---------------------
 smdebug/tensorflow/base_hook.py |  24 +-------
 2 files changed, 38 insertions(+), 89 deletions(-)

diff --git a/smdebug/core/utils.py b/smdebug/core/utils.py
index 56412069c..5081ab680 100644
--- a/smdebug/core/utils.py
+++ b/smdebug/core/utils.py
@@ -23,54 +23,6 @@ from smdebug.exceptions import IndexReaderException
 
 _is_invoked_via_smddp = None
 
-
-try:
-    import smdistributed.modelparallel.tensorflow as smp
-
-    _smp_imported = smp
-except (ImportError, ModuleNotFoundError):
-    try:
-        import smdistributed.modelparallel.torch as smp
-
-        _smp_imported = smp
-    except (ImportError, ModuleNotFoundError):
-        _smp_imported = None
-
-
-try:
-    import torch.distributed as dist
-
-    _torch_dist_imported = dist
-except (ImportError, ModuleNotFoundError):
-    _torch_dist_imported = None
-
-
-try:
-    import horovod.torch as hvd
-
-    _hvd_imported = hvd
-except (ModuleNotFoundError, ImportError):
-    try:
-        import horovod.tensorflow as hvd
-
-        _hvd_imported = hvd
-    except (ModuleNotFoundError, ImportError):
-        _hvd_imported = None
-
-
-try:
-    import smdistributed.dataparallel.torch.distributed as smdataparallel
-
-    _smdataparallel_imported = smdataparallel
-except (ModuleNotFoundError, ImportError):
-    try:
-        import smdistributed.dataparallel.tensorflow as smdataparallel
-
-        _smdataparallel_imported = smdataparallel
-    except (ModuleNotFoundError, ImportError):
-        _smdataparallel_imported = None
-
-
 logger = get_logger()
 
 
@@ -365,34 +317,51 @@ def get_tb_worker():
 
 
 def get_distributed_worker():
-    """
-    Get the rank for horovod or torch distributed. If none of them are being used,
+    """Get the rank for horovod or torch distributed. If none of them are being used,
     return None"""
     rank = None
-    if (
-        _torch_dist_imported
-        and hasattr(_torch_dist_imported, "is_initialized")
-        and _torch_dist_imported.is_initialized()
-    ):
-        rank = _torch_dist_imported.get_rank()
-    elif _smp_imported and smp.core.initialized:
-        rank = smp.rank()
-    elif check_smdataparallel_env():
-        # smdistributed.dataparallel should be invoked via `mpirun`.
-        # It supports EC2 machines with 8 GPUs per machine.
-        assert smdataparallel is not None
+    try:
+        import torch.distributed as dist
+    except (ImportError, ModuleNotFoundError):
+        dist = None
+    rank = None
+    if dist and hasattr(dist, "is_initialized") and dist.is_initialized():
+        rank = dist.get_rank()
+    else:
         try:
-            if smdataparallel.get_world_size():
-                return smdataparallel.get_rank()
-        except ValueError:
+            import horovod.torch as hvd
+
+            if hvd.size():
+                rank = hvd.rank()
+        except (ModuleNotFoundError, ValueError, ImportError):
             pass
-    elif _hvd_imported:
+
         try:
+            import horovod.tensorflow as hvd
+
             if hvd.size():
                 rank = hvd.rank()
-        except ValueError:
+        except (ModuleNotFoundError, ValueError, ImportError):
             pass
+    # smdistributed.dataparallel should be invoked via `mpirun`.
+    # It supports EC2 machines with 8 GPUs per machine.
+    if check_smdataparallel_env():
+        try:
+            import smdistributed.dataparallel.torch.distributed as smdataparallel
+
+            if smdataparallel.get_world_size():
+                return smdataparallel.get_rank()
+        except (ModuleNotFoundError, ValueError, ImportError):
+            pass
+
+        try:
+            import smdistributed.dataparallel.tensorflow as smdataparallel
+
+            if smdataparallel.size():
+                return smdataparallel.rank()
+        except (ModuleNotFoundError, ValueError, ImportError):
+            pass
     return rank
 
 
diff --git a/smdebug/tensorflow/base_hook.py b/smdebug/tensorflow/base_hook.py
index 4e262496a..64ec2d7ca 100644
--- a/smdebug/tensorflow/base_hook.py
+++ b/smdebug/tensorflow/base_hook.py
@@ -33,11 +33,9 @@
 )
 
 try:
-    import smdistributed.modelparallel.tensorflow as smp  # noqa isort:skip
-
-    _smp_importable = True
+    pass
 except ImportError:
-    _smp_importable = False
+    pass
 
 
 DEFAULT_INCLUDE_COLLECTIONS = [
@@ -185,15 +183,6 @@ def _get_worker_name(self) -> str:
         """
         self._assert_distribution_strategy()
         if self.distribution_strategy == TFDistributionStrategy.HOROVOD:
-            if _smp_importable:
-                # when model parallel is being used, there will be multiple processes
-                # with same hvd rank, hence use smp.rank
-                import smdistributed.modelparallel.tensorflow as smp
-
-                if smp.core.initialized:
-                    # if smp is in use
-                    return f"worker_{smp.rank()}"
-
             import horovod.tensorflow as hvd
 
             return f"worker_{hvd.rank()}"
@@ -271,15 +260,6 @@ def _get_custom_and_default_collections(self) -> Tuple[Set["Collection"], Set["C
     def _get_num_workers(self):
         self._assert_distribution_strategy()
         if self.distribution_strategy == TFDistributionStrategy.HOROVOD:
-            if _smp_importable:
-                # when model parallel is being used, there will be multiple hvd process groups,
-                # hence use smp.size
-                import smdistributed.modelparallel.tensorflow as smp
-
-                if smp.core.initialized:
-                    # if smp is in use
-                    return smp.size()
-
             import horovod.tensorflow as hvd
 
             return hvd.size()
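
Note (illustrative, not part of the patch): the get_distributed_worker() restored
above probes for a rank in a fixed order, torch.distributed if a process group is
initialized, then Horovod (torch or tensorflow build), then
smdistributed.dataparallel when check_smdataparallel_env() indicates an mpirun
launch, and returns None when nothing is initialized. The sketch below shows that
fallback pattern in a standalone form; the helper name detect_rank() is invented
for the example, and it is simplified in one respect: it returns the first hit,
whereas the real function lets the smdistributed.dataparallel branch take
precedence over a Horovod rank.

    import importlib

    def detect_rank():
        """Best-effort rank lookup, loosely mirroring get_distributed_worker()."""
        # 1) torch.distributed, only if a process group was initialized.
        try:
            import torch.distributed as dist
            if dist.is_available() and dist.is_initialized():
                return dist.get_rank()
        except (ImportError, ModuleNotFoundError):
            pass

        # 2) Horovod, PyTorch build first, then TensorFlow build.
        #    hvd.size() may raise if Horovod was never initialized; the patch's
        #    own code treats ValueError as "not in use", so we do the same.
        for name in ("horovod.torch", "horovod.tensorflow"):
            try:
                hvd = importlib.import_module(name)
                if hvd.size():
                    return hvd.rank()
            except (ImportError, ModuleNotFoundError, ValueError):
                pass

        # 3) smdistributed.dataparallel (launched via mpirun); per the restored
        #    code, the torch flavor exposes get_world_size()/get_rank() and the
        #    TF flavor exposes size()/rank().
        for name in ("smdistributed.dataparallel.torch.distributed",
                     "smdistributed.dataparallel.tensorflow"):
            try:
                smdp = importlib.import_module(name)
                if hasattr(smdp, "get_world_size"):
                    if smdp.get_world_size():
                        return smdp.get_rank()
                elif smdp.size():
                    return smdp.rank()
            except (ImportError, ModuleNotFoundError, ValueError):
                pass

        # No distributed framework detected.
        return None

    if __name__ == "__main__":
        print("detected rank:", detect_rank())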