diff --git a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py index c81de84252309..86cbd90130199 100644 --- a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py @@ -410,6 +410,18 @@ def submit( def __repr__(self): return f"MapWorker({self.src_fn_name})" + def on_exit(self): + """Called when the actor is about to exist. + This enables performing cleanup operations via `UDF.__del__`. + + Note, this only ensures cleanup is performed when the job exists gracefully. + If the driver or the actor is forcefully killed, `__del__` will not be called. + """ + # `_map_actor_context` is a global variable that references the UDF object. + # Delete it to trigger `UDF.__del__`. + del ray.data._map_actor_context + ray.data._map_actor_context = None + @dataclass class _ActorState: @@ -738,6 +750,9 @@ def _remove_actor(self, actor: ray.actor.ActorHandle): # garbage collect the actor, instead of using ray.kill. # Because otherwise the actor cannot be restarted upon lineage reconstruction. if actor in self._running_actors: + # Call `on_exit` to trigger `UDF.__del__` which may perform + # cleanup operations. + actor.on_exit.remote() del self._running_actors[actor] def _get_location(self, bundle: RefBundle) -> Optional[NodeIdStr]: diff --git a/python/ray/data/tests/test_actor_pool_map_operator.py b/python/ray/data/tests/test_actor_pool_map_operator.py index 01327470089c0..2b078db1d8b80 100644 --- a/python/ray/data/tests/test_actor_pool_map_operator.py +++ b/python/ray/data/tests/test_actor_pool_map_operator.py @@ -24,6 +24,9 @@ def __init__(self, node_id: str = "node1"): def get_location(self) -> str: return self.node_id + def on_exit(self): + pass + class TestActorPool(unittest.TestCase): def setup_class(self): diff --git a/python/ray/data/tests/test_map.py b/python/ray/data/tests/test_map.py index 8321864801f83..3ed0c551ea027 100644 --- a/python/ray/data/tests/test_map.py +++ b/python/ray/data/tests/test_map.py @@ -1500,6 +1500,31 @@ def test_random_sample_checks(ray_start_regular_shared): ray.data.range(1).random_sample(10) +def test_actor_udf_cleanup(ray_start_regular_shared, tmp_path): + """Test that for the actor map operator, the UDF object is deleted properly.""" + test_file = tmp_path / "test.txt" + + # Simulate the case that the UDF depends on some external resources that + # need to be cleaned up. + class StatefulUDF: + def __init__(self): + with open(test_file, "w") as f: + f.write("test") + + def __call__(self, row): + return row + + def __del__(self): + # Delete the file when the UDF is deleted. + os.remove(test_file) + + ds = ray.data.range(10) + ds = ds.map(StatefulUDF, concurrency=1) + assert sorted(extract_values("id", ds.take_all())) == list(range(10)) + + wait_for_condition(lambda: not os.path.exists(test_file)) + + # NOTE: All tests above share a Ray cluster, while the tests below do not. These # tests should only be carefully reordered to retain this invariant! def test_actor_pool_strategy_default_num_actors(shutdown_only):