Skip to content

Commit

Permalink
Allow to install and remove operator via scripts (#1545)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiripetrlik authored Oct 31, 2023
1 parent 5a974fc commit ff66bcb
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 75 deletions.
23 changes: 3 additions & 20 deletions tests/compatibility-test.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@
# Default Ray version
ray_version = '2.7.0'

# Default docker images
ray_image = 'rayproject/ray:2.7.0'
kuberay_operator_image = 'kuberay/operator:nightly'


class BasicRayTestCase(unittest.TestCase):
"""Test the basic functionalities of RayCluster by executing simple jobs."""
cluster_template = CONST.REPO_ROOT.joinpath("tests/config/ray-cluster.mini.yaml.template")
Expand All @@ -45,11 +40,7 @@ def setUpClass(cls):
"""Create a Kind cluster, a KubeRay operator, and a RayCluster."""
K8S_CLUSTER_MANAGER.cleanup()
K8S_CLUSTER_MANAGER.initialize_cluster()
image_dict = {
CONST.RAY_IMAGE_KEY: ray_image,
CONST.OPERATOR_IMAGE_KEY: kuberay_operator_image
}
operator_manager = OperatorManager(image_dict)
operator_manager = OperatorManager.instance()
operator_manager.prepare_operator()
utils.create_ray_cluster(BasicRayTestCase.cluster_template, ray_version, ray_image)

Expand Down Expand Up @@ -79,11 +70,7 @@ def setUpClass(cls):
raise unittest.SkipTest(f"{CONST.RAY_FT} is not supported")
K8S_CLUSTER_MANAGER.cleanup()
K8S_CLUSTER_MANAGER.initialize_cluster()
image_dict = {
CONST.RAY_IMAGE_KEY: ray_image,
CONST.OPERATOR_IMAGE_KEY: kuberay_operator_image
}
operator_manager = OperatorManager(image_dict)
operator_manager = OperatorManager.instance()
operator_manager.prepare_operator()
utils.create_ray_cluster(RayFTTestCase.cluster_template, ray_version, ray_image)

Expand Down Expand Up @@ -226,11 +213,7 @@ class KubeRayHealthCheckTestCase(unittest.TestCase):
def setUpClass(cls):
K8S_CLUSTER_MANAGER.cleanup()
K8S_CLUSTER_MANAGER.initialize_cluster()
image_dict = {
CONST.RAY_IMAGE_KEY: ray_image,
CONST.OPERATOR_IMAGE_KEY: kuberay_operator_image
}
operator_manager = OperatorManager(image_dict)
operator_manager = OperatorManager.instance()
operator_manager.prepare_operator()
utils.create_ray_cluster(
KubeRayHealthCheckTestCase.cluster_template, ray_version, ray_image)
Expand Down
6 changes: 3 additions & 3 deletions tests/framework/prototype.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def __init__(
self,
custom_resource_object,
rulesets: List[RuleSet] = [],
timeout: int = 90,
timeout: int = 300,
namespace: str = "default",
filepath: Optional[str] = None,
):
Expand Down Expand Up @@ -611,10 +611,10 @@ def clean_up(self):

class GeneralTestCase(unittest.TestCase):
"""TestSuite"""
def __init__(self, methodName, docker_image_dict, cr_event):
def __init__(self, methodName, cr_event):
super().__init__(methodName)
self.cr_event = cr_event
self.operator_manager = OperatorManager(docker_image_dict)
self.operator_manager = OperatorManager.instance()

@classmethod
def setUpClass(cls):
Expand Down
104 changes: 71 additions & 33 deletions tests/framework/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,50 +88,55 @@ def instance(cls):
return KindClusterManager()

class ExternalClusterManager(ClusterManager):
CLUSTER_CLEANUP_SCRIPT = "CLUSTER_CLEANUP_SCRIPT"

def __init__(self) -> None:
config.load_kube_config()
self.k8s_client_dict = {}
self.k8s_client_dict.update(
{
CONST.K8S_V1_CLIENT_KEY: client.CoreV1Api(),
CONST.K8S_CR_CLIENT_KEY: client.CustomObjectsApi(),
}
)
self.cleanup_timeout = 120

def cleanup(self, namespace = "default") -> None:
self.__delete_all_crs("ray.io", "v1", namespace, "rayservices")
self.__delete_all_crs("ray.io", "v1", namespace, "rayjobs")
self.__delete_all_crs("ray.io", "v1", namespace, "rayclusters")

k8s_v1_api = self.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY]
start_time = time.time()
while time.time() - start_time < self.cleanup_timeout:
pods = k8s_v1_api.list_pod_for_all_namespaces(label_selector = 'app.kubernetes.io/created-by=kuberay-operator')
if len(pods.items) == 0:
logger.info("--- Cleanup rayservices, rayjobs, rayclusters %s seconds ---", time.time() - start_time)
break

time.sleep(1)

shell_subprocess_run("helm uninstall kuberay-operator", check=False)
start_time = time.time()
while time.time() - start_time < self.cleanup_timeout:
pods = k8s_v1_api.list_pod_for_all_namespaces(label_selector = 'app.kubernetes.io/component=kuberay-operator')
if len(pods.items) == 0:
logger.info("--- Cleanup kuberay-operator %s seconds ---", time.time() - start_time)
break

time.sleep(1)

if self.CLUSTER_CLEANUP_SCRIPT in os.environ:
cleanup_script = os.environ[self.CLUSTER_CLEANUP_SCRIPT]
shell_subprocess_run(cleanup_script)
else:
self.__delete_all_crs("ray.io", "v1", namespace, "rayservices")
self.__delete_all_crs("ray.io", "v1", namespace, "rayjobs")
self.__delete_all_crs("ray.io", "v1", namespace, "rayclusters")

k8s_v1_api = self.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY]
start_time = time.time()
while time.time() - start_time < self.cleanup_timeout:
pods = k8s_v1_api.list_pod_for_all_namespaces(label_selector = 'app.kubernetes.io/created-by=kuberay-operator')
if len(pods.items) == 0:
logger.info("--- Cleanup rayservices, rayjobs, rayclusters %s seconds ---", time.time() - start_time)
break

time.sleep(1)

shell_subprocess_run("helm uninstall kuberay-operator", check=False)
start_time = time.time()
while time.time() - start_time < self.cleanup_timeout:
pods = k8s_v1_api.list_pod_for_all_namespaces(label_selector = 'app.kubernetes.io/component=kuberay-operator')
if len(pods.items) == 0:
logger.info("--- Cleanup kuberay-operator %s seconds ---", time.time() - start_time)
break

time.sleep(1)

for _, k8s_client in self.k8s_client_dict.items():
k8s_client.api_client.rest_client.pool_manager.clear()
k8s_client.api_client.close()

self.k8s_client_dict = {}

def initialize_cluster(self, kind_config=None) -> None:
pass
config.load_kube_config()
self.k8s_client_dict.update(
{
CONST.K8S_V1_CLIENT_KEY: client.CoreV1Api(),
CONST.K8S_CR_CLIENT_KEY: client.CustomObjectsApi(),
}
)

def upload_image(self, image):
pass
Expand Down Expand Up @@ -209,7 +214,31 @@ def _adjust_kubeconfig_server_address(self) -> None:
K8S_CLUSTER_MANAGER = ClusterManager.instance()


class OperatorManager:
class OperatorManager(ABC):
    """Abstract base for objects that install the KubeRay operator for tests.

    Obtain a concrete manager through `OperatorManager.instance()` rather than
    instantiating a subclass directly; the factory picks the implementation
    based on the process environment.
    """
    # Name of the environment variable whose presence makes `instance()`
    # return a ScriptBasedOperatorManager.
    KUBERAY_OPERATOR_INSTALLATION_SCRIPT = "KUBERAY_OPERATOR_INSTALLATION_SCRIPT"

    @abstractmethod
    def prepare_operator(self):
        """Install the KubeRay operator. Must be implemented by subclasses."""
        pass

    @classmethod
    def instance(cls, namespace=None, patch=jsonpatch.JsonPatch([]),
                 cluster_manager = K8S_CLUSTER_MANAGER):
        """Build the operator manager selected by the environment.

        Args:
            namespace: Namespace handed to DefaultOperatorManager; falls back to
                "default" when None. Must be left as None in script mode.
            patch: JSON patch handed to DefaultOperatorManager. Must be left as
                the empty patch in script mode.
            cluster_manager: Cluster manager handed to DefaultOperatorManager.
                Not used in script mode.

        Returns:
            A ScriptBasedOperatorManager when KUBERAY_OPERATOR_INSTALLATION_SCRIPT
            is present in the environment, otherwise a DefaultOperatorManager
            configured with images taken from RAY_IMAGE / OPERATOR_IMAGE (or
            their built-in defaults).

        Raises:
            Exception: If script mode is active and `namespace` or a non-empty
                `patch` was supplied.
        """
        if cls.KUBERAY_OPERATOR_INSTALLATION_SCRIPT in os.environ:
            # The installation script owns the whole setup, so per-test
            # customisation cannot be honoured; fail loudly instead of
            # silently ignoring it.
            if namespace is not None or patch != jsonpatch.JsonPatch([]):
                raise Exception("Parameters namespace or patch are not supported in ScriptBasedOperatorManager")
            return ScriptBasedOperatorManager()

        if namespace is None:
            namespace = "default"
        # Environment variables are read on every call, so changes made after
        # import time are picked up.
        image_dict = {
            CONST.RAY_IMAGE_KEY: os.getenv('RAY_IMAGE', default='rayproject/ray:2.7.0'),
            CONST.OPERATOR_IMAGE_KEY: os.getenv('OPERATOR_IMAGE', default='kuberay/operator:nightly'),
        }
        return DefaultOperatorManager(image_dict, namespace, patch, cluster_manager)

class DefaultOperatorManager(OperatorManager):
"""
DefaultOperatorManager controls the lifecycle of the KubeRay operator. It will download Docker images,
load images into an existing KinD cluster, and install CRD and KubeRay operator.
Expand Down Expand Up @@ -306,6 +335,15 @@ def __install_crd_and_operator(self):
f"--set image.repository={repo},image.tag={tag}"
)

class ScriptBasedOperatorManager(OperatorManager):
    """Operator manager that installs KubeRay by running a shell command.

    The command is read from the environment variable named by
    `KUBERAY_OPERATOR_INSTALLATION_SCRIPT`.
    """

    def __init__(self):
        # os.getenv returns None when the variable is unset; the value is
        # validated in prepare_operator() so that construction never fails.
        self.installation_script = os.getenv(self.KUBERAY_OPERATOR_INSTALLATION_SCRIPT)

    def prepare_operator(self):
        """Run the installation command through the shell.

        Raises:
            Exception: If no installation command is configured, or if the
                command finishes with a non-zero exit code.
        """
        if not self.installation_script:
            # Without this guard a missing or empty variable would reach the
            # shell helper as None / "" and not produce a clear failure.
            raise Exception(
                f"Environment variable {self.KUBERAY_OPERATOR_INSTALLATION_SCRIPT} "
                "must be set to a non-empty installation command"
            )
        return_code = shell_subprocess_run(self.installation_script)
        if return_code != 0:
            raise Exception("Operator installation failed with exit code " + str(return_code))


def shell_subprocess_run(command, check=True, hide_output=False) -> int:
"""Command will be executed through the shell.
Expand Down
8 changes: 2 additions & 6 deletions tests/test_sample_raycluster_yamls.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,7 @@
}

rs = RuleSet([HeadPodNameRule(), EasyJobRule(), HeadSvcRule()])
image_dict = {
CONST.RAY_IMAGE_KEY: os.getenv('RAY_IMAGE', default='rayproject/ray:2.7.0'),
CONST.OPERATOR_IMAGE_KEY: os.getenv('OPERATOR_IMAGE', default='kuberay/operator:nightly'),
}
logger.info(image_dict)

# Build a test plan
logger.info("Build a test plan ...")
test_cases = unittest.TestSuite()
Expand All @@ -69,7 +65,7 @@
continue
logger.info('[TEST %d]: %s', index, new_cr['name'])
addEvent = RayClusterAddCREvent(new_cr['cr'], [rs], 90, NAMESPACE, new_cr['path'])
test_cases.addTest(GeneralTestCase('runtest', image_dict, addEvent))
test_cases.addTest(GeneralTestCase('runtest', addEvent))

# Execute all tests
runner = unittest.TextTestRunner()
Expand Down
7 changes: 1 addition & 6 deletions tests/test_sample_rayjob_yamls.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,14 @@
# (The event is not considered "converged" until the job has succeeded.) The EasyJobRule
# is only used to additionally check that the Ray Cluster remains alive and functional.
rs = RuleSet([EasyJobRule(), ShutdownJobRule()])
image_dict = {
CONST.RAY_IMAGE_KEY: os.getenv('RAY_IMAGE', default='rayproject/ray:2.7.0'),
CONST.OPERATOR_IMAGE_KEY: os.getenv('OPERATOR_IMAGE', default='kuberay/operator:nightly'),
}
logger.info(image_dict)

# Build a test plan
logger.info("Building a test plan ...")
test_cases = unittest.TestSuite()
for index, new_cr in enumerate(sample_yaml_files):
logger.info('[TEST %d]: %s', index, new_cr['name'])
addEvent = RayJobAddCREvent(new_cr['cr'], [rs], 300, NAMESPACE, new_cr['path'])
test_cases.addTest(GeneralTestCase('runtest', image_dict, addEvent))
test_cases.addTest(GeneralTestCase('runtest', addEvent))

# Execute all tests
runner = unittest.TextTestRunner()
Expand Down
7 changes: 1 addition & 6 deletions tests/test_sample_rayservice_yamls.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@
logger = logging.getLogger(__name__)

NAMESPACE = 'default'
DEFAULT_IMAGE_DICT = {
CONST.RAY_IMAGE_KEY: os.getenv('RAY_IMAGE', default='rayproject/ray:2.7.0'),
CONST.OPERATOR_IMAGE_KEY: os.getenv('OPERATOR_IMAGE', default='kuberay/operator:nightly'),
}

class RayServiceAddCREvent(CREvent):
"""CREvent for RayService addition"""
Expand Down Expand Up @@ -199,10 +195,9 @@ def set_up_cluster(self):

K8S_CLUSTER_MANAGER.cleanup()
K8S_CLUSTER_MANAGER.initialize_cluster()
operator_manager = OperatorManager(DEFAULT_IMAGE_DICT)
operator_manager = OperatorManager.instance()
operator_manager.prepare_operator()
start_curl_pod("curl", "default")
logger.info(DEFAULT_IMAGE_DICT)

yield

Expand Down
2 changes: 1 addition & 1 deletion tests/test_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def setUpClass(cls):
'seccompProfile': {'type': 'RuntimeDefault'}
}
}])
operator_manager = OperatorManager(image_dict, PodSecurityTestCase.namespace, patch)
operator_manager = OperatorManager.instance(PodSecurityTestCase.namespace, patch)
operator_manager.prepare_operator()

def test_ray_cluster_with_security_context(self):
Expand Down

0 comments on commit ff66bcb

Please sign in to comment.