Reduce verbosity for ray events (#226)
Adds a verbose flag to RayParams to toggle Ray-specific event logging. The flag defaults to None; when left unset, verbosity is reduced automatically inside Tune sessions and info-level logging is kept otherwise.

The motivation is that when using AIR/Tune, this logging becomes quite verbose.

Signed-off-by: Richard Liaw <[email protected]>
Co-authored-by: Kai Fricke <[email protected]>
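
For illustration only, here is a minimal usage sketch (not part of this commit) showing how a caller might set the new flag explicitly; the dataset path, label column, and training parameters are placeholders:

```python
from xgboost_ray import RayDMatrix, RayParams, train

# Placeholder data source; anything RayDMatrix accepts works here.
dtrain = RayDMatrix("train.parquet", label="target")

booster = train(
    {"objective": "binary:logistic", "eval_metric": "logloss"},
    dtrain,
    num_boost_round=10,
    ray_params=RayParams(
        num_actors=2,
        cpus_per_actor=2,
        # True/False forces info/debug logging; leaving it as None (the default)
        # lets xgboost_ray pick quiet logging automatically inside Tune sessions.
        verbose=True,
    ),
)
```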
richardliaw and Kai Fricke authored Aug 15, 2022
1 parent 08f3bc1 commit cad8c3e
Showing 1 changed file with 38 additions and 10 deletions.
48 changes: 38 additions & 10 deletions xgboost_ray/main.py
@@ -370,6 +370,8 @@ class RayParams:
Defaults to 0 (no retries). Set to -1 for unlimited retries.
checkpoint_frequency (int): How often to save checkpoints. Defaults
to ``5`` (every 5th iteration).
verbose (bool): Whether to output Ray-specific info messages
during training/prediction.
"""
# Actor scheduling
num_actors: int = 0
@@ -386,6 +388,8 @@
# Distributed callbacks
distributed_callbacks: Optional[List[DistributedCallback]] = None

verbose: Optional[bool] = None

def get_tune_resources(self):
"""Return the resources to use for xgboost_ray training with Tune."""
if self.cpus_per_actor <= 0 or self.num_actors <= 0:
@@ -426,6 +430,9 @@ def _validate_ray_params(ray_params: Union[None, RayParams, dict]) \
warnings.warn(
f"`num_actors` in `ray_params` is smaller than 2 "
f"({ray_params.num_actors}). XGBoost will NOT be distributed!")
if ray_params.verbose is None:
# In Tune sessions, reduce verbosity
ray_params.verbose = not is_session_enabled()
return ray_params


@@ -930,6 +937,9 @@ def _train(params: Dict,
from xgboost_ray.elastic import _maybe_schedule_new_actors, \
_update_scheduled_actor_states, _get_actor_alive_status

# Do not modify original parameters
params = params.copy()

# Un-schedule possible scheduled restarts
_training_state.restart_training_at = None

@@ -944,6 +954,13 @@
params["nthread"] = cpus_per_actor
params["n_jobs"] = cpus_per_actor

if ray_params.verbose:
maybe_log = logger.info
params.setdefault("verbosity", 1)
else:
maybe_log = logger.debug
params.setdefault("verbosity", 0)

# This is a callback that handles actor failures.
# We identify the rank of the failed actor, add this to a set of
# failed actors (which we might want to restart later), and set its
@@ -979,9 +996,10 @@ def handle_actor_failure(actor_id):
newly_created += 1

alive_actors = sum(1 for a in _training_state.actors if a is not None)
logger.info(f"[RayXGBoost] Created {newly_created} new actors "
f"({alive_actors} total actors). Waiting until actors "
f"are ready for training.")

maybe_log(f"[RayXGBoost] Created {newly_created} new actors "
f"({alive_actors} total actors). Waiting until actors "
f"are ready for training.")

# For distributed datasets (e.g. Modin), this will initialize
# (and fix) the assignment of data shards to actor ranks
@@ -1024,7 +1042,7 @@ def handle_actor_failure(actor_id):
_get_actor_alive_status(_training_state.actors, handle_actor_failure)
raise RayActorError from exc

logger.info("[RayXGBoost] Starting XGBoost training.")
maybe_log("[RayXGBoost] Starting XGBoost training.")

# Start Rabit tracker for gradient sharing
rabit_process, env = _start_rabit_tracker(alive_actors)
@@ -1515,10 +1533,15 @@ def _wrapped(*args, **kwargs):
train_additional_results["training_time_s"] = total_training_time
train_additional_results["total_time_s"] = total_time

logger.info("[RayXGBoost] Finished XGBoost training on training data "
"with total N={total_n:,} in {total_time_s:.2f} seconds "
"({training_time_s:.2f} pure XGBoost training time).".format(
**train_additional_results))
if ray_params.verbose:
maybe_log = logger.info
else:
maybe_log = logger.debug

maybe_log("[RayXGBoost] Finished XGBoost training on training data "
"with total N={total_n:,} in {total_time_s:.2f} seconds "
"({training_time_s:.2f} pure XGBoost training time).".format(
**train_additional_results))

_shutdown(
actors=actors,
@@ -1540,6 +1563,11 @@ def _predict(model: xgb.Booster, data: RayDMatrix, ray_params: RayParams,
**kwargs):
_assert_ray_support()

if ray_params.verbose:
maybe_log = logger.info
else:
maybe_log = logger.debug

if not ray.is_initialized():
ray.init()

@@ -1555,7 +1583,7 @@ def _predict(model: xgb.Booster, data: RayDMatrix, ray_params: RayParams,
distributed_callbacks=ray_params.distributed_callbacks)
for i in range(ray_params.num_actors)
]
logger.info(f"[RayXGBoost] Created {len(actors)} remote actors.")
maybe_log(f"[RayXGBoost] Created {len(actors)} remote actors.")

# Split data across workers
wait_load = []
@@ -1572,7 +1600,7 @@ def _predict(model: xgb.Booster, data: RayDMatrix, ray_params: RayParams,
# Put model into object store
model_ref = ray.put(model)

logger.info("[RayXGBoost] Starting XGBoost prediction.")
maybe_log("[RayXGBoost] Starting XGBoost prediction.")

# Train
fut = [actor.predict.remote(model_ref, data, **kwargs) for actor in actors]
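
To show how the new default interacts with Tune, below is a hedged sketch (not from this commit) of an xgboost_ray trainable run under Ray Tune: since verbose is left as None and a Tune session is detected, the [RayXGBoost] messages drop to debug level. The search space, dataset, and resource values are assumptions:

```python
from ray import tune
from xgboost_ray import RayDMatrix, RayParams, train

ray_params = RayParams(num_actors=2, cpus_per_actor=1)  # verbose stays None

def train_xgb(config):
    # Placeholder dataset; replace with real training data.
    dtrain = RayDMatrix("train.parquet", label="target")
    evals_result = {}
    train(
        config,
        dtrain,
        evals=[(dtrain, "train")],
        evals_result=evals_result,
        ray_params=ray_params,  # inside the session, verbose=None resolves to False
    )
    # Report the last recorded value of each training metric to Tune.
    tune.report(**{k: v[-1] for k, v in evals_result["train"].items()})

tune.run(
    train_xgb,
    config={"objective": "binary:logistic", "eta": tune.loguniform(1e-3, 1e-1)},
    resources_per_trial=ray_params.get_tune_resources(),
)
```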
