Create pod modifier for computing integrations (Spark, HTCondor)
Refactor the existing pod modifier hook for Spark so that it also performs the
necessary configuration for HTCondor. The motivation for grouping the two is
that both computing services require ports to be opened, so a single k8s
service with the necessary open ports can serve both.

The choice of 18 ports for Spark is kept, and a new allocation of 5 ports for
HTCondor is introduced, which in practice means that up to 5
SwanHTCondorClusters can be created -- this should be enough.
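
Below is a minimal sketch of the port-budget logic described above. It is
illustrative only: the constant values come from this commit message, and the
standalone helper function is hypothetical, not part of the actual hook code
in the diff.

# Illustrative sketch, not the real hook: combined port budget per user session.
SPARK_REQUIRED_PORTS = 18   # kept from the previous Spark-only hook
CONDOR_REQUIRED_PORTS = 5   # new: allows up to 5 SwanHTCondorClusters

def required_ports(spark_enabled: bool, condor_enabled: bool) -> int:
    # Number of NodePorts the single k8s service must expose for this user.
    total = 0
    if spark_enabled:
        total += SPARK_REQUIRED_PORTS
    if condor_enabled:
        total += CONDOR_REQUIRED_PORTS
    return total

# A user selecting both Spark and HTCondor gets one service with 23 open ports.
assert required_ports(True, True) == 23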
etejedor committed Nov 14, 2023
1 parent 9d45291 commit ba75472
Showing 3 changed files with 123 additions and 90 deletions.
@@ -22,24 +22,40 @@
"""


class SwanSparkPodHookHandler(SwanPodHookHandlerProd):
class SwanComputingPodHookHandler(SwanPodHookHandlerProd):

_SPARK_REQUIRED_PORTS = 18
_CONDOR_REQUIRED_PORTS = 5

async def get_swan_user_pod(self):
await super().get_swan_user_pod()

# get hadoop token
hadoop_secret_name = None
required_ports = 0

if self._spark_enabled():
# cern customisation for spark clusters
# Configure Spark clusters at CERN
hadoop_secret_name = await self._init_hadoop_secret()
await self._init_spark(self.pod.metadata.labels)
await self._init_spark()

# Modify user containers (notebook and side-container)
self._modify_containers_for_spark(hadoop_secret_name)

# Add required Spark ports
required_ports += self._SPARK_REQUIRED_PORTS

if self._condor_enabled():
# Configure HTCondor pool at CERN
required_ports += self._CONDOR_REQUIRED_PORTS

# init user containers (notebook and side-container)
self._init_spark_containers(hadoop_secret_name)
if required_ports > 0:
await self._open_ports(required_ports)

return self.pod

async def _init_hadoop_secret(self):
"""
Create secret for Spark/Hadoop
"""

cluster = self.spawner.user_options[self.spawner.spark_cluster_field]

@@ -81,7 +97,7 @@ async def _init_hadoop_secret(self):
# if no access, all good for now
raise ValueError("Could not get webhdfs tokens")

# Create V1Secret with eos token
# Create V1Secret with webhdfs token and k8s user config
try:
secret_data = V1Secret()

@@ -108,66 +124,72 @@ async def _init_hadoop_secret(self):

return hadoop_secret_name

def _init_spark_containers(self, hadoop_secret_name):
def _modify_containers_for_spark(self, hadoop_secret_name):
"""
Define cern related secrets for spark and eos
Configure CERN-related secrets for Spark
"""

notebook_container = self._get_pod_container('notebook')
side_container = self._get_pod_container('side-container')

if hadoop_secret_name:
# pod volume to mount generated hadoop tokens and
# side-container volume mount with generated tokens
self.pod.spec.volumes.append(
# V1Secret for tokens without adjusted permissions
V1Volume(
name=hadoop_secret_name,
secret=V1SecretVolumeSource(
secret_name=hadoop_secret_name,
)
# pod volume to mount generated hadoop tokens and
# side-container volume mount with generated tokens
self.pod.spec.volumes.append(
# V1Secret for tokens without adjusted permissions
V1Volume(
name=hadoop_secret_name,
secret=V1SecretVolumeSource(
secret_name=hadoop_secret_name,
)
)
side_container.volume_mounts.append(
V1VolumeMount(
name=hadoop_secret_name,
mount_path='/srv/side-container/hadoop'
)
)
side_container.volume_mounts.append(
V1VolumeMount(
name=hadoop_secret_name,
mount_path='/srv/side-container/hadoop'
)
)

# instruct sparkconnector to fetch delegation tokens from service
notebook_container.env = self._add_or_replace_by_name(
notebook_container.env,
V1EnvVar(
name='SWAN_FETCH_HADOOP_TOKENS',
value='true'
),
)
notebook_container.env = self._add_or_replace_by_name(
notebook_container.env,
V1EnvVar(
name='SWAN_HADOOP_TOKEN_GENERATOR_URL',
value='http://hadoop-token-generator:80'
),
)
notebook_container.env = self._add_or_replace_by_name(
notebook_container.env,
V1EnvVar(
name='KUBECONFIG',
value='/srv/notebook/tokens/k8s-user.config'
),
)
notebook_container.env = self._add_or_replace_by_name(
notebook_container.env,
V1EnvVar(
name='WEBHDFS_TOKEN',
value_from=V1EnvVarSource(
secret_key_ref=V1SecretKeySelector(
key='webhdfs.toks',
name=hadoop_secret_name
)
# instruct sparkconnector to fetch delegation tokens from service
notebook_container.env = self._add_or_replace_by_name(
notebook_container.env,
V1EnvVar(
name='SWAN_FETCH_HADOOP_TOKENS',
value='true'
),
)

# hadoop token generator url
notebook_container.env = self._add_or_replace_by_name(
notebook_container.env,
V1EnvVar(
name='SWAN_HADOOP_TOKEN_GENERATOR_URL',
value='http://hadoop-token-generator:80'
),
)

# configuration to access Spark k8s cluster
notebook_container.env = self._add_or_replace_by_name(
notebook_container.env,
V1EnvVar(
name='KUBECONFIG',
value='/srv/notebook/tokens/k8s-user.config'
),
)

# webhdfs token
notebook_container.env = self._add_or_replace_by_name(
notebook_container.env,
V1EnvVar(
name='WEBHDFS_TOKEN',
value_from=V1EnvVarSource(
secret_key_ref=V1SecretKeySelector(
key='webhdfs.toks',
name=hadoop_secret_name
)
),
)
)
),
)

def _spark_enabled(self):
"""
@@ -196,10 +218,11 @@ def _spark_enabled(self):
return True
return False

async def _init_spark(self, pod_labels):
async def _init_spark(self):
"""
Set cern related configuration for spark cluster and open ports
Set CERN-related configuration for Spark clusters
"""

notebook_container = self._get_pod_container('notebook')
username = self.spawner.user.name

@@ -209,8 +232,6 @@ async def _init_spark(self, pod_labels):
if cluster == 'none':
return

spark_ports_service = "spark-ports" + "-" + username

# add basic spark envs

notebook_container.env = self._add_or_replace_by_name(
@@ -265,46 +286,59 @@ async def _init_spark(self, pod_labels):
)
)

# configure spark ports
try:
spark_ports_env = []
def _condor_enabled(self):
"""
Return True if the user has selected an HTCondor pool.
"""
condor_pool = self.spawner.user_options[self.spawner.condor_pool]
return condor_pool != 'none'

# Define some 18 random NodePorts on the cluster for spark using V1Service
async def _open_ports(self, num_ports):
"""
Create a service that opens the necessary ports for the computing integrations
(Spark, HTCondor)
"""

computing_ports_service = "computing-ports" + "-" + self.spawner.user.name
notebook_container = self._get_pod_container('notebook')

try:
# Define `num_ports` random NodePorts on the cluster using V1Service
service_template_ports = []
spark_ports_per_pod = 18
for port_id in range(1, spark_ports_per_pod + 1):
for port_id in range(1, num_ports + 1):
service_template_ports.append(
V1ServicePort(
name="spark-port-" + str(port_id),
name="comp-port-" + str(port_id),
port=port_id
)
)
service_template = V1Service(
api_version="v1",
kind="Service",
metadata=V1ObjectMeta(
name=spark_ports_service
name=computing_ports_service
),
spec=V1ServiceSpec(
selector=pod_labels, # attach this service to the pod with label {spark_pod_label}
selector=self.pod.metadata.labels, # attach this service to the user pod
ports=service_template_ports,
type="NodePort"
)
)

# Create V1Service which allocates random ports for spark in k8s cluster
# Create V1Service which allocates random ports
try:
# use existing if possible
service = await self.spawner.api.read_namespaced_service(spark_ports_service, swan_container_namespace)
service = await self.spawner.api.read_namespaced_service(computing_ports_service, swan_container_namespace)
except ApiException:
# not existing, create
try:
service = await self.spawner.api.create_namespaced_service(swan_container_namespace, service_template)
except ApiException as e:
raise Exception("Could not create service that allocates random ports for Spark in k8s cluster: %s\n" % e)
raise Exception("Could not create service that allocates random ports for computing (Spark, HTCondor) integrations: %s\n" % e)

# Replace the service with allocated nodeports to map nodeport:targetport
# and set these ports for the notebook container
ports = []
for port_id in range(len(service.spec.ports)):
name = service.spec.ports[port_id].name
node_port = service.spec.ports[port_id].node_port
@@ -315,9 +349,6 @@ async def _init_spark(self, pod_labels):
target_port=node_port
)

# Construct ports env for spark
spark_ports_env.append(str(node_port))

# Open proper ports in the notebook container to map nodeport:targetport
notebook_container.ports = self._add_or_replace_by_name(
notebook_container.ports,
@@ -329,21 +360,23 @@ async def _init_spark(self, pod_labels):
)
)

await self.spawner.api.replace_namespaced_service(spark_ports_service, swan_container_namespace, service)
ports.append(str(node_port))

await self.spawner.api.replace_namespaced_service(computing_ports_service, swan_container_namespace, service)

# Add ports env for spark
# Add ports env for computing integrations
notebook_container.env = self._add_or_replace_by_name(
notebook_container.env,
V1EnvVar(
name='SPARK_PORTS',
value=','.join(spark_ports_env)
name='COMPUTING_PORTS',
value=','.join(ports)
)
)
except ApiException as e:
raise Exception("Could not create required user ports: %s\n" % e)


def spark_modify_pod_hook(spawner, pod):
def computing_modify_pod_hook(spawner, pod):
"""
:param spawner: Swan Kubernetes Spawner
:type spawner: swanspawner.SwanKubeSpawner
@@ -353,8 +386,8 @@ def spark_modify_pod_hook(spawner, pod):
:returns: dynamically customized pod specification for user session
:rtype: V1Pod
"""
spark_pod_hook_handler = SwanSparkPodHookHandler(spawner, pod)
return spark_pod_hook_handler.get_swan_user_pod()
computing_pod_hook_handler = SwanComputingPodHookHandler(spawner, pod)
return computing_pod_hook_handler.get_swan_user_pod()


c.SwanKubeSpawner.modify_pod_hook = spark_modify_pod_hook
c.SwanKubeSpawner.modify_pod_hook = computing_modify_pod_hook
2 changes: 1 addition & 1 deletion swan-cern/templates/config.yaml
@@ -6,5 +6,5 @@ metadata:
data:
options_form_config.json: {{ .Values.optionsform | toJson }}
{{ (.Files.Glob "files/swan_config_cern.py").AsConfig | indent 2 }}
{{ (.Files.Glob "files/swan_spark_config.py").AsConfig | indent 2 }}
{{ (.Files.Glob "files/swan_computing_config.py").AsConfig | indent 2 }}
{{ (.Files.Glob "files/private/side_container_tokens_perm.sh").AsConfig | indent 2 }}
8 changes: 4 additions & 4 deletions swan-cern/values.yaml
@@ -88,8 +88,8 @@ swan:
mountPath: /usr/local/etc/jupyterhub/jupyterhub_config.d/2_swan_config_cern.py
subPath: swan_config_cern.py
- name: swan-jh-cern
mountPath: /usr/local/etc/jupyterhub/jupyterhub_config.d/3_swan_spark_config.py
subPath: swan_spark_config.py
mountPath: /usr/local/etc/jupyterhub/jupyterhub_config.d/3_swan_computing_config.py
subPath: swan_computing_config.py
- name: swan-secrets
mountPath: /srv/jupyterhub/private/eos.cred
subPath: eos.cred
@@ -120,8 +120,8 @@ swan:
path: options_form_config.json
- key: swan_config_cern.py
path: swan_config_cern.py
- key: swan_spark_config.py
path: swan_spark_config.py
- key: swan_computing_config.py
path: swan_computing_config.py
- name: swan-secrets
secret:
secretName: swan-cern