Skip to content

Commit

Permalink
add multi-node to skyserve
Browse files Browse the repository at this point in the history
  • Loading branch information
MaoZiming committed Mar 22, 2024
1 parent 8f4f6f8 commit 1f81583
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 35 deletions.
25 changes: 25 additions & 0 deletions examples/serve/multi_node/multi_node_job.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# SkyServe YAML to launch a service where each replica is multi-node.
# If any one node of a replica is preempted, the whole replica goes down and
# a new replica is launched to replace it.

service:
  # HTTP path that is requested to decide whether a replica is ready.
  readiness_probe: /health
  replica_policy:
    # min_replicas == max_replicas, so the replica count is pinned at 2.
    min_replicas: 2
    max_replicas: 2
    target_qps_per_replica: 1

# Each replica consists of 2 nodes.
num_nodes: 2

resources:
  cloud: gcp
  # NOTE(review): presumably the port server.py listens on -- confirm against
  # examples/serve/http_server/server.py.
  ports: 8081
  cpus: 2+
  # Spot instances can be preempted; see the note at the top of this file.
  use_spot: true

workdir: examples/serve/http_server

run: |
  # Only the node with rank 0 prints the banner.
  if [ "${SKYPILOT_NODE_RANK}" == "0" ]; then
    echo 'I am the master node'
  fi
  # Outside the rank check: the server is started on every node that runs
  # this script, not just on rank 0.
  python3 server.py
89 changes: 54 additions & 35 deletions sky/serve/replica_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,14 @@ def url(self) -> Optional[str]:
return None
return f'{handle.head_ip}:{self.replica_port}'

@property
def urls(self) -> Optional[List[str]]:
    """Return one ``ip:port`` endpoint for every node of this replica.

    Returns:
        A list of ``'<external_ip>:<replica_port>'`` strings, one per IP
        reported by the cluster handle, or ``None`` when the replica has
        no cluster handle or the handle reports no external IPs.
    """
    handle = self.handle()
    if handle is None:
        return None

    # NOTE(review): external_ips() is assumed to be Optional (None while the
    # IPs are not available yet) -- confirm against the handle class.
    # Iterating None would raise TypeError, and an empty list would let a
    # caller that loops over the endpoints succeed without contacting any
    # node, so both cases are reported as "no endpoints".
    ips = handle.external_ips()
    if not ips:
        return None

    return [f'{ip}:{self.replica_port}' for ip in ips]

@property
def status(self) -> serve_state.ReplicaStatus:
replica_status = self.status_property.to_replica_status()
Expand Down Expand Up @@ -472,45 +480,56 @@ def probe(
Returns:
Tuple of (self, is_ready, probe_time).
"""
replica_identity = f'replica {self.replica_id} with url {self.url}'
replica_identity = f'replica {self.replica_id} with urls {self.urls}'
# TODO(tian): This requiring the clock on each replica to be aligned,
# which may not be true when the GCP VMs have run for a long time. We
# should have a better way to do this. See #2539 for more information.

# The cluster handle is None
if self.urls is None:
return self, False, 0

probe_time = time.time()
try:
msg = ''
# TODO(tian): Support HTTPS in the future.
readiness_path = (f'http://{self.url}{readiness_path}')
if post_data is not None:
msg += 'POST'
response = requests.post(
readiness_path,
json=post_data,
timeout=serve_constants.READINESS_PROBE_TIMEOUT_SECONDS)
else:
msg += 'GET'
response = requests.get(
readiness_path,
timeout=serve_constants.READINESS_PROBE_TIMEOUT_SECONDS)
msg += (f' request to {replica_identity} returned status '
f'code {response.status_code}')
if response.status_code == 200:
msg += '.'
log_method = logger.info
else:
msg += f' and response {response.text}.'
msg = f'{colorama.Fore.YELLOW}{msg}{colorama.Style.RESET_ALL}'
log_method = logger.error
log_method(msg)
if response.status_code == 200:
logger.debug(f'{replica_identity.capitalize()} is ready.')
return self, True, probe_time
except requests.exceptions.RequestException as e:
logger.error(
f'{colorama.Fore.YELLOW}Error when probing {replica_identity}:'
f' {common_utils.format_exception(e)}.'
f'{colorama.Style.RESET_ALL}')
return self, False, probe_time
probe_success = True
for url in self.urls:
try:
msg = ''
# TODO(tian): Support HTTPS in the future.
readiness_full_path = (f'http://{url}{readiness_path}')
if post_data is not None:
msg += 'POST'
response = requests.post(
readiness_full_path,
json=post_data,
timeout=serve_constants.READINESS_PROBE_TIMEOUT_SECONDS)
else:
msg += 'GET'
response = requests.get(
readiness_full_path,
timeout=serve_constants.READINESS_PROBE_TIMEOUT_SECONDS)
msg += (f' request to {replica_identity} returned status '
f'code {response.status_code}')
if response.status_code == 200:
msg += '.'
log_method = logger.info
else:
msg += f' and response {response.text}.'
msg = (
f'{colorama.Fore.YELLOW}{msg}{colorama.Style.RESET_ALL}'
)
log_method = logger.error
log_method(msg)
if response.status_code == 200:
logger.debug(f'{replica_identity.capitalize()} is ready.')
else:
probe_success = False
except requests.exceptions.RequestException as e:
logger.error(f'{colorama.Fore.YELLOW}Error when probing'
f'{replica_identity}:'
f' {common_utils.format_exception(e)}.'
f'{colorama.Style.RESET_ALL}')
probe_success = False
return self, probe_success, probe_time

def __setstate__(self, state):
"""Set state from pickled state, for backward compatibility."""
Expand Down

0 comments on commit 1f81583

Please sign in to comment.