Skip to content

Commit

Permalink
Add auto_set_temporary_credentials_provider to get_spark_conf for spark 3.2.0 support. (#80)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsleight authored Jun 23, 2022
1 parent acfe14d commit 0fa1ddb
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 8 deletions.
10 changes: 6 additions & 4 deletions service_configuration_lib/spark_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,7 @@ def get_spark_conf(
mesos_leader: Optional[str] = None,
spark_opts_from_env: Optional[Mapping[str, str]] = None,
load_paasta_default_volumes: bool = False,
auto_set_temporary_credentials_provider: bool = True,
) -> Dict[str, str]:
"""Build spark config dict to run with spark on paasta
Expand Down Expand Up @@ -673,6 +674,10 @@ def get_spark_conf(
spark session.
:param load_paasta_default_volumes: whether to include default paasta mounted volumes
into the spark executors.
:param auto_set_temporary_credentials_provider: whether to set the temporary credentials
provider if the session token exists. In hadoop-aws 3.2.1 this is needed, but
in hadoop-aws 3.3.1 the temporary credentials provider is the new default and
causes errors if explicitly set for unknown reasons.
:returns: spark opts in a dict.
"""
# for simplicity, all the following computation are assuming spark opts values
Expand All @@ -697,10 +702,7 @@ def get_spark_conf(

spark_conf = {**(spark_opts_from_env or {}), **_filter_user_spark_opts(user_spark_opts)}

# We automatically update the credentials provider if the session token is included.
# By default the SimpleAWSCredentials provider is used, which is incompatible with
# temporary credentials. More details in SEC-13906.
if aws_creds[2] is not None:
if aws_creds[2] is not None and auto_set_temporary_credentials_provider:
spark_conf['spark.hadoop.fs.s3a.aws.credentials.provider'] = AWS_TEMP_CREDENTIALS_PROVIDER

spark_conf.update({
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

setup(
name='service-configuration-lib',
version='2.10.5',
version='2.10.6',
provides=['service_configuration_lib'],
description='Start, stop, and inspect Yelp SOA services',
url='https://github.com/Yelp/service_configuration_lib',
Expand Down
11 changes: 8 additions & 3 deletions tests/spark_config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,8 @@ def verify(output):

return verify

def test_get_spark_conf_aws_session(self):
@pytest.mark.parametrize('use_temp_provider', [True, False])
def test_get_spark_conf_aws_session(self, use_temp_provider):
other_spark_opts = {'spark.driver.memory': '2g', 'spark.executor.memoryOverhead': '1024'}
not_allowed_opts = {'spark.executorEnv.PAASTA_SERVICE': 'random-service'}
user_spark_opts = {
Expand All @@ -941,9 +942,13 @@ def test_get_spark_conf_aws_session(self):
docker_img=self.docker_image,
extra_volumes=self.extra_volumes,
aws_creds=aws_creds,
auto_set_temporary_credentials_provider=use_temp_provider,
)
assert self.aws_provider_key in output.keys()
assert 'org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider' == output[self.aws_provider_key]
if use_temp_provider:
assert self.aws_provider_key in output.keys()
assert 'org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider' == output[self.aws_provider_key]
else:
assert self.aws_provider_key not in output

def test_get_spark_conf_mesos(
self,
Expand Down

0 comments on commit 0fa1ddb

Please sign in to comment.