diff --git a/sky/clouds/cloud.py b/sky/clouds/cloud.py index 455baeaf5d9..12335bdbf83 100644 --- a/sky/clouds/cloud.py +++ b/sky/clouds/cloud.py @@ -46,6 +46,7 @@ class CloudImplementationFeatures(enum.Enum): STORAGE_MOUNTING = 'storage_mounting' HOST_CONTROLLERS = 'host_controllers' # Can run jobs/serve controllers AUTO_TERMINATE = 'auto_terminate' # Pod/VM can stop or down itself + ENVOY = 'envoy_load_balancer' class Region(collections.namedtuple('Region', ['name'])): diff --git a/sky/clouds/kubernetes.py b/sky/clouds/kubernetes.py index 471639626eb..81d2425a9b1 100644 --- a/sky/clouds/kubernetes.py +++ b/sky/clouds/kubernetes.py @@ -69,6 +69,9 @@ class Kubernetes(clouds.Cloud): 'tiers are not ' 'supported in ' 'Kubernetes.', + clouds.CloudImplementationFeatures.ENVOY: 'Envoy load balancer is not ' + 'supported on Kubernetes ' + 'controllers.' } IMAGE_CPU = 'skypilot:custom-cpu-ubuntu-2004' diff --git a/sky/serve/constants.py b/sky/serve/constants.py index 3974293190e..3d6deee4295 100644 --- a/sky/serve/constants.py +++ b/sky/serve/constants.py @@ -100,3 +100,11 @@ TERMINATE_REPLICA_VERSION_MISMATCH_ERROR = ( 'The version of service is outdated and does not support manually ' 'terminating replicas. Please terminate the service and spin up again.') + +# TODO(ejj) ultimately these should be configurable by users. +ENVOY_THREADS = '1' +ENVOY_VERSION = '1.32.0' + +LB_TYPE_PYTHON = 'python' +LB_TYPE_ENVOY = 'envoy' +LB_TYPES = [LB_TYPE_PYTHON, LB_TYPE_ENVOY] diff --git a/sky/serve/core.py b/sky/serve/core.py index f6f6c53ad7b..f41befbef2e 100644 --- a/sky/serve/core.py +++ b/sky/serve/core.py @@ -7,6 +7,7 @@ import sky from sky import backends +from sky import clouds from sky import exceptions from sky import sky_logging from sky import task as task_lib @@ -156,6 +157,18 @@ def up( controller=controller_utils.Controllers.SKY_SERVE_CONTROLLER, task_resources=task.resources) + # Check that the Envoy load balancer isn't being used on an unsupported + # cloud. 
+ lb_type = task_config.get('service', {}).get('load_balancer_type', None) + if lb_type == serve_constants.LB_TYPE_ENVOY: + for resource in controller_resources: + if resource.cloud is None: + continue + + requested_features = {clouds.CloudImplementationFeatures.ENVOY} + resource.cloud.check_features_are_supported( + resource, requested_features) + vars_to_fill = { 'remote_task_yaml_path': remote_tmp_task_yaml_path, 'local_task_yaml_path': service_file.name, diff --git a/sky/serve/load_balancer.py b/sky/serve/load_balancer.py index 30697532a22..9c1fd16d0ba 100644 --- a/sky/serve/load_balancer.py +++ b/sky/serve/load_balancer.py @@ -1,14 +1,18 @@ """LoadBalancer: Distribute any incoming request to all ready replicas.""" import asyncio import logging +import os +import tempfile import threading -from typing import Dict, Optional, Union +from typing import Dict, List, Optional, Union +from urllib.parse import urlparse import aiohttp import fastapi import httpx from starlette import background import uvicorn +import yaml from sky import sky_logging from sky.serve import constants @@ -20,7 +24,57 @@ class SkyServeLoadBalancer: - """SkyServeLoadBalancer: distribute incoming traffic with proxy. + """SkyServeLoadBalancer: load balancer for distributing requests to Sky + Serve replicas. + + The SkyServeLoadBalancer class serves as the base class for the various + child implementations. + """ + + def __init__(self, service_name: str, controller_url: str, + load_balancer_port: int) -> None: + """Initialize the load balancer. + + Args: + service_name: The name of the service this load balancer serves. + controller_url: The URL of the controller. + load_balancer_port: The port where the load balancer listens to. + """ + + self._controller_url = controller_url + self._load_balancer_port = load_balancer_port + self._service_name = service_name + + async def _controller_sync(self, + request_aggregator: dict) -> Optional[List[str]]: + """ Sync with the controller once. 
+ + Send the information in `request_aggregator` to the controller and + return the ready replica URLs, or None if the sync failed or timed out. + """ + + try: + async with aiohttp.ClientSession() as session: + async with session.post( + self._controller_url + '/controller/load_balancer_sync', + json={'request_aggregator': request_aggregator}, + timeout=aiohttp.ClientTimeout(5)) as response: + + response.raise_for_status() + response_json = await response.json() + ready_replica_urls = response_json.get( + 'ready_replica_urls', []) + except (aiohttp.ClientError, asyncio.TimeoutError) as e: + logger.error('An error occurred when syncing with ' + f'the controller: {e}') + return None + else: + logger.debug(f'Available Replica URLs: {ready_replica_urls}') + return ready_replica_urls + + +class PythonLoadBalancer(SkyServeLoadBalancer): + """PythonLoadBalancer: distribute incoming traffic with a python proxy. This class accept any traffic to the controller and proxies it to the appropriate endpoint replica according to the load balancing @@ -28,17 +82,20 @@ class SkyServeLoadBalancer: """ def __init__(self, + service_name: str, controller_url: str, load_balancer_port: int, load_balancing_policy_name: Optional[str] = None) -> None: """Initialize the load balancer. Args: + service_name: The name of the service this load balancer serves. controller_url: The URL of the controller. load_balancer_port: The port where the load balancer listens to. load_balancing_policy_name: The name of the load balancing policy to use. Defaults to None. 
""" + super().__init__(service_name, controller_url, load_balancer_port) self._app = fastapi.FastAPI() self._controller_url: str = controller_url self._load_balancer_port: int = load_balancer_port @@ -75,42 +132,28 @@ async def _sync_with_controller(self): while True: close_client_tasks = [] - async with aiohttp.ClientSession() as session: - try: - # Send request information - async with session.post( - self._controller_url + - '/controller/load_balancer_sync', - json={ - 'request_aggregator': - self._request_aggregator.to_dict() - }, - timeout=aiohttp.ClientTimeout(5), - ) as response: - # Clean up after reporting request info to avoid OOM. - self._request_aggregator.clear() - response.raise_for_status() - response_json = await response.json() - ready_replica_urls = response_json.get( - 'ready_replica_urls', []) - except aiohttp.ClientError as e: - logger.error('An error occurred when syncing with ' - f'the controller: {e}') - else: - logger.info(f'Available Replica URLs: {ready_replica_urls}') - with self._client_pool_lock: - self._load_balancing_policy.set_ready_replicas( - ready_replica_urls) - for replica_url in ready_replica_urls: - if replica_url not in self._client_pool: - self._client_pool[replica_url] = ( - httpx.AsyncClient(base_url=replica_url)) - urls_to_close = set( - self._client_pool.keys()) - set(ready_replica_urls) - client_to_close = [] - for replica_url in urls_to_close: - client_to_close.append( - self._client_pool.pop(replica_url)) + + request_aggregator = self._request_aggregator.to_dict() + # Clean up before calling _controller_sync() to avoid OOM. 
+ self._request_aggregator.clear() + + ready_replica_urls = await self._controller_sync(request_aggregator) + if ready_replica_urls is not None: + with self._client_pool_lock: + self._load_balancing_policy.set_ready_replicas( + ready_replica_urls) + for replica_url in ready_replica_urls: + if replica_url not in self._client_pool: + self._client_pool[replica_url] = (httpx.AsyncClient( + base_url=replica_url)) + urls_to_close = set( + self._client_pool.keys()) - set(ready_replica_urls) + client_to_close = [] + + for replica_url in urls_to_close: + client_to_close.append( + self._client_pool.pop(replica_url)) + for client in client_to_close: close_client_tasks.append(client.aclose()) @@ -229,8 +272,10 @@ async def startup(): uvicorn.run(self._app, host='0.0.0.0', port=self._load_balancer_port) -def run_load_balancer(controller_addr: str, +def run_load_balancer(service_name: str, + controller_addr: str, load_balancer_port: int, + load_balancer_type: Optional[str] = None, load_balancing_policy_name: Optional[str] = None) -> None: """ Run the load balancer. @@ -240,11 +285,286 @@ def run_load_balancer(controller_addr: str, policy_name: The name of the load balancing policy to use. Defaults to None. 
""" - load_balancer = SkyServeLoadBalancer( - controller_url=controller_addr, - load_balancer_port=load_balancer_port, - load_balancing_policy_name=load_balancing_policy_name) - load_balancer.run() + + if load_balancer_type == constants.LB_TYPE_PYTHON \ + or load_balancer_type is None: + plb = PythonLoadBalancer( + service_name=service_name, + controller_url=controller_addr, + load_balancer_port=load_balancer_port, + load_balancing_policy_name=load_balancing_policy_name) + plb.run() + elif load_balancer_type == constants.LB_TYPE_ENVOY: + elb = EnvoyLoadBalancer(service_name=service_name, + controller_url=controller_addr, + load_balancer_port=load_balancer_port) + asyncio.run(elb.run()) + else: + raise ValueError('Unknown load balancer type: ' + f'{load_balancer_type}') + + +class EnvoyLoadBalancer(SkyServeLoadBalancer): + """ Envoy implementation of SkyServeLoadBalancer + + Envoy (https://www.envoyproxy.io/) is an Open Source HTTP proxy widely used + for both north-south and east-west load balancing in cloud-native + deployments. The Envoy Sky load balancer instantiates an Envoy load + balancer in a docker container, and configures it to forward traffic + appropriately to replicas using Envoy configuration files. """ + + def __init__(self, service_name: str, controller_url: str, + load_balancer_port: int) -> None: + """ Initialize the Envoy load balancer + + Args: + service_name: The name of the service this load balancer serves. + controller_url: The URL of the controller. + load_balancer_port: Ingress port for the load balancer. + """ + + super().__init__(service_name, controller_url, load_balancer_port) + + # Name of the Envoy container. 
+ self.envoy_name = EnvoyLoadBalancer._gen_envoy_name(service_name) + + # Folder which we will mount into the envoy docker container that will + # contain the Envoy config file + self.envoy_config_folder = os.path.expanduser(f'~/{self.envoy_name}') + + @staticmethod + def _gen_envoy_name(service_name: str) -> str: + """Generate the name of an Envoy container from its service name.""" + return f'envoy-{service_name}' + + @staticmethod + async def stop_envoy(service_name: str): + """Stop the Envoy container corresponding to the provided service. + + Args: + service_name: Name of the service whose Envoy we should stop. + """ + name = EnvoyLoadBalancer._gen_envoy_name(service_name) + proc = await asyncio.create_subprocess_exec('docker', 'rm', '-f', name) + if await proc.wait() != 0: + # Note this is expected when using the python load balancer. We + # always clean up in the spirit of defensiveness. + logger.debug('Failed to remove envoy: %s', name) + + async def _start_envoy(self) -> bool: + """Start the Envoy container + + Returns: + True if successful, otherwise False. + """ + + cmd = [ + 'docker', 'run', '-d', '--name', self.envoy_name, '--restart', + 'unless-stopped', '-v', f'{self.envoy_config_folder}:/etc/envoy', + '-p', f'{self._load_balancer_port}:{self._load_balancer_port}', + f'envoyproxy/envoy:v{constants.ENVOY_VERSION}', '--concurrency', + constants.ENVOY_THREADS, '-c', '/etc/envoy/envoy.yaml' + ] + proc = await asyncio.create_subprocess_exec(*cmd) + logger.debug(f'Starting Envoy with command: {" ".join(cmd)}') + ret = await proc.wait() + return ret == 0 + + def write_yaml(self, envoy_config: dict, filename: str): + """ Writes an envoy configuration object to disk atomically. + + Args: + envoy_config: A python object representing envoy configuration. + This object will be converted to yaml and written to disk. + + filename: The name of the file the yaml will be written to. Note + this is just the base filename not the full path. 
+ """ + + # Envoy is constantly watching most xds files. To avoid confusing + # partial writes, it's better to update the configuration files + # atomically by writing to a temporary file and replacing the original. + envoy_yaml = yaml.dump(envoy_config, default_flow_style=False) + with tempfile.NamedTemporaryFile(mode='w', + delete=False, + dir=self.envoy_config_folder) as f: + f.write(envoy_yaml) + temp_path = f.name + + # Allow anyone to read the file so Envoy has access. + os.chmod(temp_path, 0o644) + os.rename(temp_path, f'{self.envoy_config_folder}/{filename}') + + def write_bootstrap_xds(self): + """ Writes the initial bootstrap Envoy configuration file. + + This method writes the initial Envoy configuration file that must be + available at boot, and does not change as replicas come and go. This + config file points at the dynamically changing configuration files like + eds.yaml for configuration that changes over time. + """ + + # Filters describe what to do with a connection received by a listener. + # This filter says the request should be handled by the cluster defined + # below. + filters = [{ + 'name': 'envoy.filters.network.http_connection_manager', + 'typed_config': { + '@type': + 'type.googleapis.com/envoy.extensions.filters.' + + 'network.http_connection_manager.v3.HttpConnectionManager', + 'stat_prefix': 'ingress_http', + 'http_filters': [{ + 'name': 'envoy.filters.http.router', + 'typed_config': { + '@type': 'type.googleapis.com/envoy.extensions.' + + 'filters.http.router.v3.Router', + + # We aren't using dynamic_stats, and Envoy recommends + # disabling them for profiling. + 'dynamic_stats': False + } + }], + + # We don't use random request ids, and Envoy recommends + # disabling for profiling. 
+ 'generate_request_id': False, + 'route_config': { + 'virtual_hosts': [{ + 'name': 'local_service', + 'domains': ['*'], + 'routes': [{ + 'match': { + 'prefix': '/' + }, + 'route': { + 'cluster': 'cluster' + } + }] + }] + } + } + }] + + # Listeners are the entry point to envoy. This one handles all traffic + # received on the load balancer port and processes it with the filters + # described above. + listener = { + 'name': 'listener', + 'address': { + 'socket_address': { + 'address': '0.0.0.0', + 'port_value': self._load_balancer_port, + } + }, + 'filter_chains': [{ + 'filters': filters + }] + } + + # A cluster is usually a group of endpoints that can be load balanced + # over. This one says to find the list of endpoints in eds.yaml. + cluster = { + 'name': 'cluster', + 'connect_timeout': '0.25s', + 'type': 'EDS', + 'lb_policy': 'ROUND_ROBIN', + 'eds_cluster_config': { + 'eds_config': { + 'path_config_source': { + 'path': '/etc/envoy/eds.yaml' + } + } + } + } + + config = { + 'node': { + 'id': 'controller', + 'cluster': self._service_name, + }, + 'static_resources': { + 'listeners': [listener], + 'clusters': [cluster], + } + } + + os.makedirs(self.envoy_config_folder, exist_ok=True) + self.write_yaml(config, 'envoy.yaml') + + def write_eds(self, replicas: List[str]): + """ Writes eds.yaml + + Endpoint Discovery Service (EDS) is a subset of Envoy xds that's used + for discovering backend load balancing endpoints for a particular + envoy cluster (load balancer). In our case, this contains the list of + replicas that our service can route to. + + This method writes eds.yaml to the correct location. It's intended to + be called every time the set of replicas changes. + """ + + lb_endpoints = [] + for url in replicas: + # TODO(ejj) it would be cleaner if the controller just sent us a + # tuple containing IP and port rather than a url which we have to + # parse here. 
+ parsed_url = urlparse(url) + lb_endpoints.append({ + 'endpoint': { + 'address': { + 'socket_address': { + 'address': parsed_url.hostname, + 'port_value': parsed_url.port, + } + } + } + }) + + config = { + 'resources': [{ + '@type': 'type.googleapis.com/envoy.config' + '.endpoint.v3.ClusterLoadAssignment', + 'cluster_name': 'cluster', + 'endpoints': [{ + 'lb_endpoints': lb_endpoints + }] + }] + } + self.write_yaml(config, 'eds.yaml') + + async def run(self): + self.write_bootstrap_xds() + + # Because docker can take some time to come up, we make multiple + # attempts to start before giving up. + envoy_started = False + logger.info('Starting envoy %s', self.envoy_name) + for _ in range(30): + await asyncio.sleep(5) + envoy_started = await self._start_envoy() + if envoy_started: + break + + if not envoy_started: + error = f'Failed to start envoy {self.envoy_name}' + logger.error(error) + raise RuntimeError(error) + + while True: + # TODO(ejj) add support for reporting QPS up to the controller. + # Presently auto-scaling doesn't work without it. + await asyncio.sleep(constants.LB_CONTROLLER_SYNC_INTERVAL_SECONDS) + ready_replica_urls = await self._controller_sync({}) + if ready_replica_urls is not None: + # None means the sync itself failed, so the previous config is + # kept. Any list the controller returns, even an empty one, + # is written so Envoy tracks the current set of replicas. + self.write_eds(ready_replica_urls) + + +def cleanup(service_name: str): + asyncio.run(EnvoyLoadBalancer.stop_envoy(service_name)) if __name__ == '__main__': @@ -267,5 +587,5 @@ def run_load_balancer(controller_addr: str, help=f'The load balancing policy to use. 
Available policies: ' f'{", ".join(available_policies)}.') args = parser.parse_args() - run_load_balancer(args.controller_addr, args.load_balancer_port, - args.load_balancing_policy) + run_load_balancer('cmd', args.controller_addr, args.load_balancer_port, + load_balancing_policy_name=args.load_balancing_policy) diff --git a/sky/serve/service.py b/sky/serve/service.py index 0a1c7f34766..48489f97d2b 100644 --- a/sky/serve/service.py +++ b/sky/serve/service.py @@ -85,6 +85,7 @@ def cleanup_storage(task_yaml: str) -> bool: def _cleanup(service_name: str) -> bool: """Clean up all service related resources, i.e. replicas and storage.""" + load_balancer.cleanup(service_name) failed = False replica_infos = serve_state.get_replica_infos(service_name) info2proc: Dict[replica_managers.ReplicaInfo, @@ -222,15 +223,22 @@ def _get_host(): # Extract the load balancing policy from the service spec policy_name = service_spec.load_balancing_policy + load_balancer_type = service_spec.load_balancer_type + # Start the load balancer. # TODO(tian): Probably we could enable multiple ports specified in # service spec and we could start multiple load balancers. # After that, we will have a mapping from replica port to endpoint. + # TODO(ejj): The Envoy load balancer probably doesn't need to run + # in its own process since all it's doing is generating a config + # file. That said, while we have the Python load balancer, it's + # simpler to be consistent between the two. 
load_balancer_process = multiprocessing.Process( target=ux_utils.RedirectOutputForProcess( load_balancer.run_load_balancer, load_balancer_log_file).run, - args=(controller_addr, load_balancer_port, policy_name)) + args=(service_name, controller_addr, load_balancer_port, + load_balancer_type, policy_name)) load_balancer_process.start() serve_state.set_service_load_balancer_port(service_name, load_balancer_port) diff --git a/sky/serve/service_spec.py b/sky/serve/service_spec.py index 000eed139f1..2b4d6743f44 100644 --- a/sky/serve/service_spec.py +++ b/sky/serve/service_spec.py @@ -31,6 +31,7 @@ def __init__( upscale_delay_seconds: Optional[int] = None, downscale_delay_seconds: Optional[int] = None, load_balancing_policy: Optional[str] = None, + load_balancer_type: Optional[str] = None, ) -> None: if max_replicas is not None and max_replicas < min_replicas: with ux_utils.print_exception_no_traceback(): @@ -64,6 +65,13 @@ def __init__( raise ValueError( f'Unknown load balancing policy: {load_balancing_policy}. ' f'Available policies: {list(serve.LB_POLICIES.keys())}') + + if (load_balancer_type is not None and + load_balancer_type not in constants.LB_TYPES): + with ux_utils.print_exception_no_traceback(): + raise ValueError( + f'Unknown load balancer type: {load_balancer_type}. 
' + f'Available load balancers: {constants.LB_TYPES}') self._readiness_path: str = readiness_path self._initial_delay_seconds: int = initial_delay_seconds self._readiness_timeout_seconds: int = readiness_timeout_seconds @@ -79,6 +87,7 @@ def __init__( self._upscale_delay_seconds: Optional[int] = upscale_delay_seconds self._downscale_delay_seconds: Optional[int] = downscale_delay_seconds self._load_balancing_policy: Optional[str] = load_balancing_policy + self._load_balancer_type: Optional[str] = load_balancer_type self._use_ondemand_fallback: bool = ( self.dynamic_ondemand_fallback is not None and @@ -162,6 +171,8 @@ def from_yaml_config(config: Dict[str, Any]) -> 'SkyServiceSpec': service_config['load_balancing_policy'] = config.get( 'load_balancing_policy', None) + service_config['load_balancer_type'] = config.get( + 'load_balancer_type', None) return SkyServiceSpec(**service_config) @staticmethod @@ -219,6 +230,7 @@ def add_if_not_none(section, key, value, no_empty: bool = False): self.downscale_delay_seconds) add_if_not_none('load_balancing_policy', None, self._load_balancing_policy) + add_if_not_none('load_balancer_type', None, self._load_balancer_type) return config def probe_str(self): @@ -270,6 +282,7 @@ def __repr__(self) -> str: Readiness probe timeout seconds: {self.readiness_timeout_seconds} Replica autoscaling policy: {self.autoscaling_policy_str()} Spot Policy: {self.spot_policy_str()} + Load Balancer Type: {self.load_balancer_type} Load Balancing Policy: {self.load_balancing_policy} """) @@ -329,3 +342,7 @@ def use_ondemand_fallback(self) -> bool: @property def load_balancing_policy(self) -> Optional[str]: return self._load_balancing_policy + + @property + def load_balancer_type(self) -> Optional[str]: + return self._load_balancer_type diff --git a/sky/utils/schemas.py b/sky/utils/schemas.py index 851e77a57fc..affe38d7e85 100644 --- a/sky/utils/schemas.py +++ b/sky/utils/schemas.py @@ -310,6 +310,7 @@ def get_service_schema(): """Schema for top-level 
`service:` field (for SkyServe).""" # To avoid circular imports, only import when needed. # pylint: disable=import-outside-toplevel + from sky.serve import constants as serve_constants from sky.serve import load_balancing_policies return { '$schema': 'https://json-schema.org/draft/2020-12/schema', @@ -390,6 +391,10 @@ def get_service_schema(): 'case_insensitive_enum': list( load_balancing_policies.LB_POLICIES.keys()) }, + 'load_balancer_type': { + 'type': 'string', + 'case_insensitive_enum': serve_constants.LB_TYPES + }, } } diff --git a/tests/skyserve/load_balancer/envoy_service.yaml b/tests/skyserve/load_balancer/envoy_service.yaml new file mode 100644 index 00000000000..cd68ce45d6b --- /dev/null +++ b/tests/skyserve/load_balancer/envoy_service.yaml @@ -0,0 +1,18 @@ +service: + readiness_probe: + path: /health + # For install dependencies + initial_delay_seconds: 180 + replica_policy: + min_replicas: 3 + load_balancer_type: envoy + +resources: + ports: 8080 + cpus: 2+ + +workdir: tests/skyserve/load_balancer + +setup: pip install fastapi[all] uvicorn + +run: python3 server.py --port 8080 diff --git a/tests/skyserve/load_balancer/service.yaml b/tests/skyserve/load_balancer/service.yaml index 742b8efd2f4..3395a1e35a1 100644 --- a/tests/skyserve/load_balancer/service.yaml +++ b/tests/skyserve/load_balancer/service.yaml @@ -5,6 +5,7 @@ service: initial_delay_seconds: 180 replica_policy: min_replicas: 3 + load_balancer_type: python resources: ports: 8080 diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 574dae21ea0..c1825de9665 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -4363,6 +4363,29 @@ def test_skyserve_load_balancer(generic_cloud: str): run_one_test(test) +@pytest.mark.serve +@pytest.mark.no_kubernetes # Replicas on k8s may be running on the same node and have the same public IP. Envoy isn't supported on kubernetes. 
+def test_skyserve_envoy_load_balancer(generic_cloud: str): + """Test skyserve load balancer round-robin policy""" + name = _get_service_name() + test = Test( + f'test-skyserve-envoy-load-balancer', + [ + f'sky serve up -n {name} --cloud {generic_cloud} -y tests/skyserve/load_balancer/envoy_service.yaml', + _SERVE_WAIT_UNTIL_READY.format(name=name, replica_num=3), + f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; ' + f'{_SERVE_STATUS_WAIT.format(name=name)}; ' + f'{_get_replica_ip(name, 1)}; ' + f'{_get_replica_ip(name, 2)}; {_get_replica_ip(name, 3)}; ' + 'python tests/skyserve/load_balancer/test_round_robin.py ' + '--endpoint $endpoint --replica-num 3 --replica-ips $ip1 $ip2 $ip3', + ], + _TEARDOWN_SERVICE.format(name=name), + timeout=20 * 60, + ) + run_one_test(test) + + @pytest.mark.gcp @pytest.mark.serve @pytest.mark.no_kubernetes