diff --git a/service_configuration_lib/spark_config.py b/service_configuration_lib/spark_config.py
index 14f91c2..40f0419 100644
--- a/service_configuration_lib/spark_config.py
+++ b/service_configuration_lib/spark_config.py
@@ -41,7 +41,7 @@
 DEFAULT_CLUSTERMAN_OBSERVED_SCALING_TIME = 15 # minutes
 DEFAULT_SQL_SHUFFLE_PARTITIONS = 128
 DEFAULT_DRA_EXECUTOR_ALLOCATION_RATIO = 0.8
-DEFAULT_DRA_CACHED_EXECUTOR_IDLE_TIMEOUT = '420s'
+DEFAULT_DRA_CACHED_EXECUTOR_IDLE_TIMEOUT = '900s'
 DEFAULT_DRA_MIN_EXECUTOR_RATIO = 0.25
 
 
@@ -254,7 +254,7 @@ def _append_sql_shuffle_partitions_conf(spark_opts: Dict[str,
     return spark_opts
 
 
-def _get_dra_configs(spark_opts: Dict[str, str]) -> Dict[str, str]:
+def get_dra_configs(spark_opts: Dict[str, str]) -> Dict[str, str]:
     if (
         'spark.dynamicAllocation.enabled' not in spark_opts
         or str(spark_opts['spark.dynamicAllocation.enabled']) != 'true'
@@ -271,6 +271,13 @@ def _get_dra_configs(spark_opts: Dict[str, str]) -> Dict[str, str]:
         spark_opts, 'spark.dynamicAllocation.cachedExecutorIdleTimeout',
         str(DEFAULT_DRA_CACHED_EXECUTOR_IDLE_TIMEOUT),
     )
+    log.warning(
+        f'\nSetting spark.dynamicAllocation.cachedExecutorIdleTimeout as {DEFAULT_DRA_CACHED_EXECUTOR_IDLE_TIMEOUT}. '
+        f'Executor with cached data block will be released if it has been idle for this duration. '
+        f'If you wish to change the value of cachedExecutorIdleTimeout, please provide the exact value of '
+        f'spark.dynamicAllocation.cachedExecutorIdleTimeout in --spark-args. If your job is performing bad because '
+        f'the cached data was lost, please consider increasing this value.\n',
+    )
 
     if 'spark.dynamicAllocation.minExecutors' not in spark_opts:
         # the ratio of total executors to be used as minExecutors
@@ -809,7 +816,7 @@ def get_spark_conf(
         raise ValueError('Unknown resource_manager, should be either [mesos,kubernetes]')
 
     # configure dynamic resource allocation configs
-    spark_conf = _get_dra_configs(spark_conf)
+    spark_conf = get_dra_configs(spark_conf)
 
     # configure spark_event_log
     spark_conf = _append_event_log_conf(spark_conf, *aws_creds)
diff --git a/setup.py b/setup.py
index e7a9290..d064fa6 100644
--- a/setup.py
+++ b/setup.py
@@ -17,7 +17,7 @@
 
 setup(
     name='service-configuration-lib',
-    version='2.10.7',
+    version='2.10.8',
     provides=['service_configuration_lib'],
     description='Start, stop, and inspect Yelp SOA services',
     url='https://github.com/Yelp/service_configuration_lib',
diff --git a/tests/spark_config_test.py b/tests/spark_config_test.py
index c80e00d..bd55168 100644
--- a/tests/spark_config_test.py
+++ b/tests/spark_config_test.py
@@ -475,7 +475,7 @@ def test_adjust_spark_requested_resources_error(
                 'spark.dynamicAllocation.enabled': 'true',
                 'spark.dynamicAllocation.shuffleTracking.enabled': 'true',
                 'spark.dynamicAllocation.executorAllocationRatio': '0.8',
-                'spark.dynamicAllocation.cachedExecutorIdleTimeout': '420s',
+                'spark.dynamicAllocation.cachedExecutorIdleTimeout': '900s',
                 'spark.dynamicAllocation.minExecutors': '0',
                 'spark.dynamicAllocation.maxExecutors': '2',
                 'spark.executor.instances': '0',
@@ -495,7 +495,7 @@
                 'spark.dynamicAllocation.initialExecutors': '128',
                 'spark.dynamicAllocation.shuffleTracking.enabled': 'true',
                 'spark.dynamicAllocation.executorAllocationRatio': '0.8',
-                'spark.dynamicAllocation.cachedExecutorIdleTimeout': '420s',
+                'spark.dynamicAllocation.cachedExecutorIdleTimeout': '900s',
                 'spark.executor.instances': '128',
             },
         ),
@@ -510,7 +510,7 @@ def test_adjust_spark_requested_resources_error(
                 'spark.dynamicAllocation.minExecutors': '205',
                 'spark.dynamicAllocation.shuffleTracking.enabled': 'true',
                 'spark.dynamicAllocation.executorAllocationRatio': '0.8',
-                'spark.dynamicAllocation.cachedExecutorIdleTimeout': '420s',
+                'spark.dynamicAllocation.cachedExecutorIdleTimeout': '900s',
                 'spark.executor.instances': '205',
             },
         ),
@@ -541,7 +541,7 @@ def test_get_dra_configs(
     user_spark_opts,
     expected_output,
 ):
-    output = spark_config._get_dra_configs(user_spark_opts)
+    output = spark_config.get_dra_configs(user_spark_opts)
     for key in expected_output.keys():
         assert output[key] == expected_output[key], f'wrong value for {key}'