diff --git a/python/ray/serve/_private/http_state.py b/python/ray/serve/_private/http_state.py index 2771786d07a3..9e63f232e81c 100644 --- a/python/ray/serve/_private/http_state.py +++ b/python/ray/serve/_private/http_state.py @@ -128,6 +128,7 @@ def _health_check(self): self._health_check_obj_ref = None try: ray.get(finished[0]) + self.try_update_status(HTTPProxyStatus.HEALTHY) except ray.exceptions.RayActorError: # The proxy actor dies. self.set_status(HTTPProxyStatus.UNHEALTHY) diff --git a/python/ray/serve/tests/test_http_state.py b/python/ray/serve/tests/test_http_state.py index 92e119e62469..57b00fef5ccf 100644 --- a/python/ray/serve/tests/test_http_state.py +++ b/python/ray/serve/tests/test_http_state.py @@ -13,7 +13,11 @@ from ray.serve._private.common import HTTPProxyStatus from ray.serve._private.http_state import HTTPProxyStateManager, HTTPProxyState from ray.serve._private.http_proxy import HTTPProxyActor -from ray.serve._private.constants import SERVE_CONTROLLER_NAME, SERVE_NAMESPACE +from ray.serve._private.constants import ( + SERVE_CONTROLLER_NAME, + SERVE_NAMESPACE, + PROXY_HEALTH_CHECK_UNHEALTHY_THRESHOLD, +) from ray.serve.controller import ServeController from ray.serve._private.utils import get_head_node_id from ray.serve._private.default_impl import ( @@ -521,6 +525,118 @@ async def check_health(self): assert proxy_state._consecutive_health_check_failures == 3 +@patch("ray.serve._private.http_state.PROXY_HEALTH_CHECK_PERIOD_S", 0.1) +def test_http_proxy_state_update_healthy_check_health_sometimes_fails(): + """Test that the proxy is UNHEALTHY after consecutive health-check failures. + + The proxy state starts with STARTING. Then the proxy fails a few times + (less than the threshold needed to set it UNHEALTHY). Then it succeeds, so + it becomes HEALTHY. Then it fails a few times again but stays HEALTHY + because the failures weren't consecutive with the previous ones. And then + it finally fails enough times to become UNHEALTHY. + """ + + @ray.remote(num_cpus=0) + class NewMockHTTPProxyActor: + def __init__(self): + self._should_succeed = True + self.num_health_checks = 0 + + def get_num_health_checks(self) -> int: + return self.num_health_checks + + def should_succeed(self, value: bool = True): + self._should_succeed = value + + async def ready(self): + return json.dumps(["mock_worker_id", "mock_log_file_path"]) + + async def check_health(self): + self.num_health_checks += 1 + if self._should_succeed: + return "Success!" + else: + raise RuntimeError("self._should_succeed is disabled!") + + proxy_state = _create_http_proxy_state(proxy_actor_class=NewMockHTTPProxyActor) + + # Wait for the proxy to become ready. + wait_for_condition( + condition_predictor=_update_and_check_proxy_status, + state=proxy_state, + status=HTTPProxyStatus.HEALTHY, + ) + + def _update_until_num_health_checks_received( + state: HTTPProxyState, num_health_checks: int + ): + state.update() + assert ( + ray.get(state.actor_handle.get_num_health_checks.remote()) + ) == num_health_checks + return True + + def incur_health_checks( + pass_checks: bool, num_checks: int, expected_final_status: HTTPProxyStatus + ): + """Waits for num_checks health checks to occur. + + Args: + pass_checks: whether the health checks should pass. + num_checks: number of checks to wait for. + expected_final_status: the final status that should be asserted. + """ + ray.get(proxy_state.actor_handle.should_succeed.remote(pass_checks)) + + cur_num_health_checks = ray.get( + proxy_state.actor_handle.get_num_health_checks.remote() + ) + + wait_for_condition( + condition_predictor=_update_until_num_health_checks_received, + state=proxy_state, + num_health_checks=cur_num_health_checks + num_checks, + ) + assert ( + ray.get(proxy_state.actor_handle.get_num_health_checks.remote()) + == cur_num_health_checks + num_checks + ) + + if expected_final_status: + assert proxy_state.status == expected_final_status + + # Make sure that the proxy_state's status remains HEALTHY as long as + # PROXY_HEALTH_CHECK_UNHEALTHY_THRESHOLD failures don't occur consecutively. + for _ in range(3): + incur_health_checks( + pass_checks=True, + num_checks=1, + expected_final_status=HTTPProxyStatus.HEALTHY, + ) + incur_health_checks( + pass_checks=False, + num_checks=PROXY_HEALTH_CHECK_UNHEALTHY_THRESHOLD - 1, + expected_final_status=HTTPProxyStatus.HEALTHY, + ) + + # Have health check succeed one more time. + incur_health_checks( + pass_checks=True, num_checks=1, expected_final_status=HTTPProxyStatus.HEALTHY + ) + + # Check failing the health check PROXY_HEALTH_CHECK_UNHEALTHY_THRESHOLD + 1 + # times makes the proxy UNHEALTHY again. We do the `+ 1` to ensure that + # at least PROXY_HEALTH_CHECK_UNHEALTHY_THRESHOLD checks have run and been + # processed by the HTTPProxyState. Otherwise, incur_health_checks could + # return after the health check failed but before the HTTPProxyState had + # time to ray.get() it and update its state. + incur_health_checks( + pass_checks=False, + num_checks=PROXY_HEALTH_CHECK_UNHEALTHY_THRESHOLD + 1, + expected_final_status=HTTPProxyStatus.UNHEALTHY, + ) + + @patch("ray.serve._private.http_state.PROXY_HEALTH_CHECK_TIMEOUT_S", 0.1) @patch("ray.serve._private.http_state.PROXY_HEALTH_CHECK_PERIOD_S", 0.1) def test_http_proxy_state_check_health_always_timeout_timeout_eq_period():