diff --git a/swan-cern/files/swan_spark_config.py b/swan-cern/files/swan_computing_config.py
similarity index 67%
rename from swan-cern/files/swan_spark_config.py
rename to swan-cern/files/swan_computing_config.py
index 12a028a1..c3e41561 100644
--- a/swan-cern/files/swan_spark_config.py
+++ b/swan-cern/files/swan_computing_config.py
@@ -22,24 +22,40 @@
 """
 
-class SwanSparkPodHookHandler(SwanPodHookHandlerProd):
+class SwanComputingPodHookHandler(SwanPodHookHandlerProd):
+
+    _SPARK_REQUIRED_PORTS = 18
+    _CONDOR_REQUIRED_PORTS = 5
 
     async def get_swan_user_pod(self):
         await super().get_swan_user_pod()
 
-        # get hadoop token
-        hadoop_secret_name = None
+        required_ports = 0
+
         if self._spark_enabled():
-            # cern customisation for spark clusters
+            # Configure Spark clusters at CERN
             hadoop_secret_name = await self._init_hadoop_secret()
-            await self._init_spark(self.pod.metadata.labels)
+            await self._init_spark()
+
+            # Modify user containers (notebook and side-container)
+            self._modify_containers_for_spark(hadoop_secret_name)
+
+            # Add required Spark ports
+            required_ports += self._SPARK_REQUIRED_PORTS
+
+        if self._condor_enabled():
+            # Configure HTCondor pool at CERN
+            required_ports += self._CONDOR_REQUIRED_PORTS
 
-        # init user containers (notebook and side-container)
-        self._init_spark_containers(hadoop_secret_name)
+        if required_ports > 0:
+            await self._open_ports(required_ports)
 
         return self.pod
 
     async def _init_hadoop_secret(self):
+        """
+        Create secret for Spark/Hadoop
+        """
 
         cluster = self.spawner.user_options[self.spawner.spark_cluster_field]
 
@@ -81,7 +97,7 @@ async def _init_hadoop_secret(self):
             # if no access, all good for now
             raise ValueError("Could not get webhdfs tokens")
 
-        # Create V1Secret with eos token
+        # Create V1Secret with webhdfs token and k8s user config
         try:
             secret_data = V1Secret()
 
@@ -108,66 +124,72 @@ async def _init_hadoop_secret(self):
 
         return hadoop_secret_name
 
-    def _init_spark_containers(self, hadoop_secret_name):
+    def _modify_containers_for_spark(self, hadoop_secret_name):
         """
-        Define cern related secrets for spark and eos
+        Configure CERN-related secrets for Spark
         """
+
         notebook_container = self._get_pod_container('notebook')
         side_container = self._get_pod_container('side-container')
 
-        if hadoop_secret_name:
-            # pod volume to mount generated hadoop tokens and
-            # side-container volume mount with generated tokens
-            self.pod.spec.volumes.append(
-                # V1Secret for tokens without adjusted permissions
-                V1Volume(
-                    name=hadoop_secret_name,
-                    secret=V1SecretVolumeSource(
-                        secret_name=hadoop_secret_name,
-                    )
+        # pod volume to mount generated hadoop tokens and
+        # side-container volume mount with generated tokens
+        self.pod.spec.volumes.append(
+            # V1Secret for tokens without adjusted permissions
+            V1Volume(
+                name=hadoop_secret_name,
+                secret=V1SecretVolumeSource(
+                    secret_name=hadoop_secret_name,
                 )
             )
-            side_container.volume_mounts.append(
-                V1VolumeMount(
-                    name=hadoop_secret_name,
-                    mount_path='/srv/side-container/hadoop'
-                )
+        )
+        side_container.volume_mounts.append(
+            V1VolumeMount(
+                name=hadoop_secret_name,
+                mount_path='/srv/side-container/hadoop'
            )
+        )
 
-            # instruct sparkconnector to fetch delegation tokens from service
-            notebook_container.env = self._add_or_replace_by_name(
-                notebook_container.env,
-                V1EnvVar(
-                    name='SWAN_FETCH_HADOOP_TOKENS',
-                    value='true'
-                ),
-            )
-            notebook_container.env = self._add_or_replace_by_name(
-                notebook_container.env,
-                V1EnvVar(
-                    name='SWAN_HADOOP_TOKEN_GENERATOR_URL',
-                    value='http://hadoop-token-generator:80'
-                ),
-            )
-            notebook_container.env = self._add_or_replace_by_name(
-                notebook_container.env,
-                V1EnvVar(
-                    name='KUBECONFIG',
-                    value='/srv/notebook/tokens/k8s-user.config'
-                ),
-            )
-            notebook_container.env = self._add_or_replace_by_name(
-                notebook_container.env,
-                V1EnvVar(
-                    name='WEBHDFS_TOKEN',
-                    value_from=V1EnvVarSource(
-                        secret_key_ref=V1SecretKeySelector(
-                            key='webhdfs.toks',
-                            name=hadoop_secret_name
-                        )
+        # instruct sparkconnector to fetch delegation tokens from service
+        notebook_container.env = self._add_or_replace_by_name(
+            notebook_container.env,
+            V1EnvVar(
+                name='SWAN_FETCH_HADOOP_TOKENS',
+                value='true'
+            ),
+        )
+
+        # URL of the Hadoop token generator service
+        notebook_container.env = self._add_or_replace_by_name(
+            notebook_container.env,
+            V1EnvVar(
+                name='SWAN_HADOOP_TOKEN_GENERATOR_URL',
+                value='http://hadoop-token-generator:80'
+            ),
+        )
+
+        # configuration to access Spark k8s cluster
+        notebook_container.env = self._add_or_replace_by_name(
+            notebook_container.env,
+            V1EnvVar(
+                name='KUBECONFIG',
+                value='/srv/notebook/tokens/k8s-user.config'
+            ),
+        )
+
+        # webhdfs token
+        notebook_container.env = self._add_or_replace_by_name(
+            notebook_container.env,
+            V1EnvVar(
+                name='WEBHDFS_TOKEN',
+                value_from=V1EnvVarSource(
+                    secret_key_ref=V1SecretKeySelector(
+                        key='webhdfs.toks',
+                        name=hadoop_secret_name
                     )
-                ),
-            )
+                )
+            ),
+        )
 
     def _spark_enabled(self):
         """
@@ -196,10 +218,11 @@ def _spark_enabled(self):
                 return True
         return False
 
-    async def _init_spark(self, pod_labels):
+    async def _init_spark(self):
         """
-        Set cern related configuration for spark cluster and open ports
+        Set CERN-related configuration for Spark clusters
         """
+
         notebook_container = self._get_pod_container('notebook')
         username = self.spawner.user.name
 
@@ -209,8 +232,6 @@ async def _init_spark(self):
         if cluster == 'none':
             return
 
-        spark_ports_service = "spark-ports" + "-" + username
-
         # add basic spark envs
 
         notebook_container.env = self._add_or_replace_by_name(
@@ -265,17 +286,29 @@
             )
         )
 
-        # configure spark ports
-        try:
-            spark_ports_env = []
+    def _condor_enabled(self):
+        """
+        Return True if the user has selected an HTCondor pool.
+ """ + condor_pool = self.spawner.user_options[self.spawner.condor_pool] + return condor_pool != 'none' - # Define some 18 random NodePorts on the cluster for spark using V1Service + async def _open_ports(self, num_ports): + """ + Create a service that opens the necessary ports for the computing integrations + (Spark, HTCondor) + """ + + computing_ports_service = "computing-ports" + "-" + self.spawner.user.name + notebook_container = self._get_pod_container('notebook') + + try: + # Define `num_ports` random NodePorts on the cluster using V1Service service_template_ports = [] - spark_ports_per_pod = 18 - for port_id in range(1, spark_ports_per_pod + 1): + for port_id in range(1, num_ports + 1): service_template_ports.append( V1ServicePort( - name="spark-port-" + str(port_id), + name="comp-port-" + str(port_id), port=port_id ) ) @@ -283,28 +316,29 @@ async def _init_spark(self, pod_labels): api_version="v1", kind="Service", metadata=V1ObjectMeta( - name=spark_ports_service + name=computing_ports_service ), spec=V1ServiceSpec( - selector=pod_labels, # attach this service to the pod with label {spark_pod_label} + selector=self.pod.metadata.labels, # attach this service to the user pod ports=service_template_ports, type="NodePort" ) ) - # Create V1Service which allocates random ports for spark in k8s cluster + # Create V1Service which allocates random ports try: # use existing if possible - service = await self.spawner.api.read_namespaced_service(spark_ports_service, swan_container_namespace) + service = await self.spawner.api.read_namespaced_service(computing_ports_service, swan_container_namespace) except ApiException: # not existing, create try: service = await self.spawner.api.create_namespaced_service(swan_container_namespace, service_template) except ApiException as e: - raise Exception("Could not create service that allocates random ports for Spark in k8s cluster: %s\n" % e) + raise Exception("Could not create service that allocates random ports for computing (Spark, HTCondor) integrations: %s\n" % e) # Replace the service with allocated nodeports to map nodeport:targetport # and set these ports for the notebook container + ports = [] for port_id in range(len(service.spec.ports)): name = service.spec.ports[port_id].name node_port = service.spec.ports[port_id].node_port @@ -315,9 +349,6 @@ async def _init_spark(self, pod_labels): target_port=node_port ) - # Construct ports env for spark - spark_ports_env.append(str(node_port)) - # Open proper ports in the notebook container to map nodeport:targetport notebook_container.ports = self._add_or_replace_by_name( notebook_container.ports, @@ -329,21 +360,23 @@ async def _init_spark(self, pod_labels): ) ) - await self.spawner.api.replace_namespaced_service(spark_ports_service, swan_container_namespace, service) + ports.append(str(node_port)) + + await self.spawner.api.replace_namespaced_service(computing_ports_service, swan_container_namespace, service) - # Add ports env for spark + # Add ports env for computing integrations notebook_container.env = self._add_or_replace_by_name( notebook_container.env, V1EnvVar( - name='SPARK_PORTS', - value=','.join(spark_ports_env) + name='COMPUTING_PORTS', + value=','.join(ports) ) ) except ApiException as e: raise Exception("Could not create required user ports: %s\n" % e) -def spark_modify_pod_hook(spawner, pod): +def computing_modify_pod_hook(spawner, pod): """ :param spawner: Swan Kubernetes Spawner :type spawner: swanspawner.SwanKubeSpawner @@ -353,8 +386,8 @@ def spark_modify_pod_hook(spawner, pod): 
     :returns: dynamically customized pod specification for user session
     :rtype: V1Pod
     """
-    spark_pod_hook_handler = SwanSparkPodHookHandler(spawner, pod)
-    return spark_pod_hook_handler.get_swan_user_pod()
+    computing_pod_hook_handler = SwanComputingPodHookHandler(spawner, pod)
+    return computing_pod_hook_handler.get_swan_user_pod()
 
-c.SwanKubeSpawner.modify_pod_hook = spark_modify_pod_hook
+c.SwanKubeSpawner.modify_pod_hook = computing_modify_pod_hook
diff --git a/swan-cern/templates/config.yaml b/swan-cern/templates/config.yaml
index 376c4195..ceb20ffa 100644
--- a/swan-cern/templates/config.yaml
+++ b/swan-cern/templates/config.yaml
@@ -6,5 +6,5 @@ metadata:
 data:
   options_form_config.json: {{ .Values.optionsform | toJson }}
 {{ (.Files.Glob "files/swan_config_cern.py").AsConfig | indent 2 }}
-{{ (.Files.Glob "files/swan_spark_config.py").AsConfig | indent 2 }}
+{{ (.Files.Glob "files/swan_computing_config.py").AsConfig | indent 2 }}
 {{ (.Files.Glob "files/private/side_container_tokens_perm.sh").AsConfig | indent 2 }}
diff --git a/swan-cern/values.yaml b/swan-cern/values.yaml
index 6c563025..5f2c8e8c 100644
--- a/swan-cern/values.yaml
+++ b/swan-cern/values.yaml
@@ -88,8 +88,8 @@ swan:
             mountPath: /usr/local/etc/jupyterhub/jupyterhub_config.d/2_swan_config_cern.py
             subPath: swan_config_cern.py
           - name: swan-jh-cern
-            mountPath: /usr/local/etc/jupyterhub/jupyterhub_config.d/3_swan_spark_config.py
-            subPath: swan_spark_config.py
+            mountPath: /usr/local/etc/jupyterhub/jupyterhub_config.d/3_swan_computing_config.py
+            subPath: swan_computing_config.py
           - name: swan-secrets
             mountPath: /srv/jupyterhub/private/eos.cred
             subPath: eos.cred
@@ -120,8 +120,8 @@ swan:
               path: options_form_config.json
             - key: swan_config_cern.py
               path: swan_config_cern.py
-            - key: swan_spark_config.py
-              path: swan_spark_config.py
+            - key: swan_computing_config.py
+              path: swan_computing_config.py
           - name: swan-secrets
             secret:
               secretName: swan-cern
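
Note for reviewers: `_open_ports` relies on the Kubernetes control plane assigning a random node port to every entry of a `NodePort` Service, and on the created object coming back with `spec.ports[*].node_port` filled in. The sketch below reproduces that allocation pattern in isolation so it can be tried outside the spawner; it uses the synchronous `kubernetes` Python client rather than the spawner's async client, and the namespace, service name and selector are illustrative placeholders, not values taken from this chart.

```python
from kubernetes import client, config
from kubernetes.client.rest import ApiException

def allocate_node_ports(namespace, service_name, pod_labels, num_ports):
    """Create (or reuse) a NodePort Service and return the node ports
    that Kubernetes allocated for it."""
    v1 = client.CoreV1Api()

    template = client.V1Service(
        api_version="v1",
        kind="Service",
        metadata=client.V1ObjectMeta(name=service_name),
        spec=client.V1ServiceSpec(
            selector=pod_labels,  # attach the Service to the matching pod
            type="NodePort",      # control plane picks a free node port per entry
            ports=[
                client.V1ServicePort(name="port-%d" % i, port=i)
                for i in range(1, num_ports + 1)
            ],
        ),
    )

    try:
        # reuse an existing Service if one was already created for this user
        service = v1.read_namespaced_service(service_name, namespace)
    except ApiException:
        service = v1.create_namespaced_service(namespace, template)

    # node_port is filled in by Kubernetes on creation
    return [p.node_port for p in service.spec.ports]

if __name__ == "__main__":
    config.load_kube_config()  # load_incluster_config() when running in a pod
    # illustrative values only
    print(allocate_node_ports("swan", "computing-ports-demo", {"user": "demo"}, 5))
```

In the patch itself, each allocated node port is additionally set as the service's `target_port` and opened as a container port on the notebook container, and the full list is handed to the session through the `COMPUTING_PORTS` environment variable (previously `SPARK_PORTS`). Requesting 18 ports for Spark and 5 for HTCondor up front and summing them means a single per-user Service covers both integrations instead of one Service per integration.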