From 0fa1ddb4a16399b973cce50b46de7e6a1fc4d005 Mon Sep 17 00:00:00 2001
From: jsleight <64800239+jsleight@users.noreply.github.com>
Date: Thu, 23 Jun 2022 15:03:12 -0400
Subject: [PATCH] Add auto_set_temporary_credentials_provider to get_spark_conf
 for spark 3.2.0 support. (#80)

---
 service_configuration_lib/spark_config.py | 10 ++++++----
 setup.py                                  |  2 +-
 tests/spark_config_test.py                | 11 ++++++++---
 3 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/service_configuration_lib/spark_config.py b/service_configuration_lib/spark_config.py
index 6c34962..72bcc29 100644
--- a/service_configuration_lib/spark_config.py
+++ b/service_configuration_lib/spark_config.py
@@ -642,6 +642,7 @@ def get_spark_conf(
     mesos_leader: Optional[str] = None,
     spark_opts_from_env: Optional[Mapping[str, str]] = None,
     load_paasta_default_volumes: bool = False,
+    auto_set_temporary_credentials_provider: bool = True,
 ) -> Dict[str, str]:
     """Build spark config dict to run with spark on paasta
 
@@ -673,6 +674,10 @@ def get_spark_conf(
         spark session.
     :param load_paasta_default_volumes: whether to include default paasta mounted volumes
         into the spark executors.
+    :param auto_set_temporary_credentials_provider: whether to set the temporary credentials
+        provider when a session token is present. hadoop-aws 3.2.1 needs this, but in
+        hadoop-aws 3.3.1 the temporary credentials provider is the new default and, for
+        reasons not yet understood, causes errors if it is set explicitly.
     :returns: spark opts in a dict.
     """
     # for simplicity, all the following computation are assuming spark opts values
@@ -697,10 +702,7 @@ def get_spark_conf(
 
     spark_conf = {**(spark_opts_from_env or {}), **_filter_user_spark_opts(user_spark_opts)}
 
-    # We automatically update the credentials provider if the session token is included.
-    # By default the SimpleAWSCredentials provider is used, which is incompatible with
-    # temporary credentials. More details in SEC-13906.
-    if aws_creds[2] is not None:
+    if aws_creds[2] is not None and auto_set_temporary_credentials_provider:
         spark_conf['spark.hadoop.fs.s3a.aws.credentials.provider'] = AWS_TEMP_CREDENTIALS_PROVIDER
 
     spark_conf.update({
diff --git a/setup.py b/setup.py
index 0bfb313..90ecb9f 100644
--- a/setup.py
+++ b/setup.py
@@ -17,7 +17,7 @@
 
 setup(
     name='service-configuration-lib',
-    version='2.10.5',
+    version='2.10.6',
     provides=['service_configuration_lib'],
     description='Start, stop, and inspect Yelp SOA services',
     url='https://github.com/Yelp/service_configuration_lib',
diff --git a/tests/spark_config_test.py b/tests/spark_config_test.py
index 0a37578..167c357 100644
--- a/tests/spark_config_test.py
+++ b/tests/spark_config_test.py
@@ -919,7 +919,8 @@ def verify(output):
 
         return verify
 
-    def test_get_spark_conf_aws_session(self):
+    @pytest.mark.parametrize('use_temp_provider', [True, False])
+    def test_get_spark_conf_aws_session(self, use_temp_provider):
         other_spark_opts = {'spark.driver.memory': '2g', 'spark.executor.memoryOverhead': '1024'}
         not_allowed_opts = {'spark.executorEnv.PAASTA_SERVICE': 'random-service'}
         user_spark_opts = {
@@ -941,9 +942,13 @@ def test_get_spark_conf_aws_session(self):
             docker_img=self.docker_image,
             extra_volumes=self.extra_volumes,
             aws_creds=aws_creds,
+            auto_set_temporary_credentials_provider=use_temp_provider,
         )
-        assert self.aws_provider_key in output.keys()
-        assert 'org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider' == output[self.aws_provider_key]
+        if use_temp_provider:
+            assert self.aws_provider_key in output.keys()
+            assert 'org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider' == output[self.aws_provider_key]
+        else:
+            assert self.aws_provider_key not in output
 
     def test_get_spark_conf_mesos(
         self,
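
Usage note (not part of the patch): a minimal sketch of how a caller might pick the
flag based on the hadoop-aws version it ships with, assuming the behavior described
in the docstring above. The needs_explicit_temp_provider helper is hypothetical and
not part of service_configuration_lib.

    def needs_explicit_temp_provider(hadoop_aws_version: str) -> bool:
        """True when fs.s3a.aws.credentials.provider must be set by hand.

        Per the docstring above: hadoop-aws before 3.3 defaults to a simple
        credentials provider that is incompatible with session tokens, while
        3.3+ already defaults to the temporary credentials provider and
        errors if it is set again.
        """
        major, minor = (int(part) for part in hadoop_aws_version.split('.')[:2])
        return (major, minor) < (3, 3)

    assert needs_explicit_temp_provider('3.2.1')      # old default needs the override
    assert not needs_explicit_temp_provider('3.3.1')  # new default; leave it unset

The result would then be forwarded as auto_set_temporary_credentials_provider
alongside the usual get_spark_conf arguments (docker_img, extra_volumes, aws_creds,
and so on, as exercised in the test above).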