Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Serve] Multi-node support #3357

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions examples/serve/multi_node/multi_node.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# SkyServe YAML to launch a service where each replica is multi-node.
# Preemption of any one node brings the whole replica down and causes a new replica to be launched.

service:
readiness_probe: /health
replica_policy:
min_replicas: 2
max_replicas: 2
target_qps_per_replica: 1

num_nodes: 2

resources:
cloud: gcp
ports: 8081
cpus: 2+
use_spot: true

workdir: examples/serve/http_server

run: |
if [ "${SKYPILOT_NODE_RANK}" == "0" ]; then
echo 'I am the master node'
fi
python3 server.py
89 changes: 54 additions & 35 deletions sky/serve/replica_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,14 @@ def url(self) -> Optional[str]:
return None
return f'{handle.head_ip}:{self.replica_port}'

@property
def urls(self) -> Optional[List[str]]:
    """Return one ``ip:port`` endpoint per node of this replica.

    Returns:
        A list with one ``'<external_ip>:<replica_port>'`` string for each
        IP reported by the cluster handle, or None when the cluster handle
        is None or the handle reports no external IPs (None).
    """
    handle = self.handle()
    if handle is None:
        return None

    external_ips = handle.external_ips()
    # NOTE(review): external_ips() is assumed to return None when the IPs
    # cannot be resolved yet -- confirm against the handle class. Returning
    # None here keeps the Optional contract instead of raising TypeError
    # when iterating over None.
    if external_ips is None:
        return None

    return [f'{ip}:{self.replica_port}' for ip in external_ips]

@property
def status(self) -> serve_state.ReplicaStatus:
replica_status = self.status_property.to_replica_status()
Expand Down Expand Up @@ -472,45 +480,56 @@ def probe(
Returns:
Tuple of (self, is_ready, probe_time).
"""
replica_identity = f'replica {self.replica_id} with url {self.url}'
replica_identity = f'replica {self.replica_id} with urls {self.urls}'
# TODO(tian): This requires the clocks on all replicas to be aligned,
# which may not be true when the GCP VMs have run for a long time. We
# should have a better way to do this. See #2539 for more information.

# The cluster handle is None
if self.urls is None:
return self, False, 0

probe_time = time.time()
try:
msg = ''
# TODO(tian): Support HTTPS in the future.
readiness_path = (f'http://{self.url}{readiness_path}')
if post_data is not None:
msg += 'POST'
response = requests.post(
readiness_path,
json=post_data,
timeout=serve_constants.READINESS_PROBE_TIMEOUT_SECONDS)
else:
msg += 'GET'
response = requests.get(
readiness_path,
timeout=serve_constants.READINESS_PROBE_TIMEOUT_SECONDS)
msg += (f' request to {replica_identity} returned status '
f'code {response.status_code}')
if response.status_code == 200:
msg += '.'
log_method = logger.info
else:
msg += f' and response {response.text}.'
msg = f'{colorama.Fore.YELLOW}{msg}{colorama.Style.RESET_ALL}'
log_method = logger.error
log_method(msg)
if response.status_code == 200:
logger.debug(f'{replica_identity.capitalize()} is ready.')
return self, True, probe_time
except requests.exceptions.RequestException as e:
logger.error(
f'{colorama.Fore.YELLOW}Error when probing {replica_identity}:'
f' {common_utils.format_exception(e)}.'
f'{colorama.Style.RESET_ALL}')
return self, False, probe_time
probe_success = True
for url in self.urls:
try:
msg = ''
# TODO(tian): Support HTTPS in the future.
readiness_full_path = (f'http://{url}{readiness_path}')
if post_data is not None:
msg += 'POST'
response = requests.post(
readiness_full_path,
json=post_data,
timeout=serve_constants.READINESS_PROBE_TIMEOUT_SECONDS)
else:
msg += 'GET'
response = requests.get(
readiness_full_path,
timeout=serve_constants.READINESS_PROBE_TIMEOUT_SECONDS)
msg += (f' request to {replica_identity} returned status '
f'code {response.status_code}')
if response.status_code == 200:
msg += '.'
log_method = logger.info
else:
msg += f' and response {response.text}.'
msg = (
f'{colorama.Fore.YELLOW}{msg}{colorama.Style.RESET_ALL}'
)
log_method = logger.error
log_method(msg)
if response.status_code == 200:
logger.debug(f'{replica_identity.capitalize()} is ready.')
else:
probe_success = False
except requests.exceptions.RequestException as e:
logger.error(f'{colorama.Fore.YELLOW}Error when probing'
f'{replica_identity}:'
f' {common_utils.format_exception(e)}.'
f'{colorama.Style.RESET_ALL}')
probe_success = False
return self, probe_success, probe_time

def __setstate__(self, state):
"""Set state from pickled state, for backward compatibility."""
Expand Down
Loading