Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for eks cluster not matching --cluster spark arg #3636

Merged
merged 2 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions paasta_tools/cli/cmds/spark_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -1102,8 +1102,28 @@ def _validate_pool(args, system_paasta_config):


def _get_k8s_url_for_cluster(cluster: str) -> Optional[str]:
    """
    Return the k8s apiserver url to use for ``cluster``, or None if there is none.

    Annoyingly, there are two layers of aliases: one to figure out what
    k8s server url to use (this one) and another to figure out what
    soaconfigs filename to use ;_;

    This exists so that we can map something like `--cluster pnw-devc`
    into spark-pnw-devc's k8s apiserver url without needing to update
    any soaconfigs/alter folk's muscle memory.

    Ideally we can get rid of this entirely once spark-run reads soaconfigs
    in a manner more closely aligned to what we do with other paasta workloads
    (i.e., have it automatically determine where to run based on soaconfigs
    filenames - and not rely on explicit config)

    :param cluster: cluster name as given by the user (e.g. via ``--cluster``)
    :returns: the ``server`` value configured for the (possibly aliased) cluster
        in the system config's kube clusters, or None when the cluster has no
        entry or the entry has no ``server`` key
    """
    # Load the system config once and reuse it for both lookups.
    system_paasta_config = load_system_paasta_config()
    # Fall back to the name as given when no EKS alias is configured for it.
    realized_cluster = system_paasta_config.get_eks_cluster_aliases().get(
        cluster, cluster
    )
    return (
        system_paasta_config.get_kube_clusters()
        .get(realized_cluster, {})
        .get("server")
    )


Expand Down Expand Up @@ -1138,14 +1158,16 @@ def paasta_spark_run(args):
if not _validate_pool(args, system_paasta_config):
return 1

# annoyingly, there are two layers of aliases: one for the soaconfigs to read from
# (that's this alias lookup) - and then another layer later when figuring out what
# k8s server url to use ;_;
cluster = system_paasta_config.get_cluster_aliases().get(args.cluster, args.cluster)
# Use the default spark:client instance configs if not provided
try:
instance_config = get_instance_config(
service=args.service,
instance=args.instance,
cluster=system_paasta_config.get_cluster_aliases().get(
args.cluster, args.cluster
),
cluster=cluster,
load_deployments=args.build is False and args.image is None,
soa_dir=args.yelpsoa_config_root,
)
Expand Down Expand Up @@ -1232,12 +1254,19 @@ def paasta_spark_run(args):

use_eks = decide_final_eks_toggle_state(args.use_eks_override)
k8s_server_address = _get_k8s_url_for_cluster(args.cluster) if use_eks else None
paasta_cluster = (
args.cluster
if not use_eks
else load_system_paasta_config()
.get_eks_cluster_aliases()
.get(args.cluster, args.cluster)
)
spark_conf = get_spark_conf(
cluster_manager=args.cluster_manager,
spark_app_base_name=app_base_name,
docker_img=docker_image_digest,
user_spark_opts=user_spark_opts,
paasta_cluster=args.cluster,
paasta_cluster=paasta_cluster,
paasta_pool=args.pool,
paasta_service=args.service,
paasta_instance=paasta_instance,
Expand Down
4 changes: 4 additions & 0 deletions paasta_tools/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2011,6 +2011,7 @@ class SystemPaastaConfigDict(TypedDict, total=False):
spark_kubeconfig: str
kube_clusters: Dict
spark_use_eks_default: bool
eks_cluster_aliases: Dict[str, str]


def load_system_paasta_config(
Expand Down Expand Up @@ -2720,6 +2721,9 @@ def get_skip_cpu_burst_validation_services(self) -> List[str]:
def get_cluster_aliases(self) -> Dict[str, str]:
    """Return the ``cluster_aliases`` mapping from the config dict.

    :returns: the value stored under ``cluster_aliases``, or an empty dict
        when that key is absent
    """
    return self.config_dict.get("cluster_aliases", {})

def get_eks_cluster_aliases(self) -> Dict[str, str]:
    """Return the ``eks_cluster_aliases`` mapping from the config dict.

    :returns: the value stored under ``eks_cluster_aliases``, or an empty dict
        when that key is absent
    """
    return self.config_dict.get("eks_cluster_aliases", {})

def get_cluster_pools(self) -> Dict[str, List[str]]:
    """Return the ``allowed_pools`` mapping from the config dict.

    Note that the config key read here is ``allowed_pools``, not
    ``cluster_pools``.

    :returns: the value stored under ``allowed_pools``, or an empty dict when
        that key is absent (presumably keyed by cluster name - verify against
        the config schema)
    """
    return self.config_dict.get("allowed_pools", {})

Expand Down