Skip to content

Commit

Permalink
add functions for creating ray with oauth proxy in front of the dashb…
Browse files Browse the repository at this point in the history
…oard

Signed-off-by: Kevin <[email protected]>
  • Loading branch information
KPostOffice committed Oct 4, 2023
1 parent 996264a commit 06bdd01
Show file tree
Hide file tree
Showing 7 changed files with 499 additions and 87 deletions.
5 changes: 4 additions & 1 deletion src/codeflare_sdk/cluster/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import urllib3
from ..utils.kube_api_helpers import _kube_api_error_handling

from typing import Optional

global api_client
api_client = None
global config_path
Expand Down Expand Up @@ -183,12 +185,13 @@ def config_check() -> str:
raise PermissionError(
"Action not permitted, have you put in correct/up-to-date auth credentials?"
)
api_client = config.new_client_from_config()

if config_path != None and api_client == None:
return config_path


def api_config_handler() -> str:
def api_config_handler() -> Optional[client.ApiClient]:
"""
This function is used to load the api client if the user has logged in
"""
Expand Down
102 changes: 85 additions & 17 deletions src/codeflare_sdk/cluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,20 @@
from time import sleep
from typing import List, Optional, Tuple, Dict

import openshift as oc
from kubernetes import config
from ray.job_submission import JobSubmissionClient
import urllib3

from .auth import config_check, api_config_handler
from ..utils import pretty_print
from ..utils.generate_yaml import generate_appwrapper
from ..utils.kube_api_helpers import _kube_api_error_handling
from ..utils.openshift_oauth import (
create_openshift_oauth_objects,
delete_openshift_oauth_objects,
download_tls_cert,
)
from .config import ClusterConfiguration
from .model import (
AppWrapper,
Expand All @@ -40,6 +48,8 @@
import os
import requests

from kubernetes import config


class Cluster:
"""
Expand All @@ -61,6 +71,38 @@ def __init__(self, config: ClusterConfiguration):
self.config = config
self.app_wrapper_yaml = self.create_app_wrapper()
self.app_wrapper_name = self.app_wrapper_yaml.split(".")[0]
self._client = None

@property
def _client_headers(self):
return {
"Authorization": api_config_handler().configuration.get_api_key_with_prefix(
"authorization"
)
}

@property
def _client_verify_tls(self):
return not self.config.openshift_oauth

@property
def client(self):
if self._client:
return self._client
if self.config.openshift_oauth:
print(
api_config_handler().configuration.get_api_key_with_prefix(
"authorization"
)
)
self._client = JobSubmissionClient(
self.cluster_dashboard_uri(),
headers=self._client_headers,
verify=self._client_verify_tls,
)
else:
self._client = JobSubmissionClient(self.cluster_dashboard_uri())
return self._client

def evaluate_dispatch_priority(self):
priority_class = self.config.dispatch_priority
Expand Down Expand Up @@ -147,6 +189,7 @@ def create_app_wrapper(self):
image_pull_secrets=image_pull_secrets,
dispatch_priority=dispatch_priority,
priority_val=priority_val,
openshift_oauth=self.config.openshift_oauth,
)

# creates a new cluster with the provided or default spec
Expand All @@ -156,6 +199,11 @@ def up(self):
the MCAD queue.
"""
namespace = self.config.namespace
if self.config.openshift_oauth:
create_openshift_oauth_objects(
cluster_name=self.config.name, namespace=namespace
)

try:
config_check()
api_instance = client.CustomObjectsApi(api_config_handler())
Expand Down Expand Up @@ -190,6 +238,11 @@ def down(self):
except Exception as e: # pragma: no cover
return _kube_api_error_handling(e)

if self.config.openshift_oauth:
delete_openshift_oauth_objects(
cluster_name=self.config.name, namespace=namespace
)

def status(
self, print_to_console: bool = True
) -> Tuple[CodeFlareClusterStatus, bool]:
Expand Down Expand Up @@ -258,7 +311,16 @@ def status(
return status, ready

def is_dashboard_ready(self) -> bool:
response = requests.get(self.cluster_dashboard_uri(), timeout=5)
try:
response = requests.get(
self.cluster_dashboard_uri(),
headers=self._client_headers,
timeout=5,
verify=self._client_verify_tls,
)
except requests.exceptions.SSLError:
# SSL exception occurs when oauth ingress has been created but cluster is not up
return False
if response.status_code == 200:
return True
else:
Expand Down Expand Up @@ -330,7 +392,13 @@ def cluster_dashboard_uri(self) -> str:
return _kube_api_error_handling(e)

for route in routes["items"]:
if route["metadata"]["name"] == f"ray-dashboard-{self.config.name}":
if route["metadata"][
"name"
] == f"ray-dashboard-{self.config.name}" or route["metadata"][
"name"
].startswith(
f"{self.config.name}-ingress"
):
protocol = "https" if route["spec"].get("tls") else "http"
return f"{protocol}://{route['spec']['host']}"
return "Dashboard route not available yet, have you run cluster.up()?"
Expand All @@ -339,30 +407,24 @@ def list_jobs(self) -> List:
"""
This method accesses the head ray node in your cluster and lists the running jobs.
"""
dashboard_route = self.cluster_dashboard_uri()
client = JobSubmissionClient(dashboard_route)
return client.list_jobs()
return self.client.list_jobs()

def job_status(self, job_id: str) -> str:
"""
This method accesses the head ray node in your cluster and returns the job status for the provided job id.
"""
dashboard_route = self.cluster_dashboard_uri()
client = JobSubmissionClient(dashboard_route)
return client.get_job_status(job_id)
return self.client.get_job_status(job_id)

def job_logs(self, job_id: str) -> str:
"""
This method accesses the head ray node in your cluster and returns the logs for the provided job id.
"""
dashboard_route = self.cluster_dashboard_uri()
client = JobSubmissionClient(dashboard_route)
return client.get_job_logs(job_id)
return self.client.get_job_logs(job_id)

def torchx_config(
self, working_dir: str = None, requirements: str = None
) -> Dict[str, str]:
dashboard_address = f"{self.cluster_dashboard_uri().lstrip('http://')}"
dashboard_address = urllib3.util.parse_url(self.cluster_dashboard_uri()).host
to_return = {
"cluster_name": self.config.name,
"dashboard_address": dashboard_address,
Expand Down Expand Up @@ -474,8 +536,8 @@ def get_current_namespace(): # pragma: no cover

def get_cluster(cluster_name: str, namespace: str = "default"):
try:
config_check()
api_instance = client.CustomObjectsApi(api_config_handler())
config.load_kube_config()
api_instance = client.CustomObjectsApi()
rcs = api_instance.list_namespaced_custom_object(
group="ray.io",
version="v1alpha1",
Expand All @@ -496,7 +558,7 @@ def get_cluster(cluster_name: str, namespace: str = "default"):
# private methods
def _get_ingress_domain():
try:
config_check()
config.load_kube_config()
api_client = client.CustomObjectsApi(api_config_handler())
ingress = api_client.get_cluster_custom_object(
"config.openshift.io", "v1", "ingresses", "cluster"
Expand Down Expand Up @@ -591,7 +653,7 @@ def _get_app_wrappers(


def _map_to_ray_cluster(rc) -> Optional[RayCluster]:
if "status" in rc and "state" in rc["status"]:
if "state" in rc["status"]:
status = RayClusterStatus(rc["status"]["state"].lower())
else:
status = RayClusterStatus.UNKNOWN
Expand All @@ -606,7 +668,13 @@ def _map_to_ray_cluster(rc) -> Optional[RayCluster]:
)
ray_route = None
for route in routes["items"]:
if route["metadata"]["name"] == f"ray-dashboard-{rc['metadata']['name']}":
if route["metadata"][
"name"
] == f"ray-dashboard-{rc['metadata']['name']}" or route["metadata"][
"name"
].startswith(
f"{rc['metadata']['name']}-ingress"
):
protocol = "https" if route["spec"].get("tls") else "http"
ray_route = f"{protocol}://{route['spec']['host']}"

Expand Down
1 change: 1 addition & 0 deletions src/codeflare_sdk/cluster/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,4 @@ class ClusterConfiguration:
local_interactive: bool = False
image_pull_secrets: list = field(default_factory=list)
dispatch_priority: str = None
openshift_oauth: bool = False # NOTE: to use the user must have permission to create a RoleBinding for system:auth-delegator
Loading

0 comments on commit 06bdd01

Please sign in to comment.