Commit 88813ce
[ux] cache cluster status of autostop or spot clusters for 2s (#4332)
* add status_updated_at to DB
* don't refresh autostop/spot cluster if it's recently been refreshed
* update locking mechanism for status check to early exit
* address PR comments
* add warning about cluster status lock timeout
1 parent c8ea12b

File tree: 5 files changed, +143 −88 lines changed

sky/backends/backend_utils.py (+102 −67)
@@ -100,6 +100,10 @@
 CLUSTER_STATUS_LOCK_PATH = os.path.expanduser('~/.sky/.{}.lock')
 CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS = 20
 
+# Time that must elapse since the last status check before we should re-check if
+# the cluster has been terminated or autostopped.
+_CLUSTER_STATUS_CACHE_DURATION_SECONDS = 2
+
 # Filelocks for updating cluster's file_mounts.
 CLUSTER_FILE_MOUNTS_LOCK_PATH = os.path.expanduser(
     '~/.sky/.{}_file_mounts.lock')
@@ -1669,11 +1673,27 @@ def check_can_clone_disk_and_override_task(
 
 def _update_cluster_status_no_lock(
         cluster_name: str) -> Optional[Dict[str, Any]]:
-    """Updates the status of the cluster.
+    """Update the cluster status.
+
+    The cluster status is updated by checking ray cluster and real status from
+    cloud.
+
+    The function will update the cached cluster status in the global state. For
+    the design of the cluster status and transition, please refer to the
+    sky/design_docs/cluster_status.md
+
+    Returns:
+        If the cluster is terminated or does not exist, return None. Otherwise
+        returns the input record with status and handle potentially updated.
 
     Raises:
+        exceptions.ClusterOwnerIdentityMismatchError: if the current user is
+            not the same as the user who created the cluster.
+        exceptions.CloudUserIdentityError: if we fail to get the current user
+            identity.
         exceptions.ClusterStatusFetchingError: the cluster status cannot be
-            fetched from the cloud provider.
+            fetched from the cloud provider or there are leaked nodes causing
+            the node number larger than expected.
     """
     record = global_user_state.get_cluster_from_name(cluster_name)
     if record is None:
@@ -1893,52 +1913,22 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:
         return global_user_state.get_cluster_from_name(cluster_name)
 
 
-def _update_cluster_status(
-        cluster_name: str,
-        acquire_per_cluster_status_lock: bool,
-        cluster_status_lock_timeout: int = CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS
-) -> Optional[Dict[str, Any]]:
-    """Update the cluster status.
+def _must_refresh_cluster_status(
+        record: Dict[str, Any],
+        force_refresh_statuses: Optional[Set[status_lib.ClusterStatus]]
+) -> bool:
+    force_refresh_for_cluster = (force_refresh_statuses is not None and
+                                 record['status'] in force_refresh_statuses)
 
-    The cluster status is updated by checking ray cluster and real status from
-    cloud.
+    use_spot = record['handle'].launched_resources.use_spot
+    has_autostop = (record['status'] != status_lib.ClusterStatus.STOPPED and
+                    record['autostop'] >= 0)
+    recently_refreshed = (record['status_updated_at'] is not None and
+                          time.time() - record['status_updated_at'] <
+                          _CLUSTER_STATUS_CACHE_DURATION_SECONDS)
+    is_stale = (use_spot or has_autostop) and not recently_refreshed
 
-    The function will update the cached cluster status in the global state. For
-    the design of the cluster status and transition, please refer to the
-    sky/design_docs/cluster_status.md
-
-    Args:
-        cluster_name: The name of the cluster.
-        acquire_per_cluster_status_lock: Whether to acquire the per-cluster lock
-            before updating the status.
-        cluster_status_lock_timeout: The timeout to acquire the per-cluster
-            lock.
-
-    Returns:
-        If the cluster is terminated or does not exist, return None. Otherwise
-        returns the input record with status and handle potentially updated.
-
-    Raises:
-        exceptions.ClusterOwnerIdentityMismatchError: if the current user is
-            not the same as the user who created the cluster.
-        exceptions.CloudUserIdentityError: if we fail to get the current user
-            identity.
-        exceptions.ClusterStatusFetchingError: the cluster status cannot be
-            fetched from the cloud provider or there are leaked nodes causing
-            the node number larger than expected.
-    """
-    if not acquire_per_cluster_status_lock:
-        return _update_cluster_status_no_lock(cluster_name)
-
-    try:
-        with filelock.FileLock(CLUSTER_STATUS_LOCK_PATH.format(cluster_name),
-                               timeout=cluster_status_lock_timeout):
-            return _update_cluster_status_no_lock(cluster_name)
-    except filelock.Timeout:
-        logger.debug('Refreshing status: Failed get the lock for cluster '
-                     f'{cluster_name!r}. Using the cached status.')
-        record = global_user_state.get_cluster_from_name(cluster_name)
-        return record
+    return force_refresh_for_cluster or is_stale
 
 
 def refresh_cluster_record(
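
The new `_must_refresh_cluster_status` helper centralizes the decision that `refresh_cluster_record` (next hunk) makes before doing any locking: refresh only when the caller forces it, or when a spot/autostop cluster's cached status is older than the 2-second cache window. A minimal standalone sketch of the staleness check, using a hypothetical in-memory record (the field names follow the diff; the values are illustrative):

    import time

    # Hypothetical cached record, shaped like the rows that
    # global_user_state.get_cluster_from_name() returns.
    record = {
        'autostop': 5,                          # autostop is set (>= 0)
        'status_updated_at': time.time() - 1,   # last refreshed 1 second ago
    }

    _CLUSTER_STATUS_CACHE_DURATION_SECONDS = 2
    recently_refreshed = (record['status_updated_at'] is not None and
                          time.time() - record['status_updated_at'] <
                          _CLUSTER_STATUS_CACHE_DURATION_SECONDS)
    # Refreshed 1s ago with a 2s window, so the cached status is served and
    # neither the per-cluster lock nor the cloud API is touched.
    assert recently_refreshed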
@@ -1956,16 +1946,22 @@ def refresh_cluster_record(
 
     Args:
         cluster_name: The name of the cluster.
-        force_refresh_statuses: if specified, refresh the cluster if it has one of
-            the specified statuses. Additionally, clusters satisfying the
-            following conditions will always be refreshed no matter the
-            argument is specified or not:
-            1. is a spot cluster, or
-            2. is a non-spot cluster, is not STOPPED, and autostop is set.
+        force_refresh_statuses: if specified, refresh the cluster if it has one
+            of the specified statuses. Additionally, clusters satisfying the
+            following conditions will be refreshed no matter the argument is
+            specified or not:
+            - the most latest available status update is more than
+              _CLUSTER_STATUS_CACHE_DURATION_SECONDS old, and one of:
+              1. the cluster is a spot cluster, or
+              2. cluster autostop is set and the cluster is not STOPPED.
         acquire_per_cluster_status_lock: Whether to acquire the per-cluster lock
-            before updating the status.
+            before updating the status. Even if this is True, the lock may not be
+            acquired if the status does not need to be refreshed.
         cluster_status_lock_timeout: The timeout to acquire the per-cluster
-            lock. If timeout, the function will use the cached status.
+            lock. If timeout, the function will use the cached status. If the
+            value is <0, do not timeout (wait for the lock indefinitely). By
+            default, this is set to CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS. Warning:
+            if correctness is required, you must set this to -1.
 
     Returns:
         If the cluster is terminated or does not exist, return None.
@@ -1986,19 +1982,58 @@ def refresh_cluster_record(
         return None
     check_owner_identity(cluster_name)
 
-    handle = record['handle']
-    if isinstance(handle, backends.CloudVmRayResourceHandle):
-        use_spot = handle.launched_resources.use_spot
-        has_autostop = (record['status'] != status_lib.ClusterStatus.STOPPED and
-                        record['autostop'] >= 0)
-        force_refresh_for_cluster = (force_refresh_statuses is not None and
-                                     record['status'] in force_refresh_statuses)
-        if force_refresh_for_cluster or has_autostop or use_spot:
-            record = _update_cluster_status(
-                cluster_name,
-                acquire_per_cluster_status_lock=acquire_per_cluster_status_lock,
-                cluster_status_lock_timeout=cluster_status_lock_timeout)
-    return record
+    if not isinstance(record['handle'], backends.CloudVmRayResourceHandle):
+        return record
+
+    # The loop logic allows us to notice if the status was updated in the
+    # global_user_state by another process and stop trying to get the lock.
+    # The core loop logic is adapted from FileLock's implementation.
+    lock = filelock.FileLock(CLUSTER_STATUS_LOCK_PATH.format(cluster_name))
+    start_time = time.perf_counter()
+
+    # Loop until we have an up-to-date status or until we acquire the lock.
+    while True:
+        # Check to see if we can return the cached status.
+        if not _must_refresh_cluster_status(record, force_refresh_statuses):
+            return record
+
+        if not acquire_per_cluster_status_lock:
+            return _update_cluster_status_no_lock(cluster_name)
+
+        # Try to acquire the lock so we can fetch the status.
+        try:
+            with lock.acquire(blocking=False):
+                # Lock acquired.
+
+                # Check the cluster status again, since it could have been
+                # updated between our last check and acquiring the lock.
+                record = global_user_state.get_cluster_from_name(cluster_name)
+                if record is None or not _must_refresh_cluster_status(
+                        record, force_refresh_statuses):
+                    return record
+
+                # Update and return the cluster status.
+                return _update_cluster_status_no_lock(cluster_name)
+        except filelock.Timeout:
+            # lock.acquire() will throw a Timeout exception if the lock is not
+            # available and we have blocking=False.
+            pass
+
+        # Logic adapted from FileLock.acquire().
+        # If cluster_status_lock_time is <0, we will never hit this. No timeout.
+        # Otherwise, if we have timed out, return the cached status. This has
+        # the potential to cause correctness issues, but if so it is the
+        # caller's responsibility to set the timeout to -1.
+        if 0 <= cluster_status_lock_timeout < time.perf_counter() - start_time:
+            logger.debug('Refreshing status: Failed get the lock for cluster '
+                         f'{cluster_name!r}. Using the cached status.')
+            return record
+        time.sleep(0.05)
+
+        # Refresh for next loop iteration.
+        record = global_user_state.get_cluster_from_name(cluster_name)
+        if record is None:
+            return None
 
 
 @timeline.event
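
The docstring's warning about the lock timeout is worth restating with a concrete call. A hedged usage sketch (the cluster name is hypothetical, and the keyword defaults are assumed to match the docstring above):

    from sky.backends import backend_utils

    # Best-effort path (e.g., a status display): tolerate a possibly stale
    # answer. If the lock cannot be acquired within the default
    # CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS (20s), the cached record is returned.
    record = backend_utils.refresh_cluster_record('my-cluster')

    # Correctness-critical path: wait for the lock indefinitely so the result
    # can never silently fall back to a stale cache entry.
    record = backend_utils.refresh_cluster_record(
        'my-cluster', cluster_status_lock_timeout=-1)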

sky/backends/cloud_vm_ray_backend.py (+1 −1)
@@ -3558,7 +3558,7 @@ def _teardown(self,
             backend_utils.CLUSTER_STATUS_LOCK_PATH.format(cluster_name))
 
         try:
-            with filelock.FileLock(
+            with timeline.FileLockEvent(
                     lock_path,
                     backend_utils.CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS):
                 self.teardown_no_lock(

sky/clouds/service_catalog/aws_catalog.py (+2)
@@ -20,6 +20,7 @@
 from sky.utils import common_utils
 from sky.utils import resources_utils
 from sky.utils import rich_utils
+from sky.utils import timeline
 from sky.utils import ux_utils
 
 if typing.TYPE_CHECKING:
@@ -100,6 +101,7 @@ def _get_az_mappings(aws_user_hash: str) -> Optional['pd.DataFrame']:
     return az_mappings
 
 
+@timeline.event
 def _fetch_and_apply_az_mapping(df: common.LazyDataFrame) -> 'pd.DataFrame':
     """Maps zone IDs (use1-az1) to zone names (us-east-1x).
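
The `@timeline.event` decorator added here records how long the AZ-mapping fetch takes. Its real implementation lives in sky/utils/timeline.py and is not part of this diff; the following is only an illustrative sketch of how such a timing decorator is typically structured, not the actual SkyPilot code:

    import functools
    import time

    def event(fn):
        # Illustrative only: time each call and report its duration.
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return fn(*args, **kwargs)
            finally:
                elapsed = time.perf_counter() - start
                print(f'[timeline] {fn.__qualname__}: {elapsed:.3f}s')
        return wrapper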

sky/global_user_state.py (+36 −16)
@@ -60,7 +60,8 @@ def create_table(cursor, conn):
         owner TEXT DEFAULT null,
         cluster_hash TEXT DEFAULT null,
         storage_mounts_metadata BLOB DEFAULT null,
-        cluster_ever_up INTEGER DEFAULT 0)""")
+        cluster_ever_up INTEGER DEFAULT 0,
+        status_updated_at INTEGER DEFAULT null)""")
 
     # Table for Cluster History
     # usage_intervals: List[Tuple[int, int]]
@@ -130,6 +131,10 @@ def create_table(cursor, conn):
         # clusters were never really UP, setting it to 1 means they won't be
         # auto-deleted during any failover.
         value_to_replace_existing_entries=1)
+
+    db_utils.add_column_to_table(cursor, conn, 'clusters', 'status_updated_at',
+                                 'INTEGER DEFAULT null')
+
     conn.commit()
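
`db_utils.add_column_to_table` is called here but not shown in the diff. The pattern it embodies is an idempotent SQLite migration: attempt the ALTER TABLE and swallow only the "duplicate column" error, so the call is safe on both fresh and already-migrated databases. A rough sketch under that assumption (not the actual helper):

    import sqlite3

    def add_column_to_table(cursor, conn, table, column, column_type):
        # Illustrative idempotent migration: add the column if it is missing.
        try:
            cursor.execute(
                f'ALTER TABLE {table} ADD COLUMN {column} {column_type}')
        except sqlite3.OperationalError as e:
            if 'duplicate column' not in str(e).lower():
                raise  # a different failure; surface it
        conn.commit()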

@@ -159,6 +164,7 @@ def add_or_update_cluster(cluster_name: str,
         status = status_lib.ClusterStatus.INIT
     if ready:
         status = status_lib.ClusterStatus.UP
+    status_updated_at = int(time.time())
 
     # TODO (sumanth): Cluster history table will have multiple entries
     # when the cluster failover through multiple regions (one entry per region).
@@ -191,7 +197,7 @@ def add_or_update_cluster(cluster_name: str,
         # specified.
         '(name, launched_at, handle, last_use, status, '
         'autostop, to_down, metadata, owner, cluster_hash, '
-        'storage_mounts_metadata, cluster_ever_up) '
+        'storage_mounts_metadata, cluster_ever_up, status_updated_at) '
         'VALUES ('
         # name
         '?, '
@@ -228,7 +234,9 @@ def add_or_update_cluster(cluster_name: str,
         'COALESCE('
         '(SELECT storage_mounts_metadata FROM clusters WHERE name=?), null), '
         # cluster_ever_up
-        '((SELECT cluster_ever_up FROM clusters WHERE name=?) OR ?)'
+        '((SELECT cluster_ever_up FROM clusters WHERE name=?) OR ?),'
+        # status_updated_at
+        '?'
         ')',
         (
             # name
@@ -260,6 +268,8 @@ def add_or_update_cluster(cluster_name: str,
             # cluster_ever_up
             cluster_name,
             int(ready),
+            # status_updated_at
+            status_updated_at,
         ))
 
     launched_nodes = getattr(cluster_handle, 'launched_nodes', None)
@@ -330,11 +340,13 @@ def remove_cluster(cluster_name: str, terminate: bool) -> None:
         # stopped VM, which leads to timeout.
         if hasattr(handle, 'stable_internal_external_ips'):
             handle.stable_internal_external_ips = None
+        current_time = int(time.time())
         _DB.cursor.execute(
-            'UPDATE clusters SET handle=(?), status=(?) '
-            'WHERE name=(?)', (
+            'UPDATE clusters SET handle=(?), status=(?), '
+            'status_updated_at=(?) WHERE name=(?)', (
                 pickle.dumps(handle),
                 status_lib.ClusterStatus.STOPPED.value,
+                current_time,
                 cluster_name,
             ))
         _DB.conn.commit()
@@ -359,10 +371,10 @@ def get_glob_cluster_names(cluster_name: str) -> List[str]:
 
 def set_cluster_status(cluster_name: str,
                        status: status_lib.ClusterStatus) -> None:
-    _DB.cursor.execute('UPDATE clusters SET status=(?) WHERE name=(?)', (
-        status.value,
-        cluster_name,
-    ))
+    current_time = int(time.time())
+    _DB.cursor.execute(
+        'UPDATE clusters SET status=(?), status_updated_at=(?) WHERE name=(?)',
+        (status.value, current_time, cluster_name))
    count = _DB.cursor.rowcount
     _DB.conn.commit()
     assert count <= 1, count
@@ -570,15 +582,18 @@ def _load_storage_mounts_metadata(
 
 def get_cluster_from_name(
         cluster_name: Optional[str]) -> Optional[Dict[str, Any]]:
-    rows = _DB.cursor.execute('SELECT * FROM clusters WHERE name=(?)',
-                              (cluster_name,)).fetchall()
+    rows = _DB.cursor.execute(
+        'SELECT name, launched_at, handle, last_use, status, autostop, '
+        'metadata, to_down, owner, cluster_hash, storage_mounts_metadata, '
+        'cluster_ever_up, status_updated_at FROM clusters WHERE name=(?)',
+        (cluster_name,)).fetchall()
     for row in rows:
         # Explicitly specify the number of fields to unpack, so that
         # we can add new fields to the database in the future without
         # breaking the previous code.
         (name, launched_at, handle, last_use, status, autostop, metadata,
-         to_down, owner, cluster_hash, storage_mounts_metadata,
-         cluster_ever_up) = row[:12]
+         to_down, owner, cluster_hash, storage_mounts_metadata, cluster_ever_up,
+         status_updated_at) = row[:13]
         # TODO: use namedtuple instead of dict
         record = {
             'name': name,
@@ -594,19 +609,23 @@ def get_cluster_from_name(
             'storage_mounts_metadata':
                 _load_storage_mounts_metadata(storage_mounts_metadata),
             'cluster_ever_up': bool(cluster_ever_up),
+            'status_updated_at': status_updated_at,
         }
         return record
     return None
 
 
 def get_clusters() -> List[Dict[str, Any]]:
     rows = _DB.cursor.execute(
-        'select * from clusters order by launched_at desc').fetchall()
+        'select name, launched_at, handle, last_use, status, autostop, '
+        'metadata, to_down, owner, cluster_hash, storage_mounts_metadata, '
+        'cluster_ever_up, status_updated_at from clusters '
+        'order by launched_at desc').fetchall()
     records = []
     for row in rows:
         (name, launched_at, handle, last_use, status, autostop, metadata,
-         to_down, owner, cluster_hash, storage_mounts_metadata,
-         cluster_ever_up) = row[:12]
+         to_down, owner, cluster_hash, storage_mounts_metadata, cluster_ever_up,
+         status_updated_at) = row[:13]
         # TODO: use namedtuple instead of dict
         record = {
             'name': name,
@@ -622,6 +641,7 @@ def get_clusters() -> List[Dict[str, Any]]:
             'storage_mounts_metadata':
                 _load_storage_mounts_metadata(storage_mounts_metadata),
             'cluster_ever_up': bool(cluster_ever_up),
+            'status_updated_at': status_updated_at,
         }
 
         records.append(record)
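
Note the switch from `SELECT *` to an explicit column list: together with the fixed-width `row[:13]` unpack, it pins the row layout so old readers keep working even after newer versions append columns to the table. A small illustration with a hypothetical row:

    # Row from a database whose schema later grew one extra column.
    row = ('my-cluster', 1700000000, b'<pickled handle>', 'sky launch', 'UP',
           -1, '{}', 0, None, 'abc123', None, 1, 1700000005, 'future-value')

    # Unpacking a fixed-width prefix ignores trailing columns added later.
    (name, launched_at, handle, last_use, status, autostop, metadata, to_down,
     owner, cluster_hash, storage_mounts_metadata, cluster_ever_up,
     status_updated_at) = row[:13]
    assert status_updated_at == 1700000005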

sky/utils/timeline.py (+2 −4)
@@ -79,11 +79,9 @@ def event(name_or_fn: Union[str, Callable], message: Optional[str] = None):
 class FileLockEvent:
     """Serve both as a file lock and event for the lock."""
 
-    def __init__(self, lockfile: Union[str, os.PathLike]):
+    def __init__(self, lockfile: Union[str, os.PathLike], timeout: float = -1):
         self._lockfile = lockfile
-        # TODO(mraheja): remove pylint disabling when filelock version updated
-        # pylint: disable=abstract-class-instantiated
-        self._lock = filelock.FileLock(self._lockfile)
+        self._lock = filelock.FileLock(self._lockfile, timeout)
         self._hold_lock_event = Event(f'[FileLock.hold]:{self._lockfile}')
 
     def acquire(self):
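
With the new `timeout` parameter, `FileLockEvent` can stand in for a bare `filelock.FileLock` wherever a bounded wait is wanted, which is what the `_teardown` change above does. A hedged usage sketch (the lock path is hypothetical):

    from sky.utils import timeline

    # timeout=-1 (the default) keeps the old behavior: block until acquired.
    # A positive timeout raises filelock.Timeout on expiry, matching
    # filelock.FileLock semantics, while still recording the hold as an event.
    with timeline.FileLockEvent('/tmp/.my_cluster.lock', timeout=20):
        pass  # critical section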
