Skip to content

Commit

Permalink
Compatibility with XGBoost 2.0 (#296)
Browse files Browse the repository at this point in the history
Updates in tests and API calls for compatibility with XGBoost 2.0.
  • Loading branch information
krfricke authored Sep 20, 2023
1 parent bd7e799 commit c7eada3
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 253 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"distributed computing framework Ray.",
url="https://github.com/ray-project/xgboost_ray",
install_requires=[
"ray>=2.0",
"ray>=2.7",
"numpy>=1.16",
"pandas",
"wrapt>=1.12.1",
Expand Down
22 changes: 18 additions & 4 deletions xgboost_ray/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class EarlyStopException(XGBoostError):
pass


# From xgboost>=1.7.0, rabit is replaced by a collective communicator
# From xgboost>=1.7.0, rabit is replaced by a collective communicator.
try:
from xgboost.collective import CommunicatorContext

Expand Down Expand Up @@ -53,7 +53,10 @@ class EarlyStopException(XGBoostError):
remove_placement_group,
)
from ray.util.queue import Queue
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
from ray.util.scheduling_strategies import (
NodeAffinitySchedulingStrategy,
PlacementGroupSchedulingStrategy,
)

from xgboost_ray.util import Event, MultiActorTask, force_on_current_node

Expand Down Expand Up @@ -136,6 +139,10 @@ class _XGBoostEnv:
# when new actors become available
ELASTIC_RESTART_GRACE_PERIOD_S: int = 10

# Whether to allow soft-placement of communication processes. If True,
# the Queue and Event actors may be scheduled on non-driver nodes.
COMMUNICATION_SOFT_PLACEMENT: bool = True

def __getattribute__(self, item):
old_val = super(_XGBoostEnv, self).__getattribute__(item)
new_val = _get_environ(item, old_val)
Expand Down Expand Up @@ -985,8 +992,15 @@ def _create_communication_processes(added_tune_callback: bool = False):
else:
# Create Queue and Event actors and make sure to colocate with
# driver node.
node_ip = get_node_ip_address()
placement_option.update({"resources": {f"node:{node_ip}": 0.01}})
node_id = ray.get_runtime_context().get_node_id()
placement_option.update(
{
"scheduling_strategy": NodeAffinitySchedulingStrategy(
node_id=node_id,
soft=ENV.COMMUNICATION_SOFT_PLACEMENT,
)
}
)
queue = Queue(actor_options=placement_option) # Queue actor
stop_event = Event(actor_options=placement_option) # Stop event actor
return queue, stop_event
Expand Down
152 changes: 16 additions & 136 deletions xgboost_ray/sklearn.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,6 @@ def validate_or_none(meta: Optional[List], name: str) -> List:
return train_dmatrix, evals


try:
from xgboost.sklearn import _convert_ntree_limit
except ImportError:
_convert_ntree_limit = None

try:
from xgboost.sklearn import _cls_predict_proba
except ImportError:
Expand All @@ -195,10 +190,6 @@ def _cls_predict_proba(n_classes: int, prediction, vstack: Callable):
_is_cudf_ser = None
_is_cupy_array = None

try:
from xgboost.compat import XGBoostLabelEncoder
except ImportError:
from sklearn.preprocessing import LabelEncoder as XGBoostLabelEncoder

_RAY_PARAMS_DOC = """ray_params : None or RayParams or Dict
Parameters to configure Ray-specific behavior.
Expand Down Expand Up @@ -367,27 +358,15 @@ def _ray_predict(
self: "XGBModel",
X,
output_margin=False,
ntree_limit=None,
validate_features=True,
base_margin=None,
iteration_range=None,
ray_params: Union[None, RayParams, Dict] = None,
_remote: Optional[bool] = None,
ray_dmatrix_params: Optional[Dict] = None,
**kwargs,
):
"""Distributed predict via Ray"""
compat_predict_kwargs = {}
if _convert_ntree_limit is not None:
iteration_range = _convert_ntree_limit(
self.get_booster(), ntree_limit, iteration_range
)
iteration_range = self._get_iteration_range(iteration_range)
compat_predict_kwargs["iteration_range"] = iteration_range
else:
if ntree_limit is None:
ntree_limit = getattr(self, "best_ntree_limit", 0)
compat_predict_kwargs["ntree_limit"] = ntree_limit

ray_params = self._ray_set_ray_params_n_jobs(ray_params, self.n_jobs)
ray_dmatrix_params = ray_dmatrix_params or {}

Expand All @@ -407,7 +386,7 @@ def _ray_predict(
validate_features=validate_features,
ray_params=ray_params,
_remote=_remote,
**compat_predict_kwargs,
**kwargs,
)

def _ray_get_wrap_evaluation_matrices_compat_kwargs(
Expand Down Expand Up @@ -589,24 +568,24 @@ def predict(
self,
X,
output_margin=False,
ntree_limit=None,
validate_features=True,
base_margin=None,
iteration_range=None,
ray_params: Union[None, RayParams, Dict] = None,
_remote: Optional[bool] = None,
ray_dmatrix_params: Optional[Dict] = None,
**kwargs,
):
return self._ray_predict(
X,
output_margin=output_margin,
ntree_limit=ntree_limit,
validate_features=validate_features,
base_margin=base_margin,
iteration_range=iteration_range,
ray_params=ray_params,
_remote=_remote,
ray_dmatrix_params=ray_dmatrix_params,
**kwargs,
)

predict.__doc__ = _treat_X_doc(_get_doc(XGBRegressor.predict)) + _RAY_PARAMS_DOC
Expand Down Expand Up @@ -702,22 +681,13 @@ def fit(
)

if train_dmatrix is not None:
if not hasattr(self, "use_label_encoder"):
warnings.warn(
"If X is a RayDMatrix, no label encoding"
" will be performed. Ensure the labels are"
" encoded."
)
elif self.use_label_encoder:
raise ValueError(
"X cannot be a RayDMatrix if `use_label_encoder` " "is set to True"
)
if "num_class" not in params:
raise ValueError(
"`num_class` must be set during initalization if X"
" is a RayDMatrix"
)
self.classes_ = list(range(0, params["num_class"]))
if XGBOOST_VERSION < Version("2.0.0"):
self.classes_ = list(range(0, params["num_class"]))
self.n_classes_ = params["num_class"]
if self.n_classes_ <= 2:
params.pop("num_class")
Expand All @@ -730,7 +700,10 @@ def fit(
"Please reshape the input data X into 2-dimensional " "matrix."
)

label_transform = self._ray_fit_preprocess(y)
label_transform = lambda x: x # noqa: E731
if XGBOOST_VERSION < Version("2.0.0"):
self.classes_ = np.unique(y)
self.n_classes_ = len(np.unique(y))

if callable(self.objective):
obj = _objective_decorator(self.objective)
Expand Down Expand Up @@ -819,99 +792,31 @@ def fit(

fit.__doc__ = _treat_X_doc(_get_doc(XGBClassifier.fit)) + _RAY_PARAMS_DOC

def _ray_fit_preprocess(self, y) -> Callable:
"""This has been separated out so that it can be easily overwritten
should a future xgboost version remove label encoding"""
# pylint: disable = attribute-defined-outside-init,too-many-statements
can_use_label_encoder = True
use_label_encoder = getattr(self, "use_label_encoder", True)
label_encoding_check_error = (
"The label must consist of integer "
"labels of form 0, 1, 2, ..., [num_class - 1]."
)
label_encoder_deprecation_msg = (
"The use of label encoder in XGBClassifier is deprecated and will "
"be removed in a future release. To remove this warning, do the "
"following: 1) Pass option use_label_encoder=False when "
"constructing XGBClassifier object; and 2) Encode your labels (y) "
"as integers starting with 0, i.e. 0, 1, 2, ..., [num_class - 1]."
)

# ray: modified this to allow for compatibility with legacy xgboost
if (_is_cudf_df and _is_cudf_df(y)) or (_is_cudf_ser and _is_cudf_ser(y)):
import cupy as cp # pylint: disable=E0401

self.classes_ = cp.unique(y.values)
self.n_classes_ = len(self.classes_)
can_use_label_encoder = False
expected_classes = cp.arange(self.n_classes_)
if (
self.classes_.shape != expected_classes.shape
or not (self.classes_ == expected_classes).all()
):
raise ValueError(label_encoding_check_error)
elif _is_cupy_array and _is_cupy_array(y):
import cupy as cp # pylint: disable=E0401

self.classes_ = cp.unique(y)
self.n_classes_ = len(self.classes_)
can_use_label_encoder = False
expected_classes = cp.arange(self.n_classes_)
if (
self.classes_.shape != expected_classes.shape
or not (self.classes_ == expected_classes).all()
):
raise ValueError(label_encoding_check_error)
else:
self.classes_ = np.unique(y)
self.n_classes_ = len(self.classes_)
if not use_label_encoder and (
not np.array_equal(self.classes_, np.arange(self.n_classes_))
):
raise ValueError(label_encoding_check_error)

if use_label_encoder:
if not can_use_label_encoder:
raise ValueError(
"The option use_label_encoder=True is incompatible with "
"inputs of type cuDF or cuPy. Please set "
"use_label_encoder=False when constructing XGBClassifier "
"object. NOTE:" + label_encoder_deprecation_msg
)
if hasattr(self, "use_label_encoder"):
warnings.warn(label_encoder_deprecation_msg, UserWarning)
self._le = XGBoostLabelEncoder().fit(y)
label_transform = self._le.transform
else:
label_transform = lambda x: x # noqa: E731

return label_transform

def _can_use_inplace_predict(self) -> bool:
return False

def predict(
self,
X,
output_margin=False,
ntree_limit=None,
validate_features=True,
base_margin=None,
iteration_range: Optional[Tuple[int, int]] = None,
ray_params: Union[None, RayParams, Dict] = None,
_remote: Optional[bool] = None,
ray_dmatrix_params: Optional[Dict] = None,
**kwargs,
):
class_probs = self._ray_predict(
X=X,
output_margin=output_margin,
ntree_limit=ntree_limit,
validate_features=validate_features,
base_margin=base_margin,
iteration_range=iteration_range,
ray_params=ray_params,
_remote=_remote,
ray_dmatrix_params=ray_dmatrix_params,
**kwargs,
)
if output_margin:
# If output_margin is active, simply return the scores
Expand All @@ -934,25 +839,25 @@ def predict(
def predict_proba(
self,
X,
ntree_limit=None,
validate_features=False,
base_margin=None,
iteration_range: Optional[Tuple[int, int]] = None,
ray_params: Union[None, RayParams, Dict] = None,
_remote: Optional[bool] = None,
ray_dmatrix_params: Optional[Dict] = None,
**kwargs,
) -> np.ndarray:

class_probs = self._ray_predict(
X=X,
output_margin=self.objective == "multi:softmax",
ntree_limit=ntree_limit,
validate_features=validate_features,
base_margin=base_margin,
iteration_range=iteration_range,
ray_params=ray_params,
_remote=_remote,
ray_dmatrix_params=ray_dmatrix_params,
**kwargs,
)
# If model is loaded from a raw booster there's no `n_classes_`
return _cls_predict_proba(
Expand All @@ -979,31 +884,6 @@ class RayXGBRFClassifier(RayXGBClassifier):
def __init__(self, *args, **kwargs):
raise ValueError("RayXGBRFClassifier not available with xgboost<1.0.0")

# use_label_encoder added in xgboost commit
# c8ec62103a36f1717d032b1ddff2bf9e0642508a (1.3.0)
elif "use_label_encoder" in inspect.signature(XGBRFClassifier.__init__).parameters:

@_deprecate_positional_args
@_xgboost_version_warn
def __init__(
self,
*,
learning_rate=1,
subsample=0.8,
colsample_bynode=0.8,
reg_lambda=1e-5,
use_label_encoder=True,
**kwargs,
):
super().__init__(
learning_rate=learning_rate,
subsample=subsample,
colsample_bynode=colsample_bynode,
reg_lambda=reg_lambda,
use_label_encoder=use_label_encoder,
**kwargs,
)

else:

@_deprecate_positional_args
Expand Down Expand Up @@ -1172,24 +1052,24 @@ def predict(
self,
X,
output_margin=False,
ntree_limit=None,
validate_features=True,
base_margin=None,
iteration_range=None,
ray_params: Union[None, RayParams, Dict] = None,
_remote: Optional[bool] = None,
ray_dmatrix_params: Optional[Dict] = None,
**kwargs,
):
return self._ray_predict(
X,
output_margin=output_margin,
ntree_limit=ntree_limit,
validate_features=validate_features,
base_margin=base_margin,
iteration_range=iteration_range,
ray_params=ray_params,
_remote=_remote,
ray_dmatrix_params=ray_dmatrix_params,
**kwargs,
)

predict.__doc__ = _treat_X_doc(_get_doc(XGBRanker.predict)) + _RAY_PARAMS_DOC
Expand Down
Loading

0 comments on commit c7eada3

Please sign in to comment.