Skip to content

Commit

Permalink
[core] Return failure for failed runtime env dereference (#50003)
Browse files Browse the repository at this point in the history
Dereference is not supposed to fail, if only warning message is logged,
it's hard to detected failed precondition.

---------

Signed-off-by: dentiny <[email protected]>
  • Loading branch information
dentiny authored Jan 24, 2025
1 parent 2ab081b commit d27f68c
Showing 1 changed file with 15 additions and 5 deletions.
20 changes: 15 additions & 5 deletions python/ray/_private/runtime_env/agent/runtime_env_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def _increase_reference_for_runtime_env(self, serialized_env: str):
self._runtime_env_reference[serialized_env] += 1

def _decrease_reference_for_runtime_env(self, serialized_env: str):
"""Decrease reference count for the given [serialized_env]. Throw exception if we cannot decrement reference."""
default_logger.debug(f"Decrease reference for runtime env {serialized_env}.")
unused = False
if self._runtime_env_reference[serialized_env] > 0:
Expand All @@ -126,10 +127,12 @@ def _decrease_reference_for_runtime_env(self, serialized_env: str):
del self._runtime_env_reference[serialized_env]
else:
default_logger.warning(f"Runtime env {serialized_env} does not exist.")
raise ValueError(
f"{serialized_env} cannot decrement reference since the reference count is 0"
)
if unused:
default_logger.info(f"Unused runtime env {serialized_env}.")
self._unused_runtime_env_callback(serialized_env)
return unused

def increase_reference(
self, runtime_env: RuntimeEnv, serialized_env: str, source_process: str
Expand All @@ -143,8 +146,9 @@ def increase_reference(
def decrease_reference(
self, runtime_env: RuntimeEnv, serialized_env: str, source_process: str
) -> None:
"""Decrease reference count for runtime env and uri. Throw exception if decrement reference count fails."""
if source_process in self._reference_exclude_sources:
return list()
return
self._decrease_reference_for_runtime_env(serialized_env)
uris = self._uris_parser(runtime_env)
self._decrease_reference_for_uris(uris)
Expand Down Expand Up @@ -543,9 +547,15 @@ async def DeleteRuntimeEnvIfPossible(self, request):
),
)

self._reference_table.decrease_reference(
runtime_env, request.serialized_runtime_env, request.source_process
)
try:
self._reference_table.decrease_reference(
runtime_env, request.serialized_runtime_env, request.source_process
)
except Exception as e:
return runtime_env_agent_pb2.DeleteRuntimeEnvIfPossibleReply(
status=agent_manager_pb2.AGENT_RPC_STATUS_FAILED,
error_message=f"Fails to decrement reference for runtime env for {str(e)}",
)

return runtime_env_agent_pb2.DeleteRuntimeEnvIfPossibleReply(
status=agent_manager_pb2.AGENT_RPC_STATUS_OK
Expand Down

0 comments on commit d27f68c

Please sign in to comment.