Skip to content

Commit

Permalink
Pick spark ui port with preference and add user pod label (#122)
Browse files: browse the repository at this point in the history
* Pick spark ui port with preference and add user pod label

* Fix tests

* Update the default user value

* Update service_configuration_lib/spark_config.py

Co-authored-by: Luis Pérez <[email protected]>

---------

Co-authored-by: Luis Pérez <[email protected]>
  • Loading branch information
edingroot and nemacysts authored Aug 11, 2023
1 parent 87624ac commit 39a983c
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
11 changes: 6 additions & 5 deletions service_configuration_lib/spark_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
'spark.kubernetes.executor.label.paasta.yelp.com/service',
'spark.kubernetes.executor.label.paasta.yelp.com/instance',
'spark.kubernetes.executor.label.paasta.yelp.com/cluster',
'spark.kubernetes.executor.label.spark.yelp.com/user',
}

K8S_AUTH_FOLDER = '/etc/pki/spark'
Expand All @@ -74,6 +75,7 @@

SUPPORTED_CLUSTER_MANAGERS = ['kubernetes', 'local']
DEFAULT_SPARK_RUN_CONFIG = '/nail/srv/configs/spark.yaml'
PREFERRED_SPARK_UI_PORT = 39091

log = logging.Logger(__name__)
log.setLevel(logging.INFO)
Expand Down Expand Up @@ -172,11 +174,8 @@ def assume_aws_role(
return resp['Credentials']


def _pick_random_port(app_name):
def _pick_random_port(preferred_port: int = 0) -> int:
"""Return a random port. """
hash_key = f'{app_name}_{time.time()}'.encode('utf-8')
hash_number = int(hashlib.sha1(hash_key).hexdigest(), 16)
preferred_port = 33000 + (hash_number % 25000)
return ephemeral_port_reserve.reserve('0.0.0.0', preferred_port)


Expand Down Expand Up @@ -276,6 +275,7 @@ def _get_k8s_spark_env(
_paasta_cluster = _get_k8s_resource_name_limit_size_with_hash(paasta_cluster)
_paasta_service = _get_k8s_resource_name_limit_size_with_hash(paasta_service)
_paasta_instance = _get_k8s_resource_name_limit_size_with_hash(paasta_instance)
user = os.environ.get('USER', '_unspecified_')

spark_env = {
'spark.master': f'k8s://https://k8s.{paasta_cluster}.paasta:6443',
Expand All @@ -293,6 +293,7 @@ def _get_k8s_spark_env(
'spark.kubernetes.executor.label.paasta.yelp.com/service': _paasta_service,
'spark.kubernetes.executor.label.paasta.yelp.com/instance': _paasta_instance,
'spark.kubernetes.executor.label.paasta.yelp.com/cluster': _paasta_cluster,
'spark.kubernetes.executor.label.spark.yelp.com/user': user,
'spark.kubernetes.node.selector.yelp.com/pool': paasta_pool,
'spark.kubernetes.executor.label.yelp.com/pool': paasta_pool,
'spark.kubernetes.executor.label.paasta.yelp.com/pool': paasta_pool,
Expand Down Expand Up @@ -1062,7 +1063,7 @@ def get_spark_conf(
)

ui_port = (spark_opts_from_env or {}).get('spark.ui.port') or _pick_random_port(
app_base_name + str(time.time()),
PREFERRED_SPARK_UI_PORT,
)

# app_name from env already has the port and time appended to make it unique
Expand Down
6 changes: 5 additions & 1 deletion tests/spark_config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@


TEST_ACCOUNT_ID = '123456789'
TEST_USER = 'UNIT_TEST_USER'


@pytest.fixture
Expand Down Expand Up @@ -130,7 +131,8 @@ def test_fail(self, tmpdir):

def test_pick_random_port():
with mock.patch('ephemeral_port_reserve.reserve') as mock_reserve:
port = spark_config._pick_random_port('test')
preferred_port = 33123 # Any ephemeral port for testing
port = spark_config._pick_random_port(preferred_port)
(host, prefer_port), _ = mock_reserve.call_args
assert host == '0.0.0.0'
assert prefer_port >= 33000
Expand Down Expand Up @@ -1188,6 +1190,7 @@ def assert_kubernetes_conf(self, base_volumes):
'spark.kubernetes.executor.label.paasta.yelp.com/service': self.service,
'spark.kubernetes.executor.label.paasta.yelp.com/instance': self.instance,
'spark.kubernetes.executor.label.paasta.yelp.com/cluster': self.cluster,
'spark.kubernetes.executor.label.spark.yelp.com/user': TEST_USER,
'spark.kubernetes.node.selector.yelp.com/pool': self.pool,
'spark.kubernetes.executor.label.yelp.com/pool': self.pool,
'spark.kubernetes.executor.label.paasta.yelp.com/pool': self.pool,
Expand All @@ -1206,6 +1209,7 @@ def verify(output):
return list(expected_output.keys())
return verify

@mock.patch.dict(os.environ, {'USER': TEST_USER})
def test_leaders_get_spark_conf_kubernetes(
self,
user_spark_opts,
Expand Down

0 comments on commit 39a983c

Please sign in to comment.