[RayService] Refactor to Rely More on RayService Status in RayService E2E Tests (#1928)
Yicheng-Lu-llll authored Feb 21, 2024
1 parent e330c03 commit e616dc4
Showing 5 changed files with 41 additions and 79 deletions.
6 changes: 3 additions & 3 deletions helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion ray-operator/apis/ray/v1/rayservice_types.go
@@ -73,7 +73,7 @@ type RayServiceStatuses struct {
ServiceStatus ServiceStatus `json:"serviceStatus,omitempty"`
// NumServeEndpoints indicates the number of Ray Pods that are actively serving or have been selected by the serve service.
// Ray Pods without a proxy actor or those that are unhealthy will not be counted.
NumServeEndpoints int32 `json:"NumServeEndpoints,omitempty"`
NumServeEndpoints int32 `json:"numServeEndpoints,omitempty"`
// observedGeneration is the most recent generation observed for this RayService. It corresponds to the
// RayService's generation, which is updated on mutation by the API Server.
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
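The hunk above changes only the JSON tag, so the field now surfaces in the RayService status as numServeEndpoints (lowerCamelCase), matching serviceStatus and observedGeneration. As a minimal sketch (not part of this commit), the two status fields the refactored tests poll could be read with the official kubernetes Python client; the namespace and resource name below are placeholders:

# Minimal sketch: read the RayService status fields the E2E tests now rely on.
# The namespace and name are illustrative placeholders.
from kubernetes import client, config

config.load_kube_config()
custom_api = client.CustomObjectsApi()

rayservice = custom_api.get_namespaced_custom_object(
    group="ray.io",
    version="v1",
    namespace="default",          # placeholder namespace
    plural="rayservices",
    name="rayservice-sample",     # placeholder RayService name
)

status = rayservice.get("status", {})
# serviceStatus == "Running": the Serve applications are ready for traffic.
# numServeEndpoints > 0: the serve service has healthy endpoints to route to.
print(status.get("serviceStatus"), status.get("numServeEndpoints", 0))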
6 changes: 3 additions & 3 deletions ray-operator/config/crd/bases/ray.io_rayservices.yaml

Some generated files are not rendered by default.

Some generated files are not rendered by default.

104 changes: 33 additions & 71 deletions tests/test_sample_rayservice_yamls.py
@@ -22,6 +22,7 @@
)

from framework.utils import (
get_custom_object,
start_curl_pod,
logger,
shell_subprocess_run,
@@ -44,45 +45,26 @@ def wait(self):
"""Wait for RayService to converge
Wait until:
(1) The number of head pods and worker pods are as expected.
(2) All head pods and worker pods are "Running".
(3) Service named "rayservice-sample-serve" presents
(1) serviceStatus is "Running": the Serve applications in the RayCluster are ready to serve incoming traffic.
(2) numServeEndpoints > 0: the Kubernetes serve service is ready to redirect traffic to the RayCluster.
"""

logger.info("Waiting for pods in ray service to be running...")
k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY]

start_time = time.time()
expected_head_pods = get_expected_head_pods(self.custom_resource_object)
expected_worker_pods = get_expected_worker_pods(self.custom_resource_object)
while time.time() - start_time < self.timeout:
headpods = k8s_v1_api.list_namespaced_pod(
namespace = self.namespace, label_selector='ray.io/node-type=head')
workerpods = k8s_v1_api.list_namespaced_pod(
namespace = self.namespace, label_selector='ray.io/node-type=worker')
serve_services = k8s_v1_api.list_namespaced_service(
namespace = self.namespace, label_selector =
f"ray.io/originated-from-cr-name={self.custom_resource_object['metadata']['name']},"
f"ray.io/originated-from-crd=RayService,"
f"ray.io/serve={self.custom_resource_object['metadata']['name']}-serve")

logger.info(
"Number of head Pods: %d, Number of worker Pods: %d, Number of serve services: %d",
len(headpods.items), len(workerpods.items), len(serve_services.items)
)

if (len(serve_services.items) == 1 and len(headpods.items) == expected_head_pods
and len(workerpods.items) == expected_worker_pods
and check_pod_running(headpods.items) and check_pod_running(workerpods.items)):
while time.time() - start_time < self.timeout:
rayservice = get_custom_object(CONST.RAY_SERVICE_CRD, self.namespace,
self.custom_resource_object["metadata"]["name"])
status = rayservice.get("status", {})
if status.get("serviceStatus") == "Running" and status.get("numServeEndpoints", 0) > 0:
logger.info("--- RayServiceAddCREvent %s seconds ---", time.time() - start_time)
return

time.sleep(1)

logger.info(f"RayServiceAddCREvent wait() failed to converge in {self.timeout}s.")
logger.info(
f"expected_head_pods: {expected_head_pods}, "
f"expected_worker_pods: {expected_worker_pods}"
f"RayServiceAddCREvent wait() failed to converge in {self.timeout}s."
f"expected serviceStatus: Running, got {status.get('serviceStatus')}"
f"expected numServeEndpoints > 0, got {status.get('numServeEndpoints')}"
)
show_cluster_info(self.namespace)
raise TimeoutError(f"RayServiceAddCREvent didn't finish in {self.timeout}s")
@@ -107,65 +89,48 @@ def __init__(
if query_while_updating:
self.query_rule = CurlServiceRule(queries=query_while_updating)

def get_ray_service_info(self):
k8s_cr_api: client.CustomObjectsApi = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_CR_CLIENT_KEY]
return k8s_cr_api.get_namespaced_custom_object_status(
group="ray.io",
namespace=self.namespace,
name=self.name,
version="v1",
plural="rayservices",
)

def get_active_ray_cluster_name(self):
rayservice_info = self.get_ray_service_info()
return rayservice_info["status"]["activeServiceStatus"]["rayClusterName"]
rayservice = get_custom_object(CONST.RAY_SERVICE_CRD, self.namespace, self.name)
return rayservice["status"]["activeServiceStatus"]["rayClusterName"]

def exec(self):
"""Update a CR by a `kubectl apply` command."""
self.old_cluster_name = self.get_active_ray_cluster_name()

self.old_cluster_name = self.get_active_ray_cluster_name()
self.start = time.time()
shell_subprocess_run(f"kubectl apply -n {self.namespace} -f {self.filepath}")

def wait_for_status(self, status: str):
def wait_for_service_status(self, service_status: str):
"""Helper function to check for service status."""

while time.time() - self.start < self.timeout:
rayservice_info = self.get_ray_service_info()
if rayservice_info["status"]["serviceStatus"] == status:
break

rayservice = get_custom_object(CONST.RAY_SERVICE_CRD, self.namespace, self.name)
status = rayservice.get("status", {})
if status.get("serviceStatus") == service_status and status.get("numServeEndpoints", 0) > 0:
return
if self.query_rule:
self.query_rule.assert_rule(self.custom_resource_object, self.namespace)

time.sleep(0.1)
else:
raise TimeoutError(
f'Ray service "{self.name}" did not transition to status "{status}" '
f"after {self.timeout}s."
f"RayServiceUpdateCREvent wait() failed to converge in {self.timeout}s."
f"expected serviceStatus: {service_status}, got {status.get('serviceStatus')}"
f"expected numServeEndpoints > 0, got {status.get('numServeEndpoints')}"
)

def wait(self):
"""Wait for deployment to transition -> WaitForServeDeploymentReady -> Running"""

self.wait_for_status("WaitForServeDeploymentReady")
self.wait_for_service_status("WaitForServeDeploymentReady")
logger.info("Ray service transitioned to status WaitForServeDeploymentReady.")
self.wait_for_status("Running")
self.wait_for_service_status("Running")
logger.info("Ray service transitioned to status Running.")

if self.switch_cluster:
current_cluster_name = self.get_active_ray_cluster_name()
assert current_cluster_name != self.old_cluster_name
logger.info(f'Ray service has moved to cluster "{current_cluster_name}"')

# Wait 20 seconds for the serve service to update.
# TODO (Yicheng-Lu-llll): This workaround should be removed after
# refactoring the way of rolling out and redefining service status.
# Currently, changing to 'running' status does not guarantee that
# the serve service will redirect traffic to the new Raycluster.
time.sleep(20)

class RayServiceDeleteCREvent(CREvent):
"""CREvent for RayService deletion"""
def exec(self):
@@ -174,20 +139,22 @@ def exec(self):

def wait(self):
"""Wait for pods to be deleted"""
k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY]
custom_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_CR_CLIENT_KEY]
start_time = time.time()
while time.time() - start_time < self.timeout:
headpods = k8s_v1_api.list_namespaced_pod(
namespace = self.namespace, label_selector = 'ray.io/node-type=head')
workerpods = k8s_v1_api.list_namespaced_pod(
namespace = self.namespace, label_selector = 'ray.io/node-type=worker')
if (len(headpods.items) == 0 and len(workerpods.items) == 0):
rayservices = custom_api.list_namespaced_custom_object(
group = 'ray.io', version = 'v1', namespace = self.namespace,
plural = 'rayservices')
rayclusters = custom_api.list_namespaced_custom_object(
group = 'ray.io', version = 'v1', namespace = self.namespace,
plural = 'rayclusters')

if (len(rayservices["items"]) == 0 and len(rayclusters["items"]) == 0):
logger.info("--- Cleanup RayService %s seconds ---", time.time() - start_time)
return
time.sleep(1)

logger.info(f"RayServiceDeleteCREvent failed to converge in {self.timeout}s.")
logger.info("expected_head_pods: 0, expected_worker_pods: 0")
show_cluster_info(self.namespace)
raise TimeoutError(f"RayServiceDeleteCREvent didn't finish in {self.timeout}s.")

@@ -336,11 +303,6 @@ def test_service_autoscaling(self, set_up_cluster):
requests. Worker pods should scale up. Then we set the event in
the second application, releasing all blocked requests. Worker
pods should scale down.
TODO (kevin85421): Currently, we configure the RayService YAML to
ensure each Pod has at least 1 Ray Serve replica. Hence, all Pods
can pass the readiness check and become ready. Without this workaround,
the RayServiceAddCREvent will fail to converge.
"""
dir_path = "ray-operator/config/samples/"
cr_yaml_path = CONST.REPO_ROOT.joinpath(dir_path).joinpath("ray-service.autoscaler.yaml")
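The load that test_service_autoscaling describes (requests held open until an event is set, forcing worker Pods to scale up and then back down) could be generated with plain concurrent HTTP requests. A rough sketch under assumed conditions: the serve endpoint is reachable at a placeholder URL, and the first application blocks each request until the release event is set; the real test drives its queries through an in-cluster curl pod instead.

# Rough illustration only; the URL below is a placeholder for the serve
# service address and is not taken from this diff.
import concurrent.futures
import requests

SERVE_URL = "http://rayservice-sample-serve-svc:8000/"  # placeholder address

def send_blocking_request(i: int) -> int:
    # Each request is held server-side until the test sets the release event,
    # keeping Serve replicas busy so the autoscaler adds worker Pods.
    return requests.get(SERVE_URL, timeout=120).status_code

with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
    status_codes = list(pool.map(send_blocking_request, range(20)))
print(status_codes)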
