Fix: TensorFlow data type import issue in PyTorch hook (#654)
* fix TF 2.12 and minor code issues

* version bump
yl-to authored Mar 14, 2023
1 parent 1af0e2f commit 471af33
Showing 4 changed files with 60 additions and 58 deletions.
2 changes: 1 addition & 1 deletion smdebug/_version.py
@@ -1 +1 @@
__version__ = "1.0.30"
__version__ = "1.0.31"
3 changes: 0 additions & 3 deletions smdebug/core/tfevent/util.py
@@ -1,6 +1,5 @@
 # Third Party
 import numpy as np
-from tensorflow.python.lib.core import _pywrap_float8
 
 # First Party
 from smdebug.core.logger import get_logger
@@ -35,8 +34,6 @@
np.dtype([("qint16", "<i2")]): "DT_QINT16",
np.dtype([("quint16", "<u2")]): "DT_UINT16",
np.dtype([("qint32", "<i4")]): "DT_INT32",
np.dtype(_pywrap_float8.TF_float8_e5m2_type()): "DT_FLOAT8_E5M2",
np.dtype(_pywrap_float8.TF_float8_e4m3fn_type()): "DT_FLOAT8_E4M3FN",
}


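The deletion above is the core of the fix: smdebug/core/tfevent/util.py imported the TF-internal _pywrap_float8 module unconditionally, so the module failed to import wherever that symbol is unavailable (TensorFlow absent or older than 2.12), which is what broke the PyTorch hook. If float8 entries were ever wanted back in this mapping, a guarded import along the lines below would avoid reintroducing the failure. This is a sketch only; the dict name NUMPY_DTYPE_TO_TF_DTYPE_NAME is hypothetical, since the real mapping's name is not visible in this hunk.

import numpy as np

# Hypothetical stand-in for the numpy -> TF dtype-name mapping shown above;
# the real dict's name is not visible in this hunk.
NUMPY_DTYPE_TO_TF_DTYPE_NAME = {
    np.dtype([("qint32", "<i4")]): "DT_INT32",
}

# Guarded extension: add the float8 entries only when the installed TensorFlow
# exposes the internal _pywrap_float8 module (TF >= 2.12). Anything else,
# including a PyTorch-only environment without TensorFlow, skips this silently
# instead of failing at import time.
try:
    from tensorflow.python.lib.core import _pywrap_float8

    NUMPY_DTYPE_TO_TF_DTYPE_NAME.update(
        {
            np.dtype(_pywrap_float8.TF_float8_e5m2_type()): "DT_FLOAT8_E5M2",
            np.dtype(_pywrap_float8.TF_float8_e4m3fn_type()): "DT_FLOAT8_E4M3FN",
        }
    )
except (ImportError, ModuleNotFoundError):
    pass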
105 changes: 52 additions & 53 deletions smdebug/core/utils.py
@@ -50,18 +50,6 @@ class FRAMEWORK(Enum):
 _smddp_tf_imported = None
 _smddp_pt_imported = None
 _is_using_smmodelparallel = None
-_smp_imported = None
-
-
-if check_smmodelparallel_training():
-    try:
-        import smdistributed.modelparallel.torch as smp
-
-        _smp_imported = smp
-    except (ImportError, ModuleNotFoundError):
-        _smp_imported = None
-    except Exception as e:
-        raise SMDebugError(e)
 
 
 try:
@@ -94,6 +82,58 @@ class FRAMEWORK(Enum):
 ) # set up error handler to wrap smdebug functions
 
 
+def check_smmodelparallel_training():
+    """
+    The function checks whether the current job is using model parallel strategy.
+    For the training job that uses smmodelparallel, SageMaker sets following environment variables.
+    SM_HPS=
+    {
+        "mp_parameters": {
+            "ddp": true,
+            "microbatches": 4,
+            "optimize": "speed",
+            "partitions": 2,
+            "pipeline": "interleaved",
+            "placement_strategy": "spread"
+        }
+    }
+    The 'partitions' variable is a required parameter for scheduling a model parallel training job.
+    :return: True or False
+    """
+    global _is_using_smmodelparallel
+    if _is_using_smmodelparallel is not None:
+        return _is_using_smmodelparallel
+    if os.getenv("SM_HPS") is None:
+        _is_using_smmodelparallel = False
+    else:
+        try:
+            smp_flag = json.loads(os.getenv("SM_HPS"))
+            if "mp_parameters" in smp_flag:
+                if "pipeline_parallel_degree" in smp_flag["mp_parameters"]:
+                    _is_using_smmodelparallel = True
+                elif "partitions" in smp_flag["mp_parameters"]:
+                    _is_using_smmodelparallel = True
+                else:
+                    _is_using_smmodelparallel = False
+            else:
+                _is_using_smmodelparallel = False
+        except:
+            _is_using_smmodelparallel = False
+    return _is_using_smmodelparallel
+
+
+_smp_imported = None
+if check_smmodelparallel_training():
+    try:
+        import smdistributed.modelparallel.torch as smp
+
+        _smp_imported = smp
+    except (ImportError, ModuleNotFoundError):
+        _smp_imported = None
+    except Exception as e:
+        raise SMDebugError(e)
+
+
 def make_numpy_array(x):
     if isinstance(x, np.ndarray):
         return x
@@ -618,47 +658,6 @@ def check_smdataparallel_env():
     return _is_invoked_via_smddp
 
 
-def check_smmodelparallel_training():
-    """
-    The function checks whether the current job is using model parallel strategy.
-    For the training job that uses smmodelparallel, SageMaker sets following environment variables.
-    SM_HPS=
-    {
-        "mp_parameters": {
-            "ddp": true,
-            "microbatches": 4,
-            "optimize": "speed",
-            "partitions": 2,
-            "pipeline": "interleaved",
-            "placement_strategy": "spread"
-        }
-    }
-    The 'partitions' variable is a required parameter for scheduling a model parallel training job.
-    :return: True or False
-    """
-    global _is_using_smmodelparallel
-    if _is_using_smmodelparallel is not None:
-        return _is_using_smmodelparallel
-    if os.getenv("SM_HPS") is None:
-        _is_using_smmodelparallel = False
-    else:
-        try:
-            smp_flag = json.loads(os.getenv("SM_HPS"))
-            if "mp_parameters" in smp_flag:
-                if "pipeline_parallel_degree" in smp_flag["mp_parameters"]:
-                    _is_using_smmodelparallel = True
-                elif "partitions" in smp_flag["mp_parameters"]:
-                    _is_using_smmodelparallel = True
-                else:
-                    _is_using_smmodelparallel = False
-            else:
-                _is_using_smmodelparallel = False
-        except:
-            _is_using_smmodelparallel = False
-    return _is_using_smmodelparallel
 
 
 # we need to compute the output of this fn only once since the framework type will remain constant during execution
 @lru_cache(maxsize=None)
 def is_framework_version_supported(framework_type):
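The utils.py change relocates check_smmodelparallel_training above the module-level code that calls it, so the _smp_imported probe no longer references the function before it is defined. Below is a minimal usage sketch of the check itself; it assumes smdebug is installed, and the SM_HPS value only mimics what the docstring says SageMaker sets.

import json
import os

# Simulate the hyperparameter blob described in the docstring; on a real
# SageMaker model-parallel job the platform sets SM_HPS, not user code.
os.environ["SM_HPS"] = json.dumps(
    {"mp_parameters": {"partitions": 2, "pipeline": "interleaved"}}
)

# Import after setting SM_HPS: the module calls the check once at import time
# and the result is cached in a module-level global.
from smdebug.core.utils import check_smmodelparallel_training

assert check_smmodelparallel_training() is True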
8 changes: 7 additions & 1 deletion tests/tensorflow2/test_tensorflow2_datatypes.py
@@ -16,12 +16,18 @@ def test_tensorflow2_datatypes():
     # _NP_TO_TF contains all the mappings
     # of numpy to tf types
     try:
-        from tensorflow.python.lib.core import _pywrap_bfloat16
+        from tensorflow.python.lib.core import _pywrap_bfloat16, _pywrap_float8
 
         # TF 2.x.x Implements a Custom Numpy Datatype for Brain Floating Type
         # Which is currently only supported on TPUs
         _np_bfloat16 = _pywrap_bfloat16.TF_bfloat16_type()
         _NP_TO_TF.pop(_np_bfloat16)
+
+        # TF 2.12 is adding float8, but currently not available in numpy
+        _np_float8_e5m2 = _pywrap_float8.TF_float8_e5m2_type()
+        _np_float8_e4m3 = _pywrap_float8.TF_float8_e4m3fn_type()
+        _NP_TO_TF.pop(_np_float8_e5m2)
+        _NP_TO_TF.pop(_np_float8_e4m3)
     except (ModuleNotFoundError, ValueError, ImportError):
         pass
 
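One optional hardening of the cleanup above, sketched under assumptions: _NP_TO_TF.pop(key) raises KeyError if a TensorFlow build exposes _pywrap_float8 without registering the types in _NP_TO_TF, and KeyError is not in the test's except clause. Popping with a default sidesteps that. The import path for _NP_TO_TF (tensorflow.python.framework.dtypes) is assumed, since it is not shown in this hunk.

try:
    from tensorflow.python.framework.dtypes import _NP_TO_TF  # assumed import path
    from tensorflow.python.lib.core import _pywrap_bfloat16, _pywrap_float8

    # Drop custom TF dtypes that plain numpy cannot represent; pop with a
    # default so a missing key never fails the test.
    for custom_type in (
        _pywrap_bfloat16.TF_bfloat16_type(),
        _pywrap_float8.TF_float8_e5m2_type(),
        _pywrap_float8.TF_float8_e4m3fn_type(),
    ):
        _NP_TO_TF.pop(custom_type, None)
except (ModuleNotFoundError, ValueError, ImportError):
    pass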
