Skip to content

Commit

Permalink
Update Ray core APIs (#228)
Browse files Browse the repository at this point in the history
Placement group APIs are updated to use scheduling_strategy

Co-authored-by: Antoni Baum <[email protected]>
  • Loading branch information
krfricke and Yard1 authored Aug 15, 2022
1 parent cad8c3e commit 69e431e
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 11 deletions.
5 changes: 5 additions & 0 deletions xgboost_ray/data_sources/modin.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@

try:
import modin # noqa: F401
from modin.config.envvars import Engine
from distutils.version import LooseVersion
MODIN_INSTALLED = LooseVersion(modin.__version__) >= LooseVersion("0.9.0")

# Check if importing the Ray engine leads to errors
Engine().get()

except (ImportError, AttributeError):
MODIN_INSTALLED = False

Expand Down
21 changes: 13 additions & 8 deletions xgboost_ray/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class EarlyStopException(XGBoostError):
from ray.util.annotations import PublicAPI, DeveloperAPI
from ray.util.placement_group import PlacementGroup, \
remove_placement_group, get_current_placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
from ray.util.queue import Queue

from xgboost_ray.util import Event, MultiActorTask, force_on_current_node
Expand Down Expand Up @@ -747,17 +748,21 @@ def _create_actor(
# Send DEFAULT_PG here, which changed in Ray >= 1.5.0
# If we send `None`, this will ignore the parent placement group and
# lead to errors e.g. when used within Ray Tune
return _RemoteRayXGBoostActor.options(
actor_cls = _RemoteRayXGBoostActor.options(
num_cpus=num_cpus_per_actor,
num_gpus=num_gpus_per_actor,
resources=resources_per_actor,
placement_group_capture_child_tasks=True,
placement_group=placement_group or DEFAULT_PG).remote(
rank=rank,
num_actors=num_actors,
queue=queue,
checkpoint_frequency=checkpoint_frequency,
distributed_callbacks=distributed_callbacks)
scheduling_strategy=PlacementGroupSchedulingStrategy(
placement_group=placement_group or DEFAULT_PG,
placement_group_capture_child_tasks=True,
))

return actor_cls.remote(
rank=rank,
num_actors=num_actors,
queue=queue,
checkpoint_frequency=checkpoint_frequency,
distributed_callbacks=distributed_callbacks)


def _trigger_data_load(actor, dtrain, evals):
Expand Down
20 changes: 17 additions & 3 deletions xgboost_ray/tests/test_tune.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@
from xgboost_ray.tune import TuneReportCallback,\
TuneReportCheckpointCallback, _try_add_tune_callback

try:
from ray.air import Checkpoint
except Exception:

class Checkpoint:
pass


class XGBoostRayTuneTest(unittest.TestCase):
def setUp(self):
Expand Down Expand Up @@ -146,13 +153,17 @@ def testEndToEndCheckpointing(self):
log_to_file=True,
local_dir=self.experiment_dir)

self.assertTrue(os.path.exists(analysis.best_checkpoint))
if isinstance(analysis.best_checkpoint, Checkpoint):
self.assertTrue(analysis.best_checkpoint)
else:
self.assertTrue(os.path.exists(analysis.best_checkpoint))

def testEndToEndCheckpointingOrigTune(self):
ray_params = RayParams(cpus_per_actor=1, num_actors=2)
analysis = tune.run(
self.train_func(
ray_params, callbacks=[OrigTuneReportCheckpointCallback()]),
ray_params,
callbacks=[OrigTuneReportCheckpointCallback(frequency=1)]),
config=self.params,
resources_per_trial=ray_params.get_tune_resources(),
num_samples=1,
Expand All @@ -161,7 +172,10 @@ def testEndToEndCheckpointingOrigTune(self):
log_to_file=True,
local_dir=self.experiment_dir)

self.assertTrue(os.path.exists(analysis.best_checkpoint))
if isinstance(analysis.best_checkpoint, Checkpoint):
self.assertTrue(analysis.best_checkpoint)
else:
self.assertTrue(os.path.exists(analysis.best_checkpoint))


if __name__ == "__main__":
Expand Down

0 comments on commit 69e431e

Please sign in to comment.