Skip to content

Commit

Permalink
[data] Trigger UDF.__del__ when an actor is about to exit (#49929)
Browse files Browse the repository at this point in the history
## Why are these changes needed?

A stateful UDF may depend on external resources that may need to be
cleaned up at the end.
This PR allows performing the cleanup operations via `UDF.__del__`

Note, this only ensures cleanup is performed when the job exists
gracefully.
If the driver or the actor is forcefully killed, `__del__` will not be

---------

Signed-off-by: Hao Chen <[email protected]>
  • Loading branch information
raulchen authored Jan 24, 2025
1 parent d27f68c commit 9b9843f
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,18 @@ def submit(
def __repr__(self):
return f"MapWorker({self.src_fn_name})"

def on_exit(self):
"""Called when the actor is about to exist.
This enables performing cleanup operations via `UDF.__del__`.
Note, this only ensures cleanup is performed when the job exists gracefully.
If the driver or the actor is forcefully killed, `__del__` will not be called.
"""
# `_map_actor_context` is a global variable that references the UDF object.
# Delete it to trigger `UDF.__del__`.
del ray.data._map_actor_context
ray.data._map_actor_context = None


@dataclass
class _ActorState:
Expand Down Expand Up @@ -746,6 +758,9 @@ def _remove_actor(self, actor: ray.actor.ActorHandle):
# garbage collect the actor, instead of using ray.kill.
# Because otherwise the actor cannot be restarted upon lineage reconstruction.
if actor in self._running_actors:
# Call `on_exit` to trigger `UDF.__del__` which may perform
# cleanup operations.
actor.on_exit.remote()
del self._running_actors[actor]

def _get_location(self, bundle: RefBundle) -> Optional[NodeIdStr]:
Expand Down
3 changes: 3 additions & 0 deletions python/ray/data/tests/test_actor_pool_map_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ def __init__(self, node_id: str = "node1"):
def get_location(self) -> str:
return self.node_id

def on_exit(self):
pass


class TestActorPool(unittest.TestCase):
def setup_class(self):
Expand Down
25 changes: 25 additions & 0 deletions python/ray/data/tests/test_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -1500,6 +1500,31 @@ def test_random_sample_checks(ray_start_regular_shared):
ray.data.range(1).random_sample(10)


def test_actor_udf_cleanup(ray_start_regular_shared, tmp_path):
"""Test that for the actor map operator, the UDF object is deleted properly."""
test_file = tmp_path / "test.txt"

# Simulate the case that the UDF depends on some external resources that
# need to be cleaned up.
class StatefulUDF:
def __init__(self):
with open(test_file, "w") as f:
f.write("test")

def __call__(self, row):
return row

def __del__(self):
# Delete the file when the UDF is deleted.
os.remove(test_file)

ds = ray.data.range(10)
ds = ds.map(StatefulUDF, concurrency=1)
assert sorted(extract_values("id", ds.take_all())) == list(range(10))

wait_for_condition(lambda: not os.path.exists(test_file))


# NOTE: All tests above share a Ray cluster, while the tests below do not. These
# tests should only be carefully reordered to retain this invariant!
def test_actor_pool_strategy_default_num_actors(shutdown_only):
Expand Down

0 comments on commit 9b9843f

Please sign in to comment.