Skip to content

Commit

Permalink
MLCOMPUTE-36 | change default cachedExecutorIdleTimeout and expose get_dra_configs as public (#82)
Browse files Browse the repository at this point in the history

* MLCOMPUTE-36 | change default cachedExecutorIdleTimeout and expose get_dra_configs as public

* MLCOMPUTE-36 | bump up version

Co-authored-by: Sameer Sharma <[email protected]>
Co-authored-by: Sameer Sharma <[email protected]>
  • Loading branch information
3 people authored Jul 28, 2022
1 parent 5cbb07b commit 672798b
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 8 deletions.
13 changes: 10 additions & 3 deletions service_configuration_lib/spark_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
DEFAULT_CLUSTERMAN_OBSERVED_SCALING_TIME = 15 # minutes
DEFAULT_SQL_SHUFFLE_PARTITIONS = 128
DEFAULT_DRA_EXECUTOR_ALLOCATION_RATIO = 0.8
DEFAULT_DRA_CACHED_EXECUTOR_IDLE_TIMEOUT = '420s'
DEFAULT_DRA_CACHED_EXECUTOR_IDLE_TIMEOUT = '900s'
DEFAULT_DRA_MIN_EXECUTOR_RATIO = 0.25


Expand Down Expand Up @@ -254,7 +254,7 @@ def _append_sql_shuffle_partitions_conf(spark_opts: Dict[str, str]) -> Dict[str,
return spark_opts


def _get_dra_configs(spark_opts: Dict[str, str]) -> Dict[str, str]:
def get_dra_configs(spark_opts: Dict[str, str]) -> Dict[str, str]:
if (
'spark.dynamicAllocation.enabled' not in spark_opts or
str(spark_opts['spark.dynamicAllocation.enabled']) != 'true'
Expand All @@ -271,6 +271,13 @@ def _get_dra_configs(spark_opts: Dict[str, str]) -> Dict[str, str]:
spark_opts, 'spark.dynamicAllocation.cachedExecutorIdleTimeout',
str(DEFAULT_DRA_CACHED_EXECUTOR_IDLE_TIMEOUT),
)
log.warning(
f'\nSetting spark.dynamicAllocation.cachedExecutorIdleTimeout as {DEFAULT_DRA_CACHED_EXECUTOR_IDLE_TIMEOUT}. '
f'Executor with cached data block will be released if it has been idle for this duration. '
f'If you wish to change the value of cachedExecutorIdleTimeout, please provide the exact value of '
f'spark.dynamicAllocation.cachedExecutorIdleTimeout in --spark-args. If your job is performing bad because '
f'the cached data was lost, please consider increasing this value.\n',
)

if 'spark.dynamicAllocation.minExecutors' not in spark_opts:
# the ratio of total executors to be used as minExecutors
Expand Down Expand Up @@ -809,7 +816,7 @@ def get_spark_conf(
raise ValueError('Unknown resource_manager, should be either [mesos,kubernetes]')

# configure dynamic resource allocation configs
spark_conf = _get_dra_configs(spark_conf)
spark_conf = get_dra_configs(spark_conf)

# configure spark_event_log
spark_conf = _append_event_log_conf(spark_conf, *aws_creds)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

setup(
name='service-configuration-lib',
version='2.10.7',
version='2.10.8',
provides=['service_configuration_lib'],
description='Start, stop, and inspect Yelp SOA services',
url='https://github.com/Yelp/service_configuration_lib',
Expand Down
8 changes: 4 additions & 4 deletions tests/spark_config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ def test_adjust_spark_requested_resources_error(
'spark.dynamicAllocation.enabled': 'true',
'spark.dynamicAllocation.shuffleTracking.enabled': 'true',
'spark.dynamicAllocation.executorAllocationRatio': '0.8',
'spark.dynamicAllocation.cachedExecutorIdleTimeout': '420s',
'spark.dynamicAllocation.cachedExecutorIdleTimeout': '900s',
'spark.dynamicAllocation.minExecutors': '0',
'spark.dynamicAllocation.maxExecutors': '2',
'spark.executor.instances': '0',
Expand All @@ -495,7 +495,7 @@ def test_adjust_spark_requested_resources_error(
'spark.dynamicAllocation.initialExecutors': '128',
'spark.dynamicAllocation.shuffleTracking.enabled': 'true',
'spark.dynamicAllocation.executorAllocationRatio': '0.8',
'spark.dynamicAllocation.cachedExecutorIdleTimeout': '420s',
'spark.dynamicAllocation.cachedExecutorIdleTimeout': '900s',
'spark.executor.instances': '128',
},
),
Expand All @@ -510,7 +510,7 @@ def test_adjust_spark_requested_resources_error(
'spark.dynamicAllocation.minExecutors': '205',
'spark.dynamicAllocation.shuffleTracking.enabled': 'true',
'spark.dynamicAllocation.executorAllocationRatio': '0.8',
'spark.dynamicAllocation.cachedExecutorIdleTimeout': '420s',
'spark.dynamicAllocation.cachedExecutorIdleTimeout': '900s',
'spark.executor.instances': '205',
},
),
Expand Down Expand Up @@ -541,7 +541,7 @@ def test_get_dra_configs(
user_spark_opts,
expected_output,
):
output = spark_config._get_dra_configs(user_spark_opts)
output = spark_config.get_dra_configs(user_spark_opts)
for key in expected_output.keys():
assert output[key] == expected_output[key], f'wrong value for {key}'

Expand Down

0 comments on commit 672798b

Please sign in to comment.