support system allocated memory

Signed-off-by: Rong Ou <[email protected]>

rongou committed Aug 5, 2024
1 parent 5196608 commit 9221c96
Showing 5 changed files with 86 additions and 7 deletions.
8 changes: 5 additions & 3 deletions docs/site/configuration.md
@@ -6,7 +6,9 @@ nav_order: 6

The following configurations can be supplied as Spark properties.

- | Property name | Default | Meaning |
- | :-------------- | :------ | :------- |
- | spark.rapids.ml.uvm.enabled | false | if set to true, enables [CUDA unified virtual memory](https://developer.nvidia.com/blog/unified-memory-cuda-beginners/) (aka managed memory) during estimator.fit() operations to allow processing of larger datasets than would fit in GPU memory|
+ | Property name | Default | Meaning |
+ |:-----------------------------|:--------|:---------|
+ | spark.rapids.ml.uvm.enabled | false | if set to true, enables [CUDA unified virtual memory](https://developer.nvidia.com/blog/unified-memory-cuda-beginners/) (aka managed memory) during estimator.fit() operations to allow processing of larger datasets than would fit in GPU memory |
+ | spark.rapids.ml.sam.enabled | false | if set to true, enables System Allocated Memory (SAM) on [HMM](https://developer.nvidia.com/blog/simplifying-gpu-application-development-with-heterogeneous-memory-management/) or [ATS](https://developer.nvidia.com/blog/nvidia-grace-hopper-superchip-architecture-in-depth/) systems during estimator.fit() operations to allow processing of larger datasets than would fit in GPU memory |
+ | spark.rapids.ml.sam.headroom | None | when using System Allocated Memory (SAM) and GPU memory is oversubscribed, some GPU memory may need to be reserved as headroom to allow other CUDA calls to function without running out of memory |
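
For illustration, a minimal PySpark snippet that supplies the new properties when building a session. The "8g" headroom value is a placeholder, and SAM only takes effect on an HMM- or ATS-capable system:

```python
from pyspark.sql import SparkSession

# Placeholder values: enable SAM with an 8 GiB headroom. The benchmark
# change below also sets CUPY_ENABLE_SAM=1 in the executor environment.
spark = (
    SparkSession.builder
    .config("spark.rapids.ml.sam.enabled", "true")
    .config("spark.rapids.ml.sam.headroom", "8g")
    .config("spark.executorEnv.CUPY_ENABLE_SAM", "1")
    .getOrCreate()
)
```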

5 changes: 4 additions & 1 deletion python/run_benchmark.sh
@@ -98,7 +98,10 @@ cat <<EOF
--spark_confs spark.python.worker.reuse=true \
--spark_confs spark.master=local[$local_threads] \
--spark_confs spark.driver.memory=128g \
- --spark_confs spark.rapids.ml.uvm.enabled=true
+ --spark_confs spark.rapids.ml.uvm.enabled=false \
+ --spark_confs spark.rapids.ml.sam.enabled=true \
+ --spark_confs spark.rapids.ml.sam.headroom=8g \
+ --spark_confs spark.executorEnv.CUPY_ENABLE_SAM=1
EOF
)
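
The benchmark passes the headroom as a human-readable size ("8g"), while the Python changes below read the conf back as a raw string. Assuming RMM's SamHeadroomMemoryResource expects a byte count, a small, hypothetical converter (not part of this commit) would sit between the two:

```python
# Hypothetical helper, not in this commit: turn a Spark-style size string
# such as "8g" or "512m" into a byte count.
def parse_memory_size(size: str) -> int:
    units = {"k": 1024, "m": 1024**2, "g": 1024**3, "t": 1024**4}
    size = size.strip().lower()
    if size and size[-1] in units:
        return int(float(size[:-1]) * units[size[-1]])
    return int(size)  # already a plain byte count

assert parse_memory_size("8g") == 8 * 1024**3
```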

9 changes: 8 additions & 1 deletion python/src/spark_rapids_ml/clustering.py
@@ -932,6 +932,13 @@ def _get_cuml_fit_func(
    _get_spark_session().conf.get("spark.rapids.ml.uvm.enabled", "false")
    == "true"
)
+ cuda_system_mem_enabled = (
+     _get_spark_session().conf.get("spark.rapids.ml.sam.enabled", "false")
+     == "true"
+ )
+ if cuda_managed_mem_enabled and cuda_system_mem_enabled:
+     raise ValueError("CUDA managed memory and system allocated memory cannot both be enabled at the same time.")
+ cuda_system_mem_headroom = _get_spark_session().conf.get("spark.rapids.ml.sam.headroom", None)

idCol_bc = self.idCols_
raw_data_bc = self.raw_data_
@@ -957,7 +964,7 @@ def _cuml_fit(

# experiments indicate it is faster to convert to numpy array and then to cupy array than directly
# invoking cupy array on the list
- if cuda_managed_mem_enabled:
+ if cuda_managed_mem_enabled or cuda_system_mem_enabled:
    features = cp.array(features)

inputs.append(features)
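
The recurring "convert to numpy first" comment can be sanity-checked with a rough micro-benchmark; a sketch assuming CuPy is installed, with timings that will vary by GPU and data shape:

```python
# Rough micro-benchmark of the two conversion paths the diff comment
# refers to: list -> cupy directly vs. list -> numpy -> cupy.
import time

import cupy as cp
import numpy as np

rows = [np.random.rand(128).astype(np.float32) for _ in range(50_000)]

start = time.perf_counter()
cp.array(rows)  # CuPy iterates the Python list itself
cp.cuda.Device().synchronize()
print(f"cp.array(list):           {time.perf_counter() - start:.3f}s")

start = time.perf_counter()
cp.array(np.array(rows))  # stack on the host first, then one H2D transfer
cp.cuda.Device().synchronize()
print(f"cp.array(np.array(list)): {time.perf_counter() - start:.3f}s")
```
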
47 changes: 46 additions & 1 deletion python/src/spark_rapids_ml/core.py
@@ -664,6 +664,18 @@ def _call_cuml_fit_func(
if cuda_managed_mem_enabled:
    get_logger(cls).info("CUDA managed memory enabled.")

+ cuda_system_mem_enabled = (
+     _get_spark_session().conf.get("spark.rapids.ml.sam.enabled", "false")
+     == "true"
+ )
+ if cuda_managed_mem_enabled and cuda_system_mem_enabled:
+     raise ValueError("CUDA managed memory and system allocated memory cannot both be enabled at the same time.")
+ if cuda_system_mem_enabled:
+     get_logger(cls).info("CUDA system allocated memory enabled.")
+ cuda_system_mem_headroom = _get_spark_session().conf.get("spark.rapids.ml.sam.headroom", None)
+ if cuda_system_mem_headroom is not None:
+     get_logger(cls).info(f"CUDA system allocated memory headroom set to {cuda_system_mem_headroom}.")

# parameters passed to subclass
params: Dict[str, Any] = {
param_alias.cuml_init: self.cuml_params,
@@ -716,6 +728,16 @@ def _train_udf(pdf_iter: Iterator[pd.DataFrame]) -> pd.DataFrame:
devices=_CumlCommon._get_gpu_device(context, is_local),
)
cp.cuda.set_allocator(rmm_cupy_allocator)
+ if cuda_system_mem_enabled:
+     import rmm
+     from rmm.allocators.cupy import rmm_cupy_allocator
+
+     if cuda_system_mem_headroom is None:
+         mr = rmm.mr.SystemMemoryResource()
+     else:
+         mr = rmm.mr.SamHeadroomMemoryResource(headroom=cuda_system_mem_headroom)
+     rmm.mr.set_current_device_resource(mr)
+     cp.cuda.set_allocator(rmm_cupy_allocator)

_CumlCommon._initialize_cuml_logging(cuml_verbose)

@@ -738,7 +760,7 @@ def _train_udf(pdf_iter: Iterator[pd.DataFrame]) -> pd.DataFrame:

# experiments indicate it is faster to convert to numpy array and then to cupy array than directly
# invoking cupy array on the list
- if cuda_managed_mem_enabled:
+ if cuda_managed_mem_enabled or cuda_system_mem_enabled:
    features = (
        cp.array(features)
        if use_sparse_array is False
@@ -1356,6 +1378,18 @@ def _transform_evaluate_internal(
if cuda_managed_mem_enabled:
    get_logger(self.__class__).info("CUDA managed memory enabled.")

+ cuda_system_mem_enabled = (
+     _get_spark_session().conf.get("spark.rapids.ml.sam.enabled", "false")
+     == "true"
+ )
+ if cuda_managed_mem_enabled and cuda_system_mem_enabled:
+     raise ValueError("CUDA managed memory and system allocated memory cannot both be enabled at the same time.")
+ if cuda_system_mem_enabled:
+     get_logger(self.__class__).info("CUDA system allocated memory enabled.")
+ cuda_system_mem_headroom = _get_spark_session().conf.get("spark.rapids.ml.sam.headroom", None)
+ if cuda_system_mem_headroom is not None:
+     get_logger(self.__class__).info(f"CUDA system allocated memory headroom set to {cuda_system_mem_headroom}.")

def _transform_udf(pdf_iter: Iterator[pd.DataFrame]) -> pd.DataFrame:
from pyspark import TaskContext

@@ -1375,6 +1409,17 @@ def _transform_udf(pdf_iter: Iterator[pd.DataFrame]) -> pd.DataFrame:
),
)
cp.cuda.set_allocator(rmm_cupy_allocator)
+ if cuda_system_mem_enabled:
+     import cupy as cp
+     import rmm
+     from rmm.allocators.cupy import rmm_cupy_allocator
+
+     if cuda_system_mem_headroom is None:
+         mr = rmm.mr.SystemMemoryResource()
+     else:
+         mr = rmm.mr.SamHeadroomMemoryResource(headroom=cuda_system_mem_headroom)
+     rmm.mr.set_current_device_resource(mr)
+     cp.cuda.set_allocator(rmm_cupy_allocator)

# Construct the cuml counterpart object
cuml_instance = construct_cuml_object_func()
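
Pulled out of the UDFs for clarity, the SAM setup added in this file reduces to one pattern. A sketch, assuming an RMM build that ships SystemMemoryResource and SamHeadroomMemoryResource, and a headroom given in bytes:

```python
# Sketch of the allocator setup repeated in the training and transform UDFs.
import cupy as cp
import rmm
from rmm.allocators.cupy import rmm_cupy_allocator


def enable_system_allocated_memory(headroom_bytes=None):
    """Route RMM (and CuPy, via rmm_cupy_allocator) through SAM."""
    if headroom_bytes is None:
        mr = rmm.mr.SystemMemoryResource()
    else:
        # Reserve some GPU memory so other CUDA calls still succeed
        # when the rest is oversubscribed.
        mr = rmm.mr.SamHeadroomMemoryResource(headroom=headroom_bytes)
    rmm.mr.set_current_device_resource(mr)
    cp.cuda.set_allocator(rmm_cupy_allocator)


enable_system_allocated_memory(headroom_bytes=8 * 1024**3)  # e.g. 8 GiB
```
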
24 changes: 23 additions & 1 deletion python/src/spark_rapids_ml/umap.py
@@ -992,6 +992,18 @@ def _call_cuml_fit_func_dataframe(
if cuda_managed_mem_enabled:
    get_logger(cls).info("CUDA managed memory enabled.")

+ cuda_system_mem_enabled = (
+     _get_spark_session().conf.get("spark.rapids.ml.sam.enabled", "false")
+     == "true"
+ )
+ if cuda_managed_mem_enabled and cuda_system_mem_enabled:
+     raise ValueError("CUDA managed memory and system allocated memory cannot both be enabled at the same time.")
+ if cuda_system_mem_enabled:
+     get_logger(cls).info("CUDA system allocated memory enabled.")
+ cuda_system_mem_headroom = _get_spark_session().conf.get("spark.rapids.ml.sam.headroom", None)
+ if cuda_system_mem_headroom is not None:
+     get_logger(cls).info(f"CUDA system allocated memory headroom set to {cuda_system_mem_headroom}.")

# parameters passed to subclass
params: Dict[str, Any] = {
param_alias.cuml_init: self.cuml_params,
@@ -1021,6 +1033,16 @@ def _train_udf(pdf_iter: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:

rmm.reinitialize(managed_memory=True)
cp.cuda.set_allocator(rmm_cupy_allocator)
+ if cuda_system_mem_enabled:
+     import rmm
+     from rmm.allocators.cupy import rmm_cupy_allocator
+
+     if cuda_system_mem_headroom is None:
+         mr = rmm.mr.SystemMemoryResource()
+     else:
+         mr = rmm.mr.SamHeadroomMemoryResource(headroom=cuda_system_mem_headroom)
+     rmm.mr.set_current_device_resource(mr)
+     cp.cuda.set_allocator(rmm_cupy_allocator)

_CumlCommon._initialize_cuml_logging(cuml_verbose)

@@ -1042,7 +1064,7 @@ def _train_udf(pdf_iter: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
features = np.array(list(pdf[alias.data]), order=array_order)
# experiments indicate it is faster to convert to numpy array and then to cupy array than directly
# invoking cupy array on the list
- if cuda_managed_mem_enabled:
+ if cuda_managed_mem_enabled or cuda_system_mem_enabled:
    features = cp.array(features)

label = pdf[alias.label] if alias.label in pdf.columns else None
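
For comparison with the managed-memory branch visible above (rmm.reinitialize(managed_memory=True)), the UVM setup can also be written with an explicit resource, mirroring how the SAM branch installs its resource; a sketch:

```python
# Sketch: UVM expressed as an explicit RMM resource, for symmetry with the
# SystemMemoryResource / SamHeadroomMemoryResource branch above.
import cupy as cp
import rmm
from rmm.allocators.cupy import rmm_cupy_allocator

rmm.mr.set_current_device_resource(rmm.mr.ManagedMemoryResource())
cp.cuda.set_allocator(rmm_cupy_allocator)
```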
