diff --git a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py index 678ff6c0d5bbd..980cdbb2ad1a0 100644 --- a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py @@ -416,6 +416,18 @@ def submit( def __repr__(self): return f"MapWorker({self.src_fn_name})" + def on_exit(self): + """Called when the actor is about to exit. + This enables performing cleanup operations via `UDF.__del__`. + + Note, this only ensures cleanup is performed when the job exits gracefully. + If the driver or the actor is forcefully killed, `__del__` will not be called. + """ + # `_map_actor_context` is a global variable that references the UDF object. + # Delete it to trigger `UDF.__del__`. + del ray.data._map_actor_context + ray.data._map_actor_context = None + @dataclass class _ActorState: @@ -746,6 +758,9 @@ def _remove_actor(self, actor: ray.actor.ActorHandle): # garbage collect the actor, instead of using ray.kill. # Because otherwise the actor cannot be restarted upon lineage reconstruction. if actor in self._running_actors: + # Call `on_exit` to trigger `UDF.__del__` which may perform + # cleanup operations. 
+ actor.on_exit.remote() del self._running_actors[actor] def _get_location(self, bundle: RefBundle) -> Optional[NodeIdStr]: diff --git a/python/ray/data/tests/test_actor_pool_map_operator.py b/python/ray/data/tests/test_actor_pool_map_operator.py index f979d78904954..6c316b8cc4cf7 100644 --- a/python/ray/data/tests/test_actor_pool_map_operator.py +++ b/python/ray/data/tests/test_actor_pool_map_operator.py @@ -25,6 +25,9 @@ def __init__(self, node_id: str = "node1"): def get_location(self) -> str: return self.node_id + def on_exit(self): + pass + class TestActorPool(unittest.TestCase): def setup_class(self): diff --git a/python/ray/data/tests/test_map.py b/python/ray/data/tests/test_map.py index a8b6e65ad3327..4f07b239e584e 100644 --- a/python/ray/data/tests/test_map.py +++ b/python/ray/data/tests/test_map.py @@ -1500,6 +1500,31 @@ def test_random_sample_checks(ray_start_regular_shared): ray.data.range(1).random_sample(10) +def test_actor_udf_cleanup(ray_start_regular_shared, tmp_path): + """Test that for the actor map operator, the UDF object is deleted properly.""" + test_file = tmp_path / "test.txt" + + # Simulate the case that the UDF depends on some external resources that + # need to be cleaned up. + class StatefulUDF: + def __init__(self): + with open(test_file, "w") as f: + f.write("test") + + def __call__(self, row): + return row + + def __del__(self): + # Delete the file when the UDF is deleted. + os.remove(test_file) + + ds = ray.data.range(10) + ds = ds.map(StatefulUDF, concurrency=1) + assert sorted(extract_values("id", ds.take_all())) == list(range(10)) + + wait_for_condition(lambda: not os.path.exists(test_file)) + + # NOTE: All tests above share a Ray cluster, while the tests below do not. These # tests should only be carefully reordered to retain this invariant! def test_actor_pool_strategy_default_num_actors(shutdown_only):