diff --git a/service_configuration_lib/spark_config.py b/service_configuration_lib/spark_config.py index b2021d5..3048bbc 100644 --- a/service_configuration_lib/spark_config.py +++ b/service_configuration_lib/spark_config.py @@ -35,7 +35,6 @@ CLUSTERMAN_METRICS_YAML_FILE_PATH = '/nail/srv/configs/clusterman_metrics.yaml' CLUSTERMAN_YAML_FILE_PATH = '/nail/srv/configs/clusterman.yaml' - NON_CONFIGURABLE_SPARK_OPTS = { 'spark.master', 'spark.ui.port', @@ -65,6 +64,7 @@ 'spark.kubernetes.executor.label.paasta.yelp.com/instance', 'spark.kubernetes.executor.label.paasta.yelp.com/cluster', } + K8S_AUTH_FOLDER = '/etc/pki/spark' K8S_BASE_VOLUMES: List[Dict[str, str]] = [ {'containerPath': K8S_AUTH_FOLDER, 'hostPath': K8S_AUTH_FOLDER, 'mode': 'RO'}, @@ -78,11 +78,6 @@ log = logging.Logger(__name__) log.setLevel(logging.INFO) -( - spark_srv_conf, spark_constants, default_spark_srv_conf, - mandatory_default_spark_srv_conf, spark_costs, -) = load_spark_srv_conf() - class UnsupportedClusterManagerException(Exception): @@ -228,228 +223,11 @@ def _append_spark_config(spark_opts: Dict[str, str], config_name: str, config_va return spark_opts -def _append_sql_partitions_conf(spark_opts: Dict[str, str]) -> Dict[str, str]: - if 'spark.sql.shuffle.partitions' not in spark_opts: - num_partitions = 3 * ( - int(spark_opts.get('spark.cores.max', 0)) or - (int(spark_opts.get('spark.executor.instances', 0)) * - int(spark_opts.get('spark.executor.cores', default_spark_srv_conf['spark.executor.cores']))) - ) - - if ( - 'spark.dynamicAllocation.enabled' in spark_opts and - str(spark_opts['spark.dynamicAllocation.enabled']) == 'true' and - 'spark.dynamicAllocation.maxExecutors' in spark_opts and - str(spark_opts['spark.dynamicAllocation.maxExecutors']) != 'infinity' - ): - - num_partitions_dra = 3 * ( - int(spark_opts.get('spark.dynamicAllocation.maxExecutors', 0)) * - int(spark_opts.get('spark.executor.cores', default_spark_srv_conf['spark.executor.cores'])) - ) - num_partitions = max(num_partitions, num_partitions_dra) - - num_partitions = num_partitions or default_spark_srv_conf['spark.sql.shuffle.partitions'] - _append_spark_config(spark_opts, 'spark.sql.shuffle.partitions', str(num_partitions)) - else: - num_partitions = int(spark_opts['spark.sql.shuffle.partitions']) - _append_spark_config(spark_opts, 'spark.sql.files.minPartitionNum', str(num_partitions)) - _append_spark_config(spark_opts, 'spark.default.parallelism', str(num_partitions)) - - return spark_opts - - def _is_jupyterhub_job(spark_app_name: str) -> bool: # TODO: add regex to better match Jupyterhub Spark session app name return 'jupyterhub' in spark_app_name -def get_dra_configs(spark_opts: Dict[str, str]) -> Dict[str, str]: - # don't enable DRA if it is explicitly disabled - if ( - 'spark.dynamicAllocation.enabled' in spark_opts and - str(spark_opts['spark.dynamicAllocation.enabled']) != 'true' - ): - return spark_opts - - spark_app_name = spark_opts.get('spark.app.name', '') - - log.warning( - TextColors.yellow( - '\nSpark Dynamic Resource Allocation (DRA) enabled for this batch. 
More info: y/spark-dra.\n', - ), - ) - - # set defaults if not provided already - _append_spark_config(spark_opts, 'spark.dynamicAllocation.enabled', 'true') - _append_spark_config(spark_opts, 'spark.dynamicAllocation.shuffleTracking.enabled', 'true') - _append_spark_config( - spark_opts, 'spark.dynamicAllocation.executorAllocationRatio', - str(default_spark_srv_conf['spark.dynamicAllocation.executorAllocationRatio']), - ) - cached_executor_idle_timeout = default_spark_srv_conf['spark.dynamicAllocation.cachedExecutorIdleTimeout'] - if 'spark.dynamicAllocation.cachedExecutorIdleTimeout' not in spark_opts: - if _is_jupyterhub_job(spark_app_name): - # increase cachedExecutorIdleTimeout by 15 minutes in case of Jupyterhub - cached_executor_idle_timeout = str(int(cached_executor_idle_timeout[:-1]) + 900) + 's' - log.warning( - f'\nSetting {TextColors.yellow("spark.dynamicAllocation.cachedExecutorIdleTimeout")} as ' - f'{cached_executor_idle_timeout}. Executor with cached data block will be released ' - f'if it has been idle for this duration. If you wish to change the value of cachedExecutorIdleTimeout, ' - f'please provide the exact value of spark.dynamicAllocation.cachedExecutorIdleTimeout ' - f'in your spark args. If your job is performing bad because the cached data was lost, ' - f'please consider increasing this value.\n', - ) - _append_spark_config( - spark_opts, 'spark.dynamicAllocation.cachedExecutorIdleTimeout', - cached_executor_idle_timeout, - ) - - min_ratio_executors = None - default_dra_min_executor_ratio = default_spark_srv_conf['spark.yelp.dra.minExecutorRatio'] - if 'spark.dynamicAllocation.minExecutors' not in spark_opts: - # the ratio of total executors to be used as minExecutors - min_executor_ratio = spark_opts.get('spark.yelp.dra.minExecutorRatio', default_dra_min_executor_ratio) - # set minExecutors default as a ratio of spark.executor.instances - num_instances = int( - spark_opts.get( - 'spark.executor.instances', - default_spark_srv_conf['spark.executor.instances'], - ), - ) - min_executors = int(num_instances * float(min_executor_ratio)) - # minExecutors should not be more than initialExecutors - if 'spark.dynamicAllocation.initialExecutors' in spark_opts: - min_executors = min(min_executors, int(spark_opts['spark.dynamicAllocation.initialExecutors'])) - # minExecutors should not be more than maxExecutors - if 'spark.dynamicAllocation.maxExecutors' in spark_opts: - min_executors = min( - min_executors, int(int(spark_opts['spark.dynamicAllocation.maxExecutors']) * - float(min_executor_ratio)), - ) - - min_ratio_executors = min_executors - - warn_msg = f'\nSetting {TextColors.yellow("spark.dynamicAllocation.minExecutors")} as' - - # set minExecutors equal to 0 for Jupyter Spark sessions - if _is_jupyterhub_job(spark_app_name): - min_executors = 0 - warn_msg = ( - f'Looks like you are launching Spark session from a Jupyter notebook. ' - f'{warn_msg} {min_executors} to save spark costs when any spark action is not running' - ) - else: - warn_msg = f'{warn_msg} {min_executors}' - - spark_opts['spark.dynamicAllocation.minExecutors'] = str(min_executors) - log.warning( - f'\n{warn_msg}. If you wish to change the value of minimum executors, please provide ' - f'the exact value of spark.dynamicAllocation.minExecutors in your spark args\n', - ) - - if not _is_jupyterhub_job(spark_app_name) and 'spark.yelp.dra.minExecutorRatio' not in spark_opts: - log.debug( - f'\nspark.yelp.dra.minExecutorRatio not provided. 
This specifies the ratio of total executors ' - f'to be used as minimum executors for Dynamic Resource Allocation. More info: y/spark-dra. Using ' - f'default ratio: {default_dra_min_executor_ratio}. If you wish to change this value, please provide ' - f'the desired spark.yelp.dra.minExecutorRatio in your spark args\n', - ) - - if 'spark.dynamicAllocation.maxExecutors' not in spark_opts: - # set maxExecutors default equal to spark.executor.instances - max_executors = int( - spark_opts.get( - 'spark.executor.instances', - default_spark_srv_conf['spark.executor.instances'], - ), - ) - # maxExecutors should not be less than initialExecutors - if 'spark.dynamicAllocation.initialExecutors' in spark_opts: - max_executors = max(max_executors, int(spark_opts['spark.dynamicAllocation.initialExecutors'])) - - spark_opts['spark.dynamicAllocation.maxExecutors'] = str(max_executors) - log.warning( - f'\nSetting {TextColors.yellow("spark.dynamicAllocation.maxExecutors")} as {max_executors}. ' - f'If you wish to change the value of maximum executors, please provide the exact value of ' - f'spark.dynamicAllocation.maxExecutors in your spark args\n', - ) - - # TODO: add regex to better match Jupyterhub Spark session app name - if 'jupyterhub' in spark_app_name and 'spark.dynamicAllocation.initialExecutors' not in spark_opts: - if min_ratio_executors is not None: - # set initialExecutors default equal to minimum executors calculated above using - # 'spark.yelp.dra.minExecutorRatio' and `default_dra_min_executor_ratio` for Jupyter Spark sessions - initial_executors = min_ratio_executors - else: - # otherwise set initial executors equal to minimum executors - initial_executors = int(spark_opts['spark.dynamicAllocation.minExecutors']) - - spark_opts['spark.dynamicAllocation.initialExecutors'] = str(initial_executors) - log.warning( - f'\nSetting {TextColors.yellow("spark.dynamicAllocation.initialExecutors")} as {initial_executors}. 
' - f'If you wish to change the value of initial executors, please provide the exact value of ' - f'spark.dynamicAllocation.initialExecutors in your spark args\n', - ) - - spark_opts['spark.executor.instances'] = spark_opts['spark.dynamicAllocation.minExecutors'] - return spark_opts - - -def _append_spark_prometheus_conf(spark_opts: Dict[str, str]) -> Dict[str, str]: - spark_opts['spark.ui.prometheus.enabled'] = 'true' - spark_opts['spark.metrics.conf.*.sink.prometheusServlet.class'] = 'org.apache.spark.metrics.sink.PrometheusServlet' - spark_opts['spark.metrics.conf.*.sink.prometheusServlet.path'] = '/metrics/prometheus' - return spark_opts - - -def _append_event_log_conf( - spark_opts: Dict[str, str], - access_key: Optional[str], - secret_key: Optional[str], - session_token: Optional[str] = None, -) -> Dict[str, str]: - enabled = spark_opts.setdefault('spark.eventLog.enabled', 'true').lower() - if enabled != 'true': - # user configured to disable log, not continue - return spark_opts - - event_log_dir = spark_opts.get('spark.eventLog.dir') - if event_log_dir is not None: - # we don't want to overwrite user's settings - return spark_opts - - if len(spark_srv_conf.items()) == 0: - log.warning('spark_srv_conf is empty, disable event log') - return spark_opts - - try: - account_id = ( - boto3.client( - 'sts', - aws_access_key_id=access_key, - aws_secret_access_key=secret_key, - aws_session_token=session_token, - ) - .get_caller_identity() - .get('Account') - ) - except Exception as e: - log.warning('Failed to identify account ID, error: {}'.format(str(e))) - spark_opts['spark.eventLog.enabled'] = 'false' - return spark_opts - - for conf in spark_srv_conf.get('environments', {}).values(): - if account_id == conf['account_id']: - spark_opts['spark.eventLog.enabled'] = 'true' - spark_opts['spark.eventLog.dir'] = conf['default_event_log_dir'] - return spark_opts - - log.warning(f'Disable event log because No preset event log dir for account: {account_id}') - spark_opts['spark.eventLog.enabled'] = 'false' - return spark_opts - - def _append_aws_credentials_conf( spark_opts: Dict[str, str], access_key: Optional[str], @@ -468,312 +246,6 @@ def _append_aws_credentials_conf( return spark_opts -def compute_executor_instances_k8s(user_spark_opts: Dict[str, str]) -> int: - executor_cores = int( - user_spark_opts.get( - 'spark.executor.cores', - default_spark_srv_conf['spark.executor.cores'], - ), - ) - - if 'spark.executor.instances' in user_spark_opts: - executor_instances = int(user_spark_opts['spark.executor.instances']) - elif 'spark.cores.max' in user_spark_opts: - # spark.cores.max provided, calculate based on (max cores // per-executor cores) - executor_instances = (int(user_spark_opts['spark.cores.max']) // executor_cores) - else: - # spark.executor.instances and spark.cores.max not provided, the executor instances should at least - # be equal to `default_executor_instances`. - executor_instances = max( - (default_spark_srv_conf['spark.executor.instances'] * - default_spark_srv_conf['spark.executor.cores']) // executor_cores, - default_spark_srv_conf['spark.executor.instances'], - ) - - # Deprecation message - if 'spark.cores.max' in user_spark_opts: - log.warning( - f'spark.cores.max is DEPRECATED. 
Replace with ' - f'spark.executor.instances={executor_instances} in --spark-args and in your service code ' - f'as "spark.executor.instances * spark.executor.cores" if used.\n', - ) - - return executor_instances - - -def _cap_executor_resources( - executor_cores: int, - executor_memory: str, - memory_mb: int, -) -> Tuple[int, str]: - max_cores = spark_constants.get('resource_configs', {}).get('max', {})['cpu'] - max_memory_gb = spark_constants.get('resource_configs', {}).get('max', {})['mem'] - - warning_title = 'Capped Executor Resources based on maximun available aws nodes' - warning_title_printed = False - - if memory_mb > max_memory_gb * 1024: - executor_memory = f'{max_memory_gb}g' - log.warning(warning_title) - log.warning( - f' - spark.executor.memory: {int(memory_mb / 1024):3}g → {executor_memory}', - ) - warning_title_printed = True - - if executor_cores > max_cores: - if not warning_title_printed: - log.warning(warning_title) - log.warning( - f' - spark.executor.cores: {executor_cores:3}c → {max_cores}c\n', - ) - executor_cores = max_cores - - return executor_cores, executor_memory - - -def _recalculate_executor_resources( - user_spark_opts: Dict[str, str], - force_spark_resource_configs: bool, - ratio_adj_thresh: int, - pool: str, -) -> Dict[str, str]: - executor_cores = int( - user_spark_opts.get( - 'spark.executor.cores', - default_spark_srv_conf['spark.executor.cores'], - ), - ) - executor_memory = user_spark_opts.get( - 'spark.executor.memory', - f'{default_spark_srv_conf["spark.executor.memory"]}g', - ) - executor_instances = int( - user_spark_opts.get( - 'spark.executor.instances', - default_spark_srv_conf['spark.executor.instances'], - ), - ) - task_cpus = int( - user_spark_opts.get( - 'spark.task.cpus', - default_spark_srv_conf['spark.task.cpus'], - ), - ) - - memory_mb = parse_memory_string(executor_memory) - memory_gb = math.ceil(memory_mb / 1024) - - def _calculate_resources( - cpu: int, - memory: int, - instances: int, - task_cpus: int, - target_memory: int, - ratio_adj_thresh: int, - ) -> Tuple[int, str, int, int]: - """ - Calculate resource needed based on memory size and recommended mem:core ratio (7:1). - - Parameters: - memory: integer values in GB. - - Returns: - A tuple of (new_cpu, new_memory, new_instances, task_cpus). 
- """ - # For multi-step release - if memory > ratio_adj_thresh: - return cpu, f'{memory}g', instances, task_cpus - - target_mem_cpu_ratio = spark_constants['target_mem_cpu_ratio'] - - new_cpu: int - new_memory: int - new_instances = (instances * memory) // target_memory - if new_instances > 0: - new_cpu = int(target_memory / target_mem_cpu_ratio) - new_memory = target_memory - else: - new_instances = 1 - new_cpu = max(memory // target_mem_cpu_ratio, 1) - new_memory = new_cpu * target_mem_cpu_ratio - - if cpu != new_cpu or memory != new_memory or instances != new_instances: - log.warning( - f'Adjust Executor Resources based on recommended mem:core:: 7:1 and Bucket: ' - f'{new_memory}g, {new_cpu}cores to better fit aws nodes\n' - f' - spark.executor.cores: {cpu:3}c → {new_cpu}c\n' - f' - spark.executor.memory: {memory:3}g → {new_memory}g\n' - f' - spark.executor.instances: {instances:3}x → {new_instances}x\n' - 'Check y/spark-metrics to compare how your job performance compared to previous runs.\n' - 'Feel free to adjust to spark resource configs in yelpsoa-configs with above newly adjusted values.\n', - ) - - if new_cpu < task_cpus: - log.warning( - f'Given spark.task.cpus is {task_cpus}, ' - f'=> adjusted to {new_cpu} to keep it within the limits of adjust spark.executor.cores.\n', - ) - task_cpus = new_cpu - return new_cpu, f'{new_memory}g', new_instances, task_cpus - - # Constants - recommended_memory_gb = spark_constants.get('resource_configs', {}).get('recommended', {})['mem'] - medium_cores = spark_constants.get('resource_configs', {}).get('medium', {})['cpu'] - medium_memory_mb = spark_constants.get('resource_configs', {}).get('medium', {})['mem'] - max_cores = spark_constants.get('resource_configs', {}).get('max', {})['cpu'] - max_memory_gb = spark_constants.get('resource_configs', {}).get('max', {})['mem'] - - if pool not in ['batch', 'stable_batch']: - log.warning( - f'We are not internally adjusting any spark resources for given pool {pool}. ' - 'Please ensure that given resource requests are optimal.', - ) - elif memory_gb > max_memory_gb or executor_cores > max_cores: - executor_cores, executor_memory = _cap_executor_resources(executor_cores, executor_memory, memory_mb) - elif force_spark_resource_configs: - log.warning( - '--force-spark-resource-configs is set to true: ' - 'this can result in non-optimal bin-packing of executors on aws nodes or ' - 'can lead to wastage the resources. 
' - "Please use this flag only if you have tested that standard memory/cpu configs won't work for your job.\n" - 'Let us know at #spark if you think, your use-case needs to be standardized.\n', - ) - elif memory_gb >= medium_memory_mb or executor_cores > medium_cores: - (executor_cores, executor_memory, executor_instances, task_cpus) = _calculate_resources( - executor_cores, - memory_gb, - executor_instances, - task_cpus, - medium_memory_mb, - ratio_adj_thresh, - ) - else: - (executor_cores, executor_memory, executor_instances, task_cpus) = _calculate_resources( - executor_cores, - memory_gb, - executor_instances, - task_cpus, - recommended_memory_gb, - ratio_adj_thresh, - ) - - user_spark_opts.update({ - 'spark.executor.cores': str(executor_cores), - 'spark.kubernetes.executor.limit.cores': str(executor_cores), - 'spark.executor.memory': str(executor_memory), - 'spark.executor.instances': str(executor_instances), - 'spark.task.cpus': str(task_cpus), - }) - if 'spark.cores.max' in user_spark_opts: - user_spark_opts['spark.cores.max'] = str(executor_instances * executor_cores) - return user_spark_opts - - -def _adjust_spark_requested_resources( - user_spark_opts: Dict[str, str], - cluster_manager: str, - pool: str, - force_spark_resource_configs: bool = False, - ratio_adj_thresh: int = spark_constants['adjust_executor_res_ratio_thresh'], -) -> Dict[str, str]: - executor_cores = int( - user_spark_opts.setdefault( - 'spark.executor.cores', - str(default_spark_srv_conf['spark.executor.cores']), - ), - ) - if cluster_manager == 'kubernetes': - executor_instances = compute_executor_instances_k8s(user_spark_opts) - user_spark_opts.setdefault('spark.executor.instances', str(executor_instances)) - max_cores = executor_instances * executor_cores - if ( - 'spark.mesos.executor.memoryOverhead' in user_spark_opts and - 'spark.executor.memoryOverhead' not in user_spark_opts - ): - user_spark_opts['spark.executor.memoryOverhead'] = user_spark_opts['spark.mesos.executor.memoryOverhead'] - user_spark_opts.setdefault('spark.kubernetes.executor.limit.cores', str(executor_cores)) - waiting_time = ( - spark_constants['default_clusterman_observed_scaling_time'] + - executor_instances * spark_constants['default_resources_waiting_time_per_executor'] // 60 - ) - user_spark_opts.setdefault( - 'spark.scheduler.maxRegisteredResourcesWaitingTime', - str(waiting_time) + 'min', - ) - elif cluster_manager == 'local': - executor_instances = int( - user_spark_opts.setdefault( - 'spark.executor.instances', - str(default_spark_srv_conf['spark.executor.instances']), - ), - ) - max_cores = executor_instances * executor_cores - else: - raise UnsupportedClusterManagerException(cluster_manager) - - if max_cores < executor_cores: - raise ValueError(f'Total number of cores {max_cores} is less than per-executor cores {executor_cores}') - - # TODO: replace this with kubernetes specific config - num_gpus = int(user_spark_opts.get('spark.mesos.gpus.max', '0')) - task_cpus = int(user_spark_opts.get('spark.task.cpus', '1')) - # we can skip this step if user is not using gpu or do not configure - # task cpus and executor cores - if num_gpus == 0 or (task_cpus != 1 and executor_cores != 1): - return _recalculate_executor_resources(user_spark_opts, force_spark_resource_configs, ratio_adj_thresh, pool) - - if num_gpus > GPUS_HARD_LIMIT: - raise ValueError( - 'Requested {num_gpus} GPUs, which exceeds hard limit of {GPUS_HARD_LIMIT}', - ) - - with open(GPU_POOLS_YAML_FILE_PATH) as fp: - pool_def = yaml.safe_load(fp).get(pool) - - if pool_def 
is None: - raise ValueError( - 'Unable to adjust spark.task.cpus and spark.executor.cores because ' - f"pool \"{pool}\" not found in gpu_pools", - ) - - gpus_per_inst = int(pool_def['gpus_per_instance']) - cpus_per_inst = int(pool_def['cpus_per_instance']) - if gpus_per_inst == 0 or cpus_per_inst == 0: - raise ValueError( - 'Unable to adjust spark.task.cpus and spark.executor.cores because ' - f'pool {pool} does not appear to have any GPUs and/or CPUs', - ) - - instances = num_gpus // gpus_per_inst - if (instances * gpus_per_inst) != num_gpus: - raise ValueError( - 'Unable to adjust spark.task.cpus and spark.executor.cores because ' - 'spark.mesos.gpus.max=%i is not a multiple of %i' - % (num_gpus, gpus_per_inst), - ) - - cpus_per_gpu = cpus_per_inst // gpus_per_inst - total_cpus = cpus_per_gpu * num_gpus - num_cpus = int(executor_instances) * int(executor_cores) - if num_cpus != total_cpus: - log.warning( - f'spark.cores.max has been adjusted to {total_cpus}. ' - 'See y/horovod for sizing of GPU pools.', - ) - - user_spark_opts.update({ - # Mesos limitation - need this to access GPUs - 'spark.mesos.containerizer': 'mesos', - # For use by horovod.spark.run(...) in place of num_proc - 'spark.default.parallelism': str(num_gpus), - # we need to adjust the requirements to meet the gpus requriements - 'spark.task.cpus': str(cpus_per_gpu), - 'spark.executor.cores': str(cpus_per_gpu * gpus_per_inst), - 'spark.cores.max': str(total_cpus), - }) - return _recalculate_executor_resources(user_spark_opts, force_spark_resource_configs, ratio_adj_thresh, pool) - - def find_spark_master(paasta_cluster): """Finds the Mesos leader of a PaaSTA cluster. @@ -892,17 +364,6 @@ def stringify_spark_env(spark_env: Mapping[str, str]) -> str: return ' '.join([f'--conf {k}={v}' for k, v in spark_env.items()]) -def _filter_user_spark_opts(user_spark_opts: Mapping[str, str]) -> MutableMapping[str, str]: - non_configurable_opts = set(user_spark_opts.keys()) & set(NON_CONFIGURABLE_SPARK_OPTS) - if non_configurable_opts: - log.warning(f'The following options are configured by Paasta: {non_configurable_opts} instead') - return { - key: value - for key, value in user_spark_opts.items() - if key not in NON_CONFIGURABLE_SPARK_OPTS - } - - def _convert_user_spark_opts_value_to_str(user_spark_opts: Mapping[str, Any]) -> Dict[str, str]: output = {} for key, val in user_spark_opts.items(): @@ -920,189 +381,758 @@ def _convert_user_spark_opts_value_to_str(user_spark_opts: Mapping[str, Any]) -> return output -def update_spark_srv_configs(spark_conf: MutableMapping[str, str]): - additional_conf = {k: v for k, v in mandatory_default_spark_srv_conf.items() if k not in spark_conf} - spark_conf.update(additional_conf) +def _filter_user_spark_opts(user_spark_opts: Mapping[str, str]) -> MutableMapping[str, str]: + non_configurable_opts = set(user_spark_opts.keys()) & set(NON_CONFIGURABLE_SPARK_OPTS) + if non_configurable_opts: + log.warning(f'The following options are configured by Paasta: {non_configurable_opts} instead') + return { + key: value + for key, value in user_spark_opts.items() + if key not in NON_CONFIGURABLE_SPARK_OPTS + } + + +class SparkConfBuilder: + def __init__(self): + self.spark_srv_conf = dict() + self.spark_constants = dict() + self.default_spark_srv_conf = dict() + self.mandatory_default_spark_srv_conf = dict() + self.spark_costs = dict() -def compute_approx_hourly_cost_dollars( - spark_conf: Mapping[str, str], - paasta_cluster: str, - paasta_pool: str, -) -> Tuple[float, float]: - per_executor_cores = 
int(spark_conf.get( - 'spark.executor.cores', - default_spark_srv_conf['spark.executor.cores'], - )) - max_cores = per_executor_cores * (int(spark_conf.get( - 'spark.executor.instances', - default_spark_srv_conf['spark.executor.instances'], - ))) - min_cores = max_cores - if 'spark.dynamicAllocation.enabled' in spark_conf and spark_conf['spark.dynamicAllocation.enabled'] == 'true': - max_cores = per_executor_cores * (int( - spark_conf.get('spark.dynamicAllocation.maxExecutors', max_cores), - )) - min_cores = per_executor_cores * (int( - spark_conf.get('spark.dynamicAllocation.minExecutors', min_cores), - )) + try: + ( + self.spark_srv_conf, self.spark_constants, self.default_spark_srv_conf, + self.mandatory_default_spark_srv_conf, self.spark_costs, + ) = load_spark_srv_conf() + except Exception as e: + log.error(f'Failed to load Spark srv configs: {e}') + + def _append_spark_prometheus_conf(self, spark_opts: Dict[str, str]) -> Dict[str, str]: + spark_opts['spark.ui.prometheus.enabled'] = 'true' + spark_opts[ + 'spark.metrics.conf.*.sink.prometheusServlet.class' + ] = 'org.apache.spark.metrics.sink.PrometheusServlet' + spark_opts['spark.metrics.conf.*.sink.prometheusServlet.path'] = '/metrics/prometheus' + return spark_opts + + def get_dra_configs(self, spark_opts: Dict[str, str]) -> Dict[str, str]: + # don't enable DRA if it is explicitly disabled + if ( + 'spark.dynamicAllocation.enabled' in spark_opts and + str(spark_opts['spark.dynamicAllocation.enabled']) != 'true' + ): + return spark_opts - cost_factor = spark_costs.get(paasta_cluster, dict())[paasta_pool] + spark_app_name = spark_opts.get('spark.app.name', '') - min_dollars = round(min_cores * cost_factor, 5) - max_dollars = round(max_cores * cost_factor, 5) - if max_dollars * 24 > spark_constants['high_cost_threshold_daily']: log.warning( - TextColors.red( - TextColors.bold( - '\n!!!!! HIGH COST ALERT !!!!!', + TextColors.yellow( + '\nSpark Dynamic Resource Allocation (DRA) enabled for this batch. More info: y/spark-dra.\n', + ), + ) + + # set defaults if not provided already + _append_spark_config(spark_opts, 'spark.dynamicAllocation.enabled', 'true') + _append_spark_config(spark_opts, 'spark.dynamicAllocation.shuffleTracking.enabled', 'true') + _append_spark_config( + spark_opts, 'spark.dynamicAllocation.executorAllocationRatio', + str(self.default_spark_srv_conf['spark.dynamicAllocation.executorAllocationRatio']), + ) + cached_executor_idle_timeout = self.default_spark_srv_conf['spark.dynamicAllocation.cachedExecutorIdleTimeout'] + if 'spark.dynamicAllocation.cachedExecutorIdleTimeout' not in spark_opts: + if _is_jupyterhub_job(spark_app_name): + # increase cachedExecutorIdleTimeout by 15 minutes in case of Jupyterhub + cached_executor_idle_timeout = str(int(cached_executor_idle_timeout[:-1]) + 900) + 's' + log.warning( + f'\nSetting {TextColors.yellow("spark.dynamicAllocation.cachedExecutorIdleTimeout")} as ' + f'{cached_executor_idle_timeout}. Executor with cached data block will be released ' + f'if it has been idle for this duration. If you wish to change the value of ' + f'cachedExecutorIdleTimeout, please provide the exact value of ' + f'spark.dynamicAllocation.cachedExecutorIdleTimeout in your spark args. 
If your job is performing ' + f'bad because the cached data was lost, please consider increasing this value.\n', + ) + _append_spark_config( + spark_opts, 'spark.dynamicAllocation.cachedExecutorIdleTimeout', + cached_executor_idle_timeout, + ) + + min_ratio_executors = None + default_dra_min_executor_ratio = self.default_spark_srv_conf['spark.yelp.dra.minExecutorRatio'] + if 'spark.dynamicAllocation.minExecutors' not in spark_opts: + # the ratio of total executors to be used as minExecutors + min_executor_ratio = spark_opts.get('spark.yelp.dra.minExecutorRatio', default_dra_min_executor_ratio) + # set minExecutors default as a ratio of spark.executor.instances + num_instances = int( + spark_opts.get( + 'spark.executor.instances', + self.default_spark_srv_conf['spark.executor.instances'], + ), + ) + min_executors = int(num_instances * float(min_executor_ratio)) + # minExecutors should not be more than initialExecutors + if 'spark.dynamicAllocation.initialExecutors' in spark_opts: + min_executors = min(min_executors, int(spark_opts['spark.dynamicAllocation.initialExecutors'])) + # minExecutors should not be more than maxExecutors + if 'spark.dynamicAllocation.maxExecutors' in spark_opts: + min_executors = min( + min_executors, int(int(spark_opts['spark.dynamicAllocation.maxExecutors']) * + float(min_executor_ratio)), + ) + + min_ratio_executors = min_executors + + warn_msg = f'\nSetting {TextColors.yellow("spark.dynamicAllocation.minExecutors")} as' + + # set minExecutors equal to 0 for Jupyter Spark sessions + if _is_jupyterhub_job(spark_app_name): + min_executors = 0 + warn_msg = ( + f'Looks like you are launching Spark session from a Jupyter notebook. ' + f'{warn_msg} {min_executors} to save spark costs when any spark action is not running' + ) + else: + warn_msg = f'{warn_msg} {min_executors}' + + spark_opts['spark.dynamicAllocation.minExecutors'] = str(min_executors) + log.warning( + f'\n{warn_msg}. If you wish to change the value of minimum executors, please provide ' + f'the exact value of spark.dynamicAllocation.minExecutors in your spark args\n', + ) + + if not _is_jupyterhub_job(spark_app_name) and 'spark.yelp.dra.minExecutorRatio' not in spark_opts: + log.debug( + f'\nspark.yelp.dra.minExecutorRatio not provided. This specifies the ratio of total executors ' + f'to be used as minimum executors for Dynamic Resource Allocation. More info: y/spark-dra. ' + f'Using default ratio: {default_dra_min_executor_ratio}. If you wish to change this value, ' + f'please provide the desired spark.yelp.dra.minExecutorRatio in your spark args\n', + ) + + if 'spark.dynamicAllocation.maxExecutors' not in spark_opts: + # set maxExecutors default equal to spark.executor.instances + max_executors = int( + spark_opts.get( + 'spark.executor.instances', + self.default_spark_srv_conf['spark.executor.instances'], ), + ) + # maxExecutors should not be less than initialExecutors + if 'spark.dynamicAllocation.initialExecutors' in spark_opts: + max_executors = max(max_executors, int(spark_opts['spark.dynamicAllocation.initialExecutors'])) + + spark_opts['spark.dynamicAllocation.maxExecutors'] = str(max_executors) + log.warning( + f'\nSetting {TextColors.yellow("spark.dynamicAllocation.maxExecutors")} as {max_executors}. 
' + f'If you wish to change the value of maximum executors, please provide the exact value of ' + f'spark.dynamicAllocation.maxExecutors in your spark args\n', + ) + + # TODO: add regex to better match Jupyterhub Spark session app name + if 'jupyterhub' in spark_app_name and 'spark.dynamicAllocation.initialExecutors' not in spark_opts: + if min_ratio_executors is not None: + # set initialExecutors default equal to minimum executors calculated above using + # 'spark.yelp.dra.minExecutorRatio' and `default_dra_min_executor_ratio` for Jupyter Spark sessions + initial_executors = min_ratio_executors + else: + # otherwise set initial executors equal to minimum executors + initial_executors = int(spark_opts['spark.dynamicAllocation.minExecutors']) + + spark_opts['spark.dynamicAllocation.initialExecutors'] = str(initial_executors) + log.warning( + f'\nSetting {TextColors.yellow("spark.dynamicAllocation.initialExecutors")} as {initial_executors}. ' + f'If you wish to change the value of initial executors, please provide the exact value of ' + f'spark.dynamicAllocation.initialExecutors in your spark args\n', + ) + + spark_opts['spark.executor.instances'] = spark_opts['spark.dynamicAllocation.minExecutors'] + return spark_opts + + def _cap_executor_resources( + self, + executor_cores: int, + executor_memory: str, + memory_mb: int, + ) -> Tuple[int, str]: + max_cores = self.spark_constants.get('resource_configs', {}).get('max', {})['cpu'] + max_memory_gb = self.spark_constants.get('resource_configs', {}).get('max', {})['mem'] + + warning_title = 'Capped Executor Resources based on maximun available aws nodes' + warning_title_printed = False + + if memory_mb > max_memory_gb * 1024: + executor_memory = f'{max_memory_gb}g' + log.warning(warning_title) + log.warning( + f' - spark.executor.memory: {int(memory_mb / 1024):3}g → {executor_memory}', + ) + warning_title_printed = True + + if executor_cores > max_cores: + if not warning_title_printed: + log.warning(warning_title) + log.warning( + f' - spark.executor.cores: {executor_cores:3}c → {max_cores}c\n', + ) + executor_cores = max_cores + + return executor_cores, executor_memory + + def compute_executor_instances_k8s(self, user_spark_opts: Dict[str, str]) -> int: + executor_cores = int( + user_spark_opts.get( + 'spark.executor.cores', + self.default_spark_srv_conf['spark.executor.cores'], ), ) - log.warning( - TextColors.magenta( - TextColors.bold( - f'\nExpected {"maximum" if min_dollars != max_dollars else ""} cost based on requested resources: ' - f'${str(max_dollars)} every hour and ${str(max_dollars * 24)} in a day.' - f'\nPlease monitor y/spark-metrics for memory and cpu usage to tune requested executor count' - f' config spark.executor.instances.\nFollow y/write-spark-job for optimization tips.\n', + + if 'spark.executor.instances' in user_spark_opts: + executor_instances = int(user_spark_opts['spark.executor.instances']) + elif 'spark.cores.max' in user_spark_opts: + # spark.cores.max provided, calculate based on (max cores // per-executor cores) + executor_instances = (int(user_spark_opts['spark.cores.max']) // executor_cores) + else: + # spark.executor.instances and spark.cores.max not provided, the executor instances should at least + # be equal to `default_executor_instances`. 
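+            # i.e. scale the default executor count by (default cores / requested cores) so the
+            # total default core count is preserved, but never drop below the default instance count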
+ executor_instances = max( + (self.default_spark_srv_conf['spark.executor.instances'] * + self.default_spark_srv_conf['spark.executor.cores']) // executor_cores, + self.default_spark_srv_conf['spark.executor.instances'], + ) + + # Deprecation message + if 'spark.cores.max' in user_spark_opts: + log.warning( + f'spark.cores.max is DEPRECATED. Replace with ' + f'spark.executor.instances={executor_instances} in --spark-args and in your service code ' + f'as "spark.executor.instances * spark.executor.cores" if used.\n', + ) + + return executor_instances + + def _recalculate_executor_resources( + self, + user_spark_opts: Dict[str, str], + force_spark_resource_configs: bool, + ratio_adj_thresh: int, + pool: str, + ) -> Dict[str, str]: + executor_cores = int( + user_spark_opts.get( + 'spark.executor.cores', + self.default_spark_srv_conf['spark.executor.cores'], ), - ), - ) - return min_dollars, max_dollars + ) + executor_memory = user_spark_opts.get( + 'spark.executor.memory', + f'{self.default_spark_srv_conf["spark.executor.memory"]}g', + ) + executor_instances = int( + user_spark_opts.get( + 'spark.executor.instances', + self.default_spark_srv_conf['spark.executor.instances'], + ), + ) + task_cpus = int( + user_spark_opts.get( + 'spark.task.cpus', + self.default_spark_srv_conf['spark.task.cpus'], + ), + ) + memory_mb = parse_memory_string(executor_memory) + memory_gb = math.ceil(memory_mb / 1024) + + def _calculate_resources( + cpu: int, + memory: int, + instances: int, + task_cpus: int, + target_memory: int, + ratio_adj_thresh: int, + ) -> Tuple[int, str, int, int]: + """ + Calculate resource needed based on memory size and recommended mem:core ratio (7:1). + + Parameters: + memory: integer values in GB. + + Returns: + A tuple of (new_cpu, new_memory, new_instances, task_cpus). 
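+
+            Example:
+                With target_memory=28 (GB) and the 7:1 mem:core ratio, 10 instances with 32g each
+                are adjusted to 11 instances with 28g and 4 cores each (assuming memory does not
+                exceed ratio_adj_thresh).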
+ """ + # For multi-step release + if memory > ratio_adj_thresh: + return cpu, f'{memory}g', instances, task_cpus + + target_mem_cpu_ratio = self.spark_constants['target_mem_cpu_ratio'] + + new_cpu: int + new_memory: int + new_instances = (instances * memory) // target_memory + if new_instances > 0: + new_cpu = int(target_memory / target_mem_cpu_ratio) + new_memory = target_memory + else: + new_instances = 1 + new_cpu = max(memory // target_mem_cpu_ratio, 1) + new_memory = new_cpu * target_mem_cpu_ratio + + if cpu != new_cpu or memory != new_memory or instances != new_instances: + log.warning( + f'Adjust Executor Resources based on recommended mem:core:: 7:1 and Bucket: ' + f'{new_memory}g, {new_cpu}cores to better fit aws nodes\n' + f' - spark.executor.cores: {cpu:3}c → {new_cpu}c\n' + f' - spark.executor.memory: {memory:3}g → {new_memory}g\n' + f' - spark.executor.instances: {instances:3}x → {new_instances}x\n' + 'Check y/spark-metrics to compare how your job performance compared to previous runs.\n' + 'Feel free to adjust to spark resource configs in yelpsoa-configs with above newly ' + 'adjusted values.\n', + ) + + if new_cpu < task_cpus: + log.warning( + f'Given spark.task.cpus is {task_cpus}, ' + f'=> adjusted to {new_cpu} to keep it within the limits of adjust spark.executor.cores.\n', + ) + task_cpus = new_cpu + return new_cpu, f'{new_memory}g', new_instances, task_cpus + + # Constants + recommended_memory_gb = self.spark_constants.get('resource_configs', {}).get('recommended', {})['mem'] + medium_cores = self.spark_constants.get('resource_configs', {}).get('medium', {})['cpu'] + medium_memory_mb = self.spark_constants.get('resource_configs', {}).get('medium', {})['mem'] + max_cores = self.spark_constants.get('resource_configs', {}).get('max', {})['cpu'] + max_memory_gb = self.spark_constants.get('resource_configs', {}).get('max', {})['mem'] + + if pool not in ['batch', 'stable_batch']: + log.warning( + f'We are not internally adjusting any spark resources for given pool {pool}. ' + 'Please ensure that given resource requests are optimal.', + ) + elif memory_gb > max_memory_gb or executor_cores > max_cores: + executor_cores, executor_memory = self._cap_executor_resources(executor_cores, executor_memory, memory_mb) + elif force_spark_resource_configs: + log.warning( + '--force-spark-resource-configs is set to true: this can result in non-optimal bin-packing ' + 'of executors on aws nodes or can lead to wastage the resources. 
' + "Please use this flag only if you have tested that standard memory/cpu configs won't work for " + 'your job.\nLet us know at #spark if you think, your use-case needs to be standardized.\n', + ) + elif memory_gb >= medium_memory_mb or executor_cores > medium_cores: + (executor_cores, executor_memory, executor_instances, task_cpus) = _calculate_resources( + executor_cores, + memory_gb, + executor_instances, + task_cpus, + medium_memory_mb, + ratio_adj_thresh, + ) + else: + (executor_cores, executor_memory, executor_instances, task_cpus) = _calculate_resources( + executor_cores, + memory_gb, + executor_instances, + task_cpus, + recommended_memory_gb, + ratio_adj_thresh, + ) -def get_spark_conf( - cluster_manager: str, - spark_app_base_name: str, - user_spark_opts: Mapping[str, Any], - paasta_cluster: str, - paasta_pool: str, - paasta_service: str, - paasta_instance: str, - docker_img: str, - aws_creds: Tuple[Optional[str], Optional[str], Optional[str]], - extra_volumes: Optional[List[Mapping[str, str]]] = None, - use_eks: bool = False, - k8s_server_address: Optional[str] = None, - spark_opts_from_env: Optional[Mapping[str, str]] = None, - aws_region: Optional[str] = None, - service_account_name: Optional[str] = None, - force_spark_resource_configs: bool = True, -) -> Dict[str, str]: - """Build spark config dict to run with spark on paasta - - :param cluster_manager: which manager to use, must be in SUPPORTED_CLUSTER_MANAGERS - :param spark_app_base_name: the base name to create spark app, we will append port - and time to make the app name unique for easier to separate the output. Note that - this is noop if `spark_opts_from_env` have `spark.app.name` configured. - :param user_spark_opts: user specified spark config. We will filter out some configs - that is not supposed to be configured by users before adding these changes. - :param paasta_cluster: the cluster name to run the spark job - :param paasta_pool: the pool name to launch the spark job. - :param paasta_service: the service name of the job - :param paasta_instance: the instance name of the job - :param docker_img: the docker image used to launch container for spark executor. - :param aws_creds: the aws creds to be used for this spark job. If a key triplet is passed, - we configure a different credentials provider to support this workflow. - :param extra_volumes: extra files to mount on the spark executors - :param spark_opts_from_env: different from user_spark_opts, configuration in this - dict will not be filtered. This options is left for people who use `paasta spark-run` - to launch the batch, and inside the batch use `spark_tools.paasta` to create - spark session. - :param aws_region: The default aws region to use - :param service_account_name: The k8s service account to use for spark k8s authentication. - If not provided, it uses cert files at {K8S_AUTH_FOLDER} to authenticate. - :param force_spark_resource_configs: skip the resource/instances recalculation. - This is strongly not recommended. - :returns: spark opts in a dict. - """ - # Mesos deprecation - if cluster_manager == 'mesos': - log.warning('Mesos has been deprecated. Please use kubernetes as the cluster manager.\n') - raise UnsupportedClusterManagerException(cluster_manager) - - # for simplicity, all the following computation are assuming spark opts values - # is str type. 
- user_spark_opts = _convert_user_spark_opts_value_to_str(user_spark_opts) - - app_base_name = ( - user_spark_opts.get('spark.app.name') or - spark_app_base_name - ) + user_spark_opts.update({ + 'spark.executor.cores': str(executor_cores), + 'spark.kubernetes.executor.limit.cores': str(executor_cores), + 'spark.executor.memory': str(executor_memory), + 'spark.executor.instances': str(executor_instances), + 'spark.task.cpus': str(task_cpus), + }) + if 'spark.cores.max' in user_spark_opts: + user_spark_opts['spark.cores.max'] = str(executor_instances * executor_cores) + return user_spark_opts + + def _append_sql_partitions_conf(self, spark_opts: Dict[str, str]) -> Dict[str, str]: + if 'spark.sql.shuffle.partitions' not in spark_opts: + num_partitions = 3 * ( + int(spark_opts.get('spark.cores.max', 0)) or + (int(spark_opts.get('spark.executor.instances', 0)) * + int(spark_opts.get('spark.executor.cores', self.default_spark_srv_conf['spark.executor.cores']))) + ) - ui_port = (spark_opts_from_env or {}).get('spark.ui.port') or _pick_random_port( - app_base_name + str(time.time()), - ) + if ( + 'spark.dynamicAllocation.enabled' in spark_opts and + str(spark_opts['spark.dynamicAllocation.enabled']) == 'true' and + 'spark.dynamicAllocation.maxExecutors' in spark_opts and + str(spark_opts['spark.dynamicAllocation.maxExecutors']) != 'infinity' + ): + num_partitions_dra = 3 * ( + int(spark_opts.get('spark.dynamicAllocation.maxExecutors', 0)) * + int(spark_opts.get('spark.executor.cores', self.default_spark_srv_conf['spark.executor.cores'])) + ) + num_partitions = max(num_partitions, num_partitions_dra) + + num_partitions = num_partitions or self.default_spark_srv_conf['spark.sql.shuffle.partitions'] + _append_spark_config(spark_opts, 'spark.sql.shuffle.partitions', str(num_partitions)) + else: + num_partitions = int(spark_opts['spark.sql.shuffle.partitions']) + _append_spark_config(spark_opts, 'spark.sql.files.minPartitionNum', str(num_partitions)) + _append_spark_config(spark_opts, 'spark.default.parallelism', str(num_partitions)) + + return spark_opts + + def _adjust_spark_requested_resources( + self, + user_spark_opts: Dict[str, str], + cluster_manager: str, + pool: str, + force_spark_resource_configs: bool = False, + ratio_adj_thresh: Any = None, + ) -> Dict[str, str]: + if ratio_adj_thresh is None: + ratio_adj_thresh = self.spark_constants['adjust_executor_res_ratio_thresh'] + executor_cores = int( + user_spark_opts.setdefault( + 'spark.executor.cores', + str(self.default_spark_srv_conf['spark.executor.cores']), + ), + ) + if cluster_manager == 'kubernetes': + executor_instances = self.compute_executor_instances_k8s(user_spark_opts) + user_spark_opts.setdefault('spark.executor.instances', str(executor_instances)) + max_cores = executor_instances * executor_cores + if ( + 'spark.mesos.executor.memoryOverhead' in user_spark_opts and + 'spark.executor.memoryOverhead' not in user_spark_opts + ): + user_spark_opts['spark.executor.memoryOverhead'] = user_spark_opts[ + 'spark.mesos.executor.memoryOverhead' + ] + user_spark_opts.setdefault('spark.kubernetes.executor.limit.cores', str(executor_cores)) + waiting_time = ( + self.spark_constants['default_clusterman_observed_scaling_time'] + + executor_instances * self.spark_constants['default_resources_waiting_time_per_executor'] // 60 + ) + user_spark_opts.setdefault( + 'spark.scheduler.maxRegisteredResourcesWaitingTime', + str(waiting_time) + 'min', + ) + elif cluster_manager == 'local': + executor_instances = int( + user_spark_opts.setdefault( + 
'spark.executor.instances', + str(self.default_spark_srv_conf['spark.executor.instances']), + ), + ) + max_cores = executor_instances * executor_cores + else: + raise UnsupportedClusterManagerException(cluster_manager) + + if max_cores < executor_cores: + raise ValueError(f'Total number of cores {max_cores} is less than per-executor cores {executor_cores}') + + # TODO: replace this with kubernetes specific config + num_gpus = int(user_spark_opts.get('spark.mesos.gpus.max', '0')) + task_cpus = int(user_spark_opts.get('spark.task.cpus', '1')) + # we can skip this step if user is not using gpu or do not configure + # task cpus and executor cores + if num_gpus == 0 or (task_cpus != 1 and executor_cores != 1): + return self._recalculate_executor_resources( + user_spark_opts, force_spark_resource_configs, + ratio_adj_thresh, pool, + ) - # app_name from env is already appended port and time to make it unique - app_name = (spark_opts_from_env or {}).get('spark.app.name') - if not app_name: - # We want to make the app name more unique so that we can search it - # from history server. - app_name = f'{app_base_name}_{ui_port}_{int(time.time())}' + if num_gpus > GPUS_HARD_LIMIT: + raise ValueError( + 'Requested {num_gpus} GPUs, which exceeds hard limit of {GPUS_HARD_LIMIT}', + ) - spark_conf = {**(spark_opts_from_env or {}), **_filter_user_spark_opts(user_spark_opts)} + with open(GPU_POOLS_YAML_FILE_PATH) as fp: + pool_def = yaml.safe_load(fp).get(pool) - if aws_creds[2] is not None: - spark_conf['spark.hadoop.fs.s3a.aws.credentials.provider'] = AWS_ENV_CREDENTIALS_PROVIDER + if pool_def is None: + raise ValueError( + 'Unable to adjust spark.task.cpus and spark.executor.cores because ' + f"pool \"{pool}\" not found in gpu_pools", + ) - spark_conf.update({ - 'spark.app.name': app_name, - 'spark.ui.port': str(ui_port), - }) + gpus_per_inst = int(pool_def['gpus_per_instance']) + cpus_per_inst = int(pool_def['cpus_per_instance']) + if gpus_per_inst == 0 or cpus_per_inst == 0: + raise ValueError( + 'Unable to adjust spark.task.cpus and spark.executor.cores because ' + f'pool {pool} does not appear to have any GPUs and/or CPUs', + ) - # adjusted with spark default resources - spark_conf = _adjust_spark_requested_resources( - spark_conf, cluster_manager, paasta_pool, force_spark_resource_configs, - ) + instances = num_gpus // gpus_per_inst + if (instances * gpus_per_inst) != num_gpus: + raise ValueError( + 'Unable to adjust spark.task.cpus and spark.executor.cores because ' + 'spark.mesos.gpus.max=%i is not a multiple of %i' + % (num_gpus, gpus_per_inst), + ) - if cluster_manager == 'kubernetes': - spark_conf.update(_get_k8s_spark_env( - paasta_cluster, - paasta_service, - paasta_instance, - docker_img, - extra_volumes, - paasta_pool, - service_account_name=service_account_name, - include_self_managed_configs=not use_eks, - k8s_server_address=k8s_server_address, - )) - elif cluster_manager == 'local': - spark_conf.update(_get_local_spark_env( - paasta_cluster, - paasta_service, - paasta_instance, - extra_volumes, + cpus_per_gpu = cpus_per_inst // gpus_per_inst + total_cpus = cpus_per_gpu * num_gpus + num_cpus = int(executor_instances) * int(executor_cores) + if num_cpus != total_cpus: + log.warning( + f'spark.cores.max has been adjusted to {total_cpus}. ' + 'See y/horovod for sizing of GPU pools.', + ) + + user_spark_opts.update({ + # Mesos limitation - need this to access GPUs + 'spark.mesos.containerizer': 'mesos', + # For use by horovod.spark.run(...) 
in place of num_proc + 'spark.default.parallelism': str(num_gpus), + # we need to adjust the requirements to meet the gpus requriements + 'spark.task.cpus': str(cpus_per_gpu), + 'spark.executor.cores': str(cpus_per_gpu * gpus_per_inst), + 'spark.cores.max': str(total_cpus), + }) + return self._recalculate_executor_resources( + user_spark_opts, force_spark_resource_configs, + ratio_adj_thresh, pool, + ) + + def update_spark_srv_configs(self, spark_conf: MutableMapping[str, str]): + additional_conf = {k: v for k, v in self.mandatory_default_spark_srv_conf.items() if k not in spark_conf} + spark_conf.update(additional_conf) + + def get_history_url(self, spark_conf: Mapping[str, str]) -> Optional[str]: + if spark_conf.get('spark.eventLog.enabled') != 'true': + return None + event_log_dir = spark_conf.get('spark.eventLog.dir') + for env, env_conf in self.spark_srv_conf['environments'].items(): + if event_log_dir == env_conf['default_event_log_dir']: + return env_conf['history_server'] + return None + + def _append_event_log_conf( + self, + spark_opts: Dict[str, str], + access_key: Optional[str], + secret_key: Optional[str], + session_token: Optional[str] = None, + ) -> Dict[str, str]: + enabled = spark_opts.setdefault('spark.eventLog.enabled', 'true').lower() + if enabled != 'true': + # user configured to disable log, not continue + return spark_opts + + event_log_dir = spark_opts.get('spark.eventLog.dir') + if event_log_dir is not None: + # we don't want to overwrite user's settings + return spark_opts + + if len(self.spark_srv_conf.items()) == 0: + log.warning('spark_srv_conf is empty, disable event log') + return spark_opts + + try: + account_id = ( + boto3.client( + 'sts', + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + aws_session_token=session_token, + ) + .get_caller_identity() + .get('Account') + ) + except Exception as e: + log.warning('Failed to identify account ID, error: {}'.format(str(e))) + spark_opts['spark.eventLog.enabled'] = 'false' + return spark_opts + + for conf in self.spark_srv_conf.get('environments', {}).values(): + if account_id == conf['account_id']: + spark_opts['spark.eventLog.enabled'] = 'true' + spark_opts['spark.eventLog.dir'] = conf['default_event_log_dir'] + return spark_opts + + log.warning(f'Disable event log because No preset event log dir for account: {account_id}') + spark_opts['spark.eventLog.enabled'] = 'false' + return spark_opts + + def compute_approx_hourly_cost_dollars( + self, + spark_conf: Mapping[str, str], + paasta_cluster: str, + paasta_pool: str, + ) -> Tuple[float, float]: + per_executor_cores = int(spark_conf.get( + 'spark.executor.cores', + self.default_spark_srv_conf['spark.executor.cores'], )) - else: - raise UnsupportedClusterManagerException(cluster_manager) + max_cores = per_executor_cores * (int(spark_conf.get( + 'spark.executor.instances', + self.default_spark_srv_conf['spark.executor.instances'], + ))) + min_cores = max_cores + if 'spark.dynamicAllocation.enabled' in spark_conf and spark_conf['spark.dynamicAllocation.enabled'] == 'true': + max_cores = per_executor_cores * (int( + spark_conf.get('spark.dynamicAllocation.maxExecutors', max_cores), + )) + min_cores = per_executor_cores * (int( + spark_conf.get('spark.dynamicAllocation.minExecutors', min_cores), + )) + + cost_factor = self.spark_costs.get(paasta_cluster, dict())[paasta_pool] + + min_dollars = round(min_cores * cost_factor, 5) + max_dollars = round(max_cores * cost_factor, 5) + if max_dollars * 24 > self.spark_constants['high_cost_threshold_daily']: 
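+            # projected daily cost (max_dollars * 24) exceeds the configured high-cost
+            # threshold, so flag it prominently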
+ log.warning( + TextColors.red( + TextColors.bold( + '\n!!!!! HIGH COST ALERT !!!!!', + ), + ), + ) + log.warning( + TextColors.magenta( + TextColors.bold( + f'\nExpected {"maximum" if min_dollars != max_dollars else ""} cost based on requested resources: ' + f'${str(max_dollars)} every hour and ${str(max_dollars * 24)} in a day.' + f'\nPlease monitor y/spark-metrics for memory and cpu usage to tune requested executor count' + f' config spark.executor.instances.\nFollow y/write-spark-job for optimization tips.\n', + ), + ), + ) + return min_dollars, max_dollars - # configure dynamic resource allocation configs - spark_conf = get_dra_configs(spark_conf) + def get_spark_conf( + self, + cluster_manager: str, + spark_app_base_name: str, + user_spark_opts: Mapping[str, Any], + paasta_cluster: str, + paasta_pool: str, + paasta_service: str, + paasta_instance: str, + docker_img: str, + aws_creds: Tuple[Optional[str], Optional[str], Optional[str]], + extra_volumes: Optional[List[Mapping[str, str]]] = None, + use_eks: bool = False, + k8s_server_address: Optional[str] = None, + spark_opts_from_env: Optional[Mapping[str, str]] = None, + aws_region: Optional[str] = None, + service_account_name: Optional[str] = None, + force_spark_resource_configs: bool = True, + ) -> Dict[str, str]: + """Build spark config dict to run with spark on paasta + + :param cluster_manager: which manager to use, must be in SUPPORTED_CLUSTER_MANAGERS + :param spark_app_base_name: the base name to create spark app, we will append port + and time to make the app name unique for easier to separate the output. Note that + this is noop if `spark_opts_from_env` have `spark.app.name` configured. + :param user_spark_opts: user specified spark config. We will filter out some configs + that is not supposed to be configured by users before adding these changes. + :param paasta_cluster: the cluster name to run the spark job + :param paasta_pool: the pool name to launch the spark job. + :param paasta_service: the service name of the job + :param paasta_instance: the instance name of the job + :param docker_img: the docker image used to launch container for spark executor. + :param aws_creds: the aws creds to be used for this spark job. If a key triplet is passed, + we configure a different credentials provider to support this workflow. + :param extra_volumes: extra files to mount on the spark executors + :param use_eks: flag to specify if EKS cluster should be used + :param k8s_server_address: address of the k8s server to be used + :param spark_opts_from_env: different from user_spark_opts, configuration in this + dict will not be filtered. This options is left for people who use `paasta spark-run` + to launch the batch, and inside the batch use `spark_tools.paasta` to create + spark session. + :param aws_region: The default aws region to use + :param service_account_name: The k8s service account to use for spark k8s authentication. + If not provided, it uses cert files at {K8S_AUTH_FOLDER} to authenticate. + :param force_spark_resource_configs: skip the resource/instances recalculation. + This is strongly not recommended. + :returns: spark opts in a dict. + """ + # Mesos deprecation + if cluster_manager == 'mesos': + log.warning('Mesos has been deprecated. Please use kubernetes as the cluster manager.\n') + raise UnsupportedClusterManagerException(cluster_manager) + + # for simplicity, all the following computation are assuming spark opts values + # is str type. 
+ user_spark_opts = _convert_user_spark_opts_value_to_str(user_spark_opts) + + app_base_name = ( + user_spark_opts.get('spark.app.name') or + spark_app_base_name + ) - # generate cost warnings - compute_approx_hourly_cost_dollars(spark_conf, paasta_cluster, paasta_pool) + ui_port = (spark_opts_from_env or {}).get('spark.ui.port') or _pick_random_port( + app_base_name + str(time.time()), + ) - # configure spark prometheus metrics - spark_conf = _append_spark_prometheus_conf(spark_conf) + # app_name from env is already appended port and time to make it unique + app_name = (spark_opts_from_env or {}).get('spark.app.name') + if not app_name: + # We want to make the app name more unique so that we can search it + # from history server. + app_name = f'{app_base_name}_{ui_port}_{int(time.time())}' - # configure spark_event_log - spark_conf = _append_event_log_conf(spark_conf, *aws_creds) + spark_conf = {**(spark_opts_from_env or {}), **_filter_user_spark_opts(user_spark_opts)} - # configure sql shuffle partitions - spark_conf = _append_sql_partitions_conf(spark_conf) + if aws_creds[2] is not None: + spark_conf['spark.hadoop.fs.s3a.aws.credentials.provider'] = AWS_ENV_CREDENTIALS_PROVIDER - # add spark srv config defaults if not specified - update_spark_srv_configs(spark_conf) + spark_conf.update({ + 'spark.app.name': app_name, + 'spark.ui.port': str(ui_port), + }) - # configure spark Console Progress - if _is_jupyterhub_job(spark_conf.get('spark.app.name', '')): - spark_conf = _append_spark_config(spark_conf, 'spark.ui.showConsoleProgress', 'true') + # adjusted with spark default resources + spark_conf = self._adjust_spark_requested_resources( + spark_conf, cluster_manager, paasta_pool, force_spark_resource_configs, + ) + + if cluster_manager == 'kubernetes': + spark_conf.update(_get_k8s_spark_env( + paasta_cluster, + paasta_service, + paasta_instance, + docker_img, + extra_volumes, + paasta_pool, + service_account_name=service_account_name, + include_self_managed_configs=not use_eks, + k8s_server_address=k8s_server_address, + )) + elif cluster_manager == 'local': + spark_conf.update(_get_local_spark_env( + paasta_cluster, + paasta_service, + paasta_instance, + extra_volumes, + )) + else: + raise UnsupportedClusterManagerException(cluster_manager) - spark_conf = _append_aws_credentials_conf(spark_conf, *aws_creds, aws_region) - return spark_conf + # configure dynamic resource allocation configs + spark_conf = self.get_dra_configs(spark_conf) + + # generate cost warnings + self.compute_approx_hourly_cost_dollars(spark_conf, paasta_cluster, paasta_pool) + + # configure spark prometheus metrics + spark_conf = self._append_spark_prometheus_conf(spark_conf) + + # configure spark_event_log + spark_conf = self._append_event_log_conf(spark_conf, *aws_creds) + + # configure sql shuffle partitions + spark_conf = self._append_sql_partitions_conf(spark_conf) + + # add spark srv config defaults if not specified + self.update_spark_srv_configs(spark_conf) + + # configure spark Console Progress + if _is_jupyterhub_job(spark_conf.get('spark.app.name', '')): + spark_conf = _append_spark_config(spark_conf, 'spark.ui.showConsoleProgress', 'true') + + spark_conf = _append_aws_credentials_conf(spark_conf, *aws_creds, aws_region) + return spark_conf def parse_memory_string(memory_string: Optional[str]) -> int: @@ -1146,16 +1176,6 @@ def get_signalfx_url(spark_conf: Mapping[str, str]) -> str: ) -def get_history_url(spark_conf: Mapping[str, str]) -> Optional[str]: - if spark_conf.get('spark.eventLog.enabled') != 'true': 
- return None - event_log_dir = spark_conf.get('spark.eventLog.dir') - for env, env_conf in spark_srv_conf['environments'].items(): - if event_log_dir == env_conf['default_event_log_dir']: - return env_conf['history_server'] - return None - - def get_resources_requested(spark_opts: Mapping[str, str]) -> Mapping[str, int]: num_executors = ( # spark on k8s directly configure num instances diff --git a/setup.py b/setup.py index bb4f36a..95ba6c3 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ setup( name='service-configuration-lib', - version='2.17.3', + version='2.18.0', provides=['service_configuration_lib'], description='Start, stop, and inspect Yelp SOA services', url='https://github.com/Yelp/service_configuration_lib', diff --git a/tests/spark_config_test.py b/tests/spark_config_test.py index b1e8a43..c3b9f89 100644 --- a/tests/spark_config_test.py +++ b/tests/spark_config_test.py @@ -6,74 +6,13 @@ from unittest import mock import pytest -import requests import yaml -from pytest import MonkeyPatch +from service_configuration_lib import spark_config from service_configuration_lib import utils -TEST_ACCOUNT_ID = '123456789' -spark_run_conf = { - 'environments': { - 'testing': { - 'account_id': TEST_ACCOUNT_ID, - 'default_event_log_dir': 's3a://test/eventlog', - 'history_server': 'https://spark-history-testing', - }, - }, - 'spark_constants': { - 'target_mem_cpu_ratio': 7, - 'resource_configs': { - 'recommended': { - 'cpu': 4, - 'mem': 28, - }, - 'medium': { - 'cpu': 8, - 'mem': 56, - }, - 'max': { - 'cpu': 12, - 'mem': 110, - }, - }, - 'cost_factor': { - 'test-cluster': { - 'test-pool': 100, - }, - 'spark-pnw-prod': { - 'batch': 0.041, - 'stable_batch': 0.142, - }, - }, - 'adjust_executor_res_ratio_thresh': 99999, - 'default_resources_waiting_time_per_executor': 2, - 'default_clusterman_observed_scaling_time': 15, - 'high_cost_threshold_daily': 500, - 'defaults': { - 'spark.executor.cores': 4, - 'spark.executor.instances': 2, - 'spark.executor.memory': 28, - 'spark.task.cpus': 1, - 'spark.sql.shuffle.partitions': 128, - 'spark.dynamicAllocation.executorAllocationRatio': 0.8, - 'spark.dynamicAllocation.cachedExecutorIdleTimeout': '1500s', - 'spark.yelp.dra.minExecutorRatio': 0.25, - }, - 'mandatory_defaults': { - 'spark.kubernetes.allocation.batch.size': 512, - 'spark.kubernetes.decommission.script': '/opt/spark/kubernetes/dockerfiles/spark/decom.sh', - 'spark.logConf': 'true', - }, - }, -} -mp = MonkeyPatch() -with open('tmp_spark_srv_config.yaml', 'w+') as fp: - fp.write(yaml.dump(spark_run_conf)) - mp.setattr(utils, 'DEFAULT_SPARK_RUN_CONFIG', os.path.abspath(fp.name)) - -from service_configuration_lib import spark_config # noqa +TEST_ACCOUNT_ID = '123456789' @pytest.fixture @@ -200,12 +139,12 @@ def test_pick_random_port(): class MockConfigFunction: - def __init__(self, mock_func, return_value): + def __init__(self, mock_obj, mock_func, return_value): self.return_value = return_value def side_effect(*args, **kwargs): return {**args[0], **self.return_value} - self._patch = mock.patch.object(spark_config, mock_func, side_effect=side_effect) + self._patch = mock.patch.object(mock_obj, mock_func, side_effect=side_effect) def __enter__(self): self.mocker = self._patch.__enter__() @@ -223,9 +162,79 @@ class TestGetSparkConf: docker_image = 'docker-dev.yelp.com/test-image' executor_cores = '10' spark_app_base_name = 'test_app_base_name' - default_mesos_leader = 'mesos://some-url.yelp.com:5050' aws_provider_key = 'spark.hadoop.fs.s3a.aws.credentials.provider' + @pytest.fixture + def 
mock_spark_srv_conf_file(self, tmpdir, monkeypatch): + spark_run_conf = { + 'environments': { + 'testing': { + 'account_id': TEST_ACCOUNT_ID, + 'default_event_log_dir': 's3a://test/eventlog', + 'history_server': 'https://spark-history-testing', + }, + }, + 'spark_constants': { + 'target_mem_cpu_ratio': 7, + 'resource_configs': { + 'recommended': { + 'cpu': 4, + 'mem': 28, + }, + 'medium': { + 'cpu': 8, + 'mem': 56, + }, + 'max': { + 'cpu': 12, + 'mem': 110, + }, + }, + 'cost_factor': { + 'test-cluster': { + 'test-pool': 100, + }, + 'spark-pnw-prod': { + 'batch': 0.041, + 'stable_batch': 0.142, + }, + }, + 'adjust_executor_res_ratio_thresh': 99999, + 'default_resources_waiting_time_per_executor': 2, + 'default_clusterman_observed_scaling_time': 15, + 'high_cost_threshold_daily': 500, + 'defaults': { + 'spark.executor.cores': 4, + 'spark.executor.instances': 2, + 'spark.executor.memory': 28, + 'spark.task.cpus': 1, + 'spark.sql.shuffle.partitions': 128, + 'spark.dynamicAllocation.executorAllocationRatio': 0.8, + 'spark.dynamicAllocation.cachedExecutorIdleTimeout': '1500s', + 'spark.yelp.dra.minExecutorRatio': 0.25, + }, + 'mandatory_defaults': { + 'spark.kubernetes.allocation.batch.size': 512, + 'spark.kubernetes.decommission.script': '/opt/spark/kubernetes/dockerfiles/spark/decom.sh', + 'spark.logConf': 'true', + }, + }, + } + fp = tmpdir.join('tmp_spark_srv_config.yaml') + fp.write(yaml.dump(spark_run_conf)) + monkeypatch.setattr(utils, 'DEFAULT_SPARK_RUN_CONFIG', str(fp)) + + @pytest.fixture + def mock_log(self, monkeypatch): + mock_log = mock.Mock() + monkeypatch.setattr(spark_config, 'log', mock_log) + return mock_log + + @pytest.fixture + def mock_time(self): + with mock.patch.object(spark_config.time, 'time', return_value=123.456): + yield 123.456 + @pytest.fixture def base_volumes(self): return [{'hostPath': '/tmp', 'containerPath': '/tmp', 'mode': 'RO'}] @@ -250,6 +259,23 @@ def mock_existed_files(self, mock_paasta_volumes): with mock.patch('os.path.exists', side_effect=lambda f: f in existed_files): yield existed_files + @pytest.mark.parametrize( + 'spark_conf,expected_output', [ + ({'spark.eventLog.enabled': 'false'}, None), + ( + {'spark.eventLog.enabled': 'true', 'spark.eventLog.dir': 's3a://test/eventlog'}, + 'https://spark-history-testing', + ), + ( + {'spark.eventLog.enabled': 'true', 'spark.eventLog.dir': 's3a://test/different/eventlog'}, + None, + ), + ], + ) + def test_get_history_url(self, spark_conf, expected_output, mock_spark_srv_conf_file): + spark_conf_builder = spark_config.SparkConfBuilder() + assert spark_conf_builder.get_history_url(spark_conf) == expected_output + def test_get_k8s_volume_hostpath_dict(self): assert spark_config._get_k8s_volume_hostpath_dict( '/host/file1', '/container/file1', 'RO', itertools.count(), @@ -594,6 +620,7 @@ def test_adjust_spark_requested_resources( expected_output, force_spark_resource_configs, gpu_pool, + mock_spark_srv_conf_file, ): ratio_adj_thresh = sys.maxsize pool = ( @@ -601,8 +628,8 @@ def test_adjust_spark_requested_resources( if user_spark_opts.get('spark.mesos.gpus.max', '0') == '0' else next(iter(gpu_pool.keys())) ) - - output = spark_config._adjust_spark_requested_resources( + spark_conf_builder = spark_config.SparkConfBuilder() + output = spark_conf_builder._adjust_spark_requested_resources( user_spark_opts, cluster_manager, pool, force_spark_resource_configs, ratio_adj_thresh, ) for key in expected_output.keys(): @@ -627,9 +654,11 @@ def test_adjust_spark_requested_resources_error( spark_opts, pool, gpu_pool, + 
mock_spark_srv_conf_file, ): with pytest.raises(ValueError): - spark_config._adjust_spark_requested_resources(spark_opts, cluster_manager, pool) + spark_conf_builder = spark_config.SparkConfBuilder() + spark_conf_builder._adjust_spark_requested_resources(spark_opts, cluster_manager, pool) @pytest.mark.parametrize( 'user_spark_opts,expected_output', [ @@ -724,8 +753,10 @@ def test_get_dra_configs( self, user_spark_opts, expected_output, + mock_spark_srv_conf_file, ): - output = spark_config.get_dra_configs(user_spark_opts) + spark_conf_builder = spark_config.SparkConfBuilder() + output = spark_conf_builder.get_dra_configs(user_spark_opts) for key in expected_output.keys(): assert output[key] == expected_output[key], f'wrong value for {key}' @@ -784,8 +815,10 @@ def test_compute_approx_hourly_cost_dollars( paasta_cluster, paasta_pool, expected_output, + mock_spark_srv_conf_file, ): - output = spark_config.compute_approx_hourly_cost_dollars(spark_conf, paasta_cluster, paasta_pool) + spark_conf_builder = spark_config.SparkConfBuilder() + output = spark_conf_builder.compute_approx_hourly_cost_dollars(spark_conf, paasta_cluster, paasta_pool) assert output == expected_output @pytest.mark.parametrize( @@ -831,8 +864,10 @@ def test_append_event_log_conf( user_spark_opts, aws_creds, expected_output, + mock_spark_srv_conf_file, ): - output = spark_config._append_event_log_conf(user_spark_opts, *aws_creds) + spark_conf_builder = spark_config.SparkConfBuilder() + output = spark_conf_builder._append_event_log_conf(user_spark_opts, *aws_creds) for key in expected_output: assert output[key] == expected_output[key] @@ -884,9 +919,10 @@ def test_append_event_log_conf( ], ) def test_append_sql_partitions_conf( - self, user_spark_opts, expected_output, + self, user_spark_opts, expected_output, mock_spark_srv_conf_file, ): - output = spark_config._append_sql_partitions_conf(user_spark_opts) + spark_conf_builder = spark_config.SparkConfBuilder() + output = spark_conf_builder._append_sql_partitions_conf(user_spark_opts) keys = [ 'spark.sql.shuffle.partitions', 'spark.sql.files.minPartitionNum', @@ -949,7 +985,7 @@ def mock_append_spark_prometheus_conf(self): } with MockConfigFunction( - '_append_spark_prometheus_conf', return_value, + spark_config.SparkConfBuilder, '_append_spark_prometheus_conf', return_value, ) as m: yield m @@ -957,14 +993,14 @@ def mock_append_spark_prometheus_conf(self): def mock_append_spark_conf_log(self): return_value = {'spark.logConf': 'true'} with MockConfigFunction( - '_append_spark_config', return_value, + spark_config, '_append_spark_config', return_value, ) as m: yield m @pytest.fixture def mock_get_mesos_docker_volumes_conf(self): return_value = {'spark.mesos.executor.docker.volumes': '/tmp:/tmp:ro'} - with MockConfigFunction('_get_mesos_docker_volumes_conf', return_value) as m: + with MockConfigFunction(spark_config, '_get_mesos_docker_volumes_conf', return_value) as m: yield m @pytest.fixture @@ -975,9 +1011,7 @@ def mock_append_sql_partitions_conf(self): 'spark.default.parallelism', ] return_value = {k: '10' for k in keys} - with MockConfigFunction( - '_append_sql_partitions_conf', return_value, - ) as m: + with MockConfigFunction(spark_config.SparkConfBuilder, '_append_sql_partitions_conf', return_value) as m: yield m @pytest.fixture @@ -986,7 +1020,7 @@ def mock_append_event_log_conf(self): 'spark.eventLog.enabled': 'true', 'spark.eventLog.dir': 's3a://test/bucket/', } - with MockConfigFunction('_append_event_log_conf', return_value) as m: + with 
MockConfigFunction(spark_config.SparkConfBuilder, '_append_event_log_conf', return_value) as m: yield m @pytest.fixture @@ -997,17 +1031,7 @@ def mock_append_aws_credentials_conf(self): 'spark.executorEnv.AWS_SESSION_TOKEN': 'we_all_key', 'spark.executorEnv.AWS_DEFAULT_REGION': 'ice_cream', } - with MockConfigFunction('_append_aws_credentials_conf', return_value) as m: - yield m - - @pytest.fixture - def mock_adjust_spark_requested_resources_mesos(self): - return_value = { - 'spark.cores.max': '10', - 'spark.executor.cores': self.executor_cores, - 'spark.executor.memory': '2g', - } - with MockConfigFunction('_adjust_spark_requested_resources', return_value) as m: + with MockConfigFunction(spark_config, '_append_aws_credentials_conf', return_value) as m: yield m @pytest.fixture @@ -1017,7 +1041,7 @@ def mock_adjust_spark_requested_resources_kubernetes(self): 'spark.executor.cores': self.executor_cores, 'spark.executor.memory': '2g', } - with MockConfigFunction('_adjust_spark_requested_resources', return_value) as m: + with MockConfigFunction(spark_config.SparkConfBuilder, '_adjust_spark_requested_resources', return_value) as m: yield m @pytest.fixture @@ -1031,7 +1055,7 @@ def mock_get_dra_configs(self): 'spark.dynamicAllocation.minExecutors': '0', 'spark.dynamicAllocation.cachedExecutorIdleTimeout': '900s', } - with MockConfigFunction('get_dra_configs', return_value) as m: + with MockConfigFunction(spark_config.SparkConfBuilder, 'get_dra_configs', return_value) as m: yield m @pytest.fixture @@ -1041,7 +1065,7 @@ def mock_update_spark_srv_configs(self): 'spark.kubernetes.decommission.script': '/opt/spark/kubernetes/dockerfiles/spark/decom.sh', 'spark.logConf': 'true', } - with MockConfigFunction('update_spark_srv_configs', return_value) as m: + with MockConfigFunction(spark_config.SparkConfBuilder, 'update_spark_srv_configs', return_value) as m: yield m @pytest.fixture @@ -1052,37 +1076,6 @@ def mock_secret(self, tmpdir, monkeypatch): monkeypatch.setattr(spark_config, 'DEFAULT_SPARK_MESOS_SECRET_FILE', str(fp)) return secret - @pytest.fixture(params=[False, True]) - def with_secret(self, request): - return request.param - - @pytest.fixture - def assert_mesos_secret(self, with_secret, mock_secret): - expected_output = mock_secret if with_secret else None - - def verify(output): - if expected_output: - key = 'spark.mesos.secret' - assert output[key] == mock_secret - return [key] - return [] - return verify - - @pytest.fixture - def mock_request_mesos_leader(self): - return_value = self.default_mesos_leader.replace('mesos://', 'http://') + '/#/' - with mock.patch.object(spark_config.requests, 'get') as m: - m.return_value = mock.Mock(url=return_value) - yield m - - def test_find_spark_master(self, mock_request_mesos_leader): - assert spark_config.find_spark_master('test-cluster') == 'mesos://some-url.yelp.com:5050' - - def test_find_spark_master_error(self, mock_request_mesos_leader): - mock_request_mesos_leader.side_effect = requests.RequestException() - with pytest.raises(ValueError): - spark_config.find_spark_master('test-cluster') - def test_convert_user_spark_opts_value_str(self): spark_conf = { 'spark.executor.memory': '4g', @@ -1095,56 +1088,6 @@ def test_convert_user_spark_opts_value_str(self): 'spark.eventLog.enabled': 'false', } - @pytest.fixture(params=[None, 'test-mesos:5050']) - def mesos_leader(self, request): - return request.param - - @pytest.fixture - def assert_mesos_leader(self, mesos_leader, mock_request_mesos_leader): - expected_output = f'mesos://{mesos_leader}' if 
mesos_leader else self.default_mesos_leader - - def validate(output): - key = 'spark.master' - assert output[key] == expected_output - return [key] - - return validate - - @pytest.fixture(params=[None, {'workdir': '/tmp'}]) - def extra_docker_params(self, request): - return request.param - - @pytest.fixture - def assert_docker_parameters(self, extra_docker_params): - def verify(output): - expected_output = ( - f'cpus={self.executor_cores},' - f'label=paasta_service={self.service},' - f'label=paasta_instance={self.instance}' - ) - if extra_docker_params: - for key, value in extra_docker_params.items(): - expected_output += f',{key}={value}' - - key = 'spark.mesos.executor.docker.parameters' - assert output[key] == expected_output - return [key] - return verify - - @pytest.fixture(params=[False, True]) - def needs_docker_cfg(self, request): - return request.param - - @pytest.fixture - def assert_docker_cfg(self, needs_docker_cfg): - def verify(output): - if needs_docker_cfg: - key = 'spark.mesos.uris' - assert output[key] == 'file:///root/.dockercfg' - return [key] - return [] - return verify - @pytest.fixture def mock_pick_random_port(self): port = '12345' @@ -1276,6 +1219,7 @@ def test_leaders_get_spark_conf_kubernetes( mock_adjust_spark_requested_resources_kubernetes, mock_get_dra_configs, mock_update_spark_srv_configs, + mock_spark_srv_conf_file, mock_time, assert_ui_port, assert_app_name, @@ -1291,7 +1235,8 @@ def test_leaders_get_spark_conf_kubernetes( aws_creds = (None, None, None) aws_region = 'ice_cream' - output = spark_config.get_spark_conf( + spark_conf_builder = spark_config.SparkConfBuilder() + output = spark_conf_builder.get_spark_conf( cluster_manager='kubernetes', spark_app_base_name=self.spark_app_base_name, user_spark_opts=user_spark_opts, @@ -1365,6 +1310,7 @@ def test_show_console_progress_jupyter( mock_append_sql_partitions_conf, mock_adjust_spark_requested_resources_kubernetes, mock_get_dra_configs, + mock_spark_srv_conf_file, mock_time, assert_ui_port, assert_app_name, @@ -1373,7 +1319,8 @@ def test_show_console_progress_jupyter( ): aws_creds = (None, None, None) aws_region = 'ice_cream' - output = spark_config.get_spark_conf( + spark_conf_builder = spark_config.SparkConfBuilder() + output = spark_conf_builder.get_spark_conf( cluster_manager='local', spark_app_base_name='jupyterhub_test_name', user_spark_opts={}, @@ -1403,6 +1350,7 @@ def test_local_spark( mock_adjust_spark_requested_resources_kubernetes, mock_get_dra_configs, mock_update_spark_srv_configs, + mock_spark_srv_conf_file, mock_time, assert_ui_port, assert_app_name, @@ -1411,7 +1359,8 @@ def test_local_spark( ): aws_creds = (None, None, None) aws_region = 'ice_cream' - output = spark_config.get_spark_conf( + spark_conf_builder = spark_config.SparkConfBuilder() + output = spark_conf_builder.get_spark_conf( cluster_manager='local', spark_app_base_name=self.spark_app_base_name, user_spark_opts=user_spark_opts or {}, @@ -1449,6 +1398,59 @@ def test_local_spark( mock.ANY, ) + @pytest.mark.parametrize( + 'adj_thresh,cpu,memory,expected_cpu,expected_memory', [ + (999, 10, '60g', 8, '56g'), + (60, 10, '60g', 8, '56g'), + (7, 10, '60g', 10, '60g'), + (999, 4, '32g', 4, '28g'), + (32, 4, '32g', 4, '28g'), + (8, 4, '32g', 4, '32g'), + (999, 2, '8g', 1, '7g'), + (8, 2, '8g', 1, '7g'), + (7, 2, '8g', 2, '8g'), + ], + ) + def test_adjust_cpu_mem_ratio_thresh( + self, adj_thresh, cpu, memory, expected_cpu, + expected_memory, mock_spark_srv_conf_file, + ): + spark_opts = dict() + spark_opts['spark.executor.cores'] = cpu + 
spark_opts['spark.executor.memory'] = memory + spark_opts['spark.executor.instances'] = 1 + spark_opts['spark.task.cpus'] = 1 + + spark_conf_builder = spark_config.SparkConfBuilder() + result_dict = spark_conf_builder._recalculate_executor_resources(spark_opts, False, adj_thresh, 'batch') + assert int(result_dict['spark.executor.cores']) == expected_cpu + assert result_dict['spark.executor.memory'] == expected_memory + assert int(result_dict['spark.executor.instances']) == 1 + assert int(result_dict['spark.task.cpus']) == 1 + + @pytest.mark.parametrize( + 'adj_thresh,cpu,memory,expected_cpu,expected_memory', [ + (999, 10, '60g', 10, '60g'), + (7, 2, '8g', 2, '8g'), + ], + ) + def test_adjust_cpu_mem_ratio_thresh_non_regular_pool( + self, adj_thresh, cpu, memory, expected_cpu, + expected_memory, mock_spark_srv_conf_file, + ): + spark_opts = dict() + spark_opts['spark.executor.cores'] = cpu + spark_opts['spark.executor.memory'] = memory + spark_opts['spark.executor.instances'] = 1 + spark_opts['spark.task.cpus'] = 1 + + spark_conf_builder = spark_config.SparkConfBuilder() + result_dict = spark_conf_builder._recalculate_executor_resources(spark_opts, False, adj_thresh, 'non_batch') + assert int(result_dict['spark.executor.cores']) == expected_cpu + assert result_dict['spark.executor.memory'] == expected_memory + assert int(result_dict['spark.executor.instances']) == 1 + assert int(result_dict['spark.task.cpus']) == 1 + def test_stringify_spark_env(): conf = {'spark.mesos.leader': '1234', 'spark.mesos.principal': 'spark'} @@ -1457,23 +1459,6 @@ def test_stringify_spark_env(): ) -@pytest.mark.parametrize( - 'spark_conf,expected_output', [ - ({'spark.eventLog.enabled': 'false'}, None), - ( - {'spark.eventLog.enabled': 'true', 'spark.eventLog.dir': 's3a://test/eventlog'}, - 'https://spark-history-testing', - ), - ( - {'spark.eventLog.enabled': 'true', 'spark.eventLog.dir': 's3a://test/different/eventlog'}, - None, - ), - ], -) -def test_get_history_url(spark_conf, expected_output): - assert spark_config.get_history_url(spark_conf) == expected_output - - @pytest.mark.parametrize( 'memory_string,expected_output', [ ('1g', 1024), @@ -1516,53 +1501,6 @@ def test_get_signalfx_url(): ) -@pytest.mark.parametrize( - 'adj_thresh,cpu,memory,expected_cpu,expected_memory', [ - (999, 10, '60g', 8, '56g'), - (60, 10, '60g', 8, '56g'), - (7, 10, '60g', 10, '60g'), - (999, 4, '32g', 4, '28g'), - (32, 4, '32g', 4, '28g'), - (8, 4, '32g', 4, '32g'), - (999, 2, '8g', 1, '7g'), - (8, 2, '8g', 1, '7g'), - (7, 2, '8g', 2, '8g'), - ], -) -def test_adjust_cpu_mem_ratio_thresh(adj_thresh, cpu, memory, expected_cpu, expected_memory): - spark_opts = dict() - spark_opts['spark.executor.cores'] = cpu - spark_opts['spark.executor.memory'] = memory - spark_opts['spark.executor.instances'] = 1 - spark_opts['spark.task.cpus'] = 1 - - result_dict = spark_config._recalculate_executor_resources(spark_opts, False, adj_thresh, 'batch') - assert int(result_dict['spark.executor.cores']) == expected_cpu - assert result_dict['spark.executor.memory'] == expected_memory - assert int(result_dict['spark.executor.instances']) == 1 - assert int(result_dict['spark.task.cpus']) == 1 - - -@pytest.mark.parametrize( - 'adj_thresh,cpu,memory,expected_cpu,expected_memory', [ - (999, 10, '60g', 10, '60g'), - (7, 2, '8g', 2, '8g'), - ], -) -def test_adjust_cpu_mem_ratio_thresh_non_regular_pool(adj_thresh, cpu, memory, expected_cpu, expected_memory): - spark_opts = dict() - spark_opts['spark.executor.cores'] = cpu - spark_opts['spark.executor.memory'] = 
memory - spark_opts['spark.executor.instances'] = 1 - spark_opts['spark.task.cpus'] = 1 - - result_dict = spark_config._recalculate_executor_resources(spark_opts, False, adj_thresh, 'non_batch') - assert int(result_dict['spark.executor.cores']) == expected_cpu - assert result_dict['spark.executor.memory'] == expected_memory - assert int(result_dict['spark.executor.instances']) == 1 - assert int(result_dict['spark.task.cpus']) == 1 - - @pytest.mark.parametrize( 'spark_opts,expected_output', [ # mesos ( 2 instances, not configure memory overhead, default: 384m ) @@ -1762,6 +1700,3 @@ def test_send_and_calculate_resources_cost( ) def test_get_k8s_resource_name_limit_size_with_hash(instance_name, expected_instance_label): assert expected_instance_label == spark_config._get_k8s_resource_name_limit_size_with_hash(instance_name) - - -os.remove('tmp_spark_srv_config.yaml')
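Note on the refactor above: the helpers that previously lived at module level in spark_config (get_spark_conf, get_dra_configs, get_history_url, _adjust_spark_requested_resources, _append_event_log_conf, and friends) are now methods on a SparkConfBuilder class, and the updated tests construct that builder explicitly. The sketch below is an illustration assembled from the calls visible in the new tests, not a copy of the full signature; the keyword arguments other than cluster_manager, spark_app_base_name, and user_spark_opts (paasta_cluster, paasta_pool, paasta_service, paasta_instance, docker_img, aws_creds, extra_volumes, aws_region) are inferred from the variables used inside the rewritten get_spark_conf body and may differ in name or completeness from the actual method.

    # Sketch only: exercising the new class-based entry point, with argument
    # names inferred from the updated get_spark_conf() body and test fixtures.
    from service_configuration_lib import spark_config

    builder = spark_config.SparkConfBuilder()          # assumed to load the spark srv configs on construction
    conf = builder.get_spark_conf(
        cluster_manager='kubernetes',                  # or 'local'; other values raise UnsupportedClusterManagerException
        spark_app_base_name='example_batch',
        user_spark_opts={'spark.executor.cores': '4'},
        paasta_cluster='test-cluster',
        paasta_pool='batch',
        paasta_service='example-service',
        paasta_instance='example-instance',
        docker_img='docker-dev.yelp.com/example-image',
        aws_creds=(None, None, None),                  # (access key, secret key, session token)
        extra_volumes=[],
        aws_region='us-west-2',
    )
    history_url = builder.get_history_url(conf)        # previously a module-level function

Because the srv-config is now read when the builder is instantiated rather than at import time, the tests create the config YAML inside the mock_spark_srv_conf_file fixture (via tmpdir and monkeypatching utils.DEFAULT_SPARK_RUN_CONFIG) instead of writing tmp_spark_srv_config.yaml at module import and deleting it at the end of the test module.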