Skip to content

Commit

Permalink
fix: Ensure that utilization idleness is checked after a set period (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
fregataa authored Jul 2, 2024
1 parent 548b832 commit df57a09
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 8 deletions.
1 change: 1 addition & 0 deletions changes/2205.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Ensure that utilization idleness is checked after a set period.
48 changes: 40 additions & 8 deletions src/ai/backend/manager/idle.py
Original file line number Diff line number Diff line change
Expand Up @@ -823,9 +823,12 @@ def get_time_window(self, policy: Row) -> timedelta:
return timedelta(seconds=idle_timeout)
return self.time_window

def get_last_collected_key(self, session_id: SessionId) -> str:
def _get_last_collected_key(self, session_id: SessionId) -> str:
return f"session.{session_id}.util_last_collected"

# Build the per-session key under which the timestamp of the first utilization
# collection is stored; check_idleness reads and writes it via self._redis_live.
def _get_first_collected_key(self, session_id: SessionId) -> str:
return f"session.{session_id}.util_first_collected"

async def check_idleness(
self,
kernel: Row,
Expand All @@ -848,7 +851,8 @@ async def check_idleness(
unavailable_resources: Set[str] = set()

util_series_key = f"session.{session_id}.util_series"
util_last_collected_key = self.get_last_collected_key(session_id)
util_first_collected_key = self._get_first_collected_key(session_id)
util_last_collected_key = self._get_last_collected_key(session_id)

# window_size: the length of utilization reports.
window_size = int(time_window.total_seconds() / interval)
Expand All @@ -858,16 +862,39 @@ async def check_idleness(
# Wait until the time "interval" has passed since the last updated time.
t = await redis_helper.execute(self._redis_live, lambda r: r.time())
util_now: float = t[0] + (t[1] / (10**6))
raw_util_last_collected = await redis_helper.execute(
self._redis_live,
lambda r: r.get(util_last_collected_key),
raw_util_last_collected = cast(
bytes | None,
await redis_helper.execute(
self._redis_live,
lambda r: r.get(util_last_collected_key),
),
)
util_last_collected: float = (
float(raw_util_last_collected) if raw_util_last_collected else 0.0
)
if util_now - util_last_collected < interval:
return True

raw_util_first_collected = cast(
bytes | None,
await redis_helper.execute(
self._redis_live,
lambda r: r.get(util_first_collected_key),
),
)
if raw_util_first_collected is None:
util_first_collected = util_now
await redis_helper.execute(
self._redis_live,
lambda r: r.set(
util_first_collected_key,
f"{util_now:.06f}",
ex=max(86400, int(self.time_window.total_seconds() * 2)),
),
)
else:
util_first_collected = float(raw_util_first_collected)

# Report time remaining until the first time window is full as expire time
db_now: datetime = await get_db_now(dbconn)
kernel_created_at: datetime = kernel["created_at"]
Expand Down Expand Up @@ -923,14 +950,19 @@ async def check_idleness(
except TypeError:
util_series = {k: [] for k in self.resource_thresholds.keys()}

not_enough_data = False
do_idle_check: bool = True

for k in util_series:
util_series[k].append(current_utilizations[k])
if len(util_series[k]) > window_size:
util_series[k].pop(0)
else:
not_enough_data = True
do_idle_check = False

# Do not skip the idleness check once a full time window has elapsed since the first collection
if util_now - util_first_collected >= time_window.total_seconds():
do_idle_check = True

await redis_helper.execute(
self._redis_live,
lambda r: r.set(
Expand Down Expand Up @@ -972,7 +1004,7 @@ def _avg(util_list: list[float]) -> float:
),
)

if not_enough_data:
if not do_idle_check:
return True

# Check over-utilized (not to be collected) resources.
Expand Down

0 comments on commit df57a09

Please sign in to comment.