Skip to content

Commit

Permalink
[Dask] Sort and partition for ranking. (#11007)
Browse files Browse the repository at this point in the history
- Implement automatic local sort.
- Implement partitioning by query ID.
- Document for distributed ranking.
  • Loading branch information
trivialfis authored Dec 4, 2024
1 parent 544a52e commit bb2e701
Show file tree
Hide file tree
Showing 17 changed files with 699 additions and 57 deletions.
201 changes: 201 additions & 0 deletions demo/dask/dask_learning_to_rank.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
"""
Learning to rank with the Dask Interface
========================================
.. versionadded:: 3.0.0
This is a demonstration of using XGBoost for learning to rank tasks using the
MSLR_10k_letor dataset. For more infomation about the dataset, please visit its
`description page <https://www.microsoft.com/en-us/research/project/mslr/>`_.
See :ref:`ltr-dist` for a general description for distributed learning to rank and
:ref:`ltr-dask` for Dask-specific features.
"""

from __future__ import annotations

import argparse
import os
from contextlib import contextmanager
from typing import Generator

import dask
import numpy as np
from dask import dataframe as dd
from distributed import Client, LocalCluster, wait
from sklearn.datasets import load_svmlight_file

from xgboost import dask as dxgb


def load_mslr_10k(
device: str, data_path: str, cache_path: str
) -> tuple[dd.DataFrame, dd.DataFrame, dd.DataFrame]:
"""Load the MSLR10k dataset from data_path and save parquet files in the cache_path."""
root_path = os.path.expanduser(args.data)
cache_path = os.path.expanduser(args.cache)

# Use only the Fold1 for demo:
# Train, Valid, Test
# {S1,S2,S3}, S4, S5
fold = 1

if not os.path.exists(cache_path):
os.mkdir(cache_path)
fold_path = os.path.join(root_path, f"Fold{fold}")
train_path = os.path.join(fold_path, "train.txt")
valid_path = os.path.join(fold_path, "vali.txt")
test_path = os.path.join(fold_path, "test.txt")

X_train, y_train, qid_train = load_svmlight_file(
train_path, query_id=True, dtype=np.float32
)
columns = [f"f{i}" for i in range(X_train.shape[1])]
X_train = dd.from_array(X_train.toarray(), columns=columns)
y_train = y_train.astype(np.int32)
qid_train = qid_train.astype(np.int32)

X_train["y"] = dd.from_array(y_train)
X_train["qid"] = dd.from_array(qid_train)
X_train.to_parquet(os.path.join(cache_path, "train"), engine="pyarrow")

X_valid, y_valid, qid_valid = load_svmlight_file(
valid_path, query_id=True, dtype=np.float32
)
X_valid = dd.from_array(X_valid.toarray(), columns=columns)
y_valid = y_valid.astype(np.int32)
qid_valid = qid_valid.astype(np.int32)

X_valid["y"] = dd.from_array(y_valid)
X_valid["qid"] = dd.from_array(qid_valid)
X_valid.to_parquet(os.path.join(cache_path, "valid"), engine="pyarrow")

X_test, y_test, qid_test = load_svmlight_file(
test_path, query_id=True, dtype=np.float32
)

X_test = dd.from_array(X_test.toarray(), columns=columns)
y_test = y_test.astype(np.int32)
qid_test = qid_test.astype(np.int32)

X_test["y"] = dd.from_array(y_test)
X_test["qid"] = dd.from_array(qid_test)
X_test.to_parquet(os.path.join(cache_path, "test"), engine="pyarrow")

df_train = dd.read_parquet(
os.path.join(cache_path, "train"), calculate_divisions=True
)
df_valid = dd.read_parquet(
os.path.join(cache_path, "valid"), calculate_divisions=True
)
df_test = dd.read_parquet(
os.path.join(cache_path, "test"), calculate_divisions=True
)

return df_train, df_valid, df_test


def ranking_demo(client: Client, args: argparse.Namespace) -> None:
"""Learning to rank with data sorted locally."""
df_tr, df_va, _ = load_mslr_10k(args.device, args.data, args.cache)

X_train: dd.DataFrame = df_tr[df_tr.columns.difference(["y", "qid"])]
y_train = df_tr[["y", "qid"]]
Xy_train = dxgb.DaskQuantileDMatrix(client, X_train, y_train.y, qid=y_train.qid)

X_valid: dd.DataFrame = df_va[df_va.columns.difference(["y", "qid"])]
y_valid = df_va[["y", "qid"]]
Xy_valid = dxgb.DaskQuantileDMatrix(
client, X_valid, y_valid.y, qid=y_valid.qid, ref=Xy_train
)
# Upon training, you will see a performance warning about sorting data based on
# query groups.
dxgb.train(
client,
{"objective": "rank:ndcg", "device": args.device},
Xy_train,
evals=[(Xy_train, "Train"), (Xy_valid, "Valid")],
num_boost_round=100,
)


def ranking_wo_split_demo(client: Client, args: argparse.Namespace) -> None:
"""Learning to rank with data partitioned according to query groups."""
df_tr, df_va, df_te = load_mslr_10k(args.device, args.data, args.cache)

X_tr = df_tr[df_tr.columns.difference(["y", "qid"])]
X_va = df_va[df_va.columns.difference(["y", "qid"])]

# `allow_group_split=False` makes sure data is partitioned according to the query
# groups.
ltr = dxgb.DaskXGBRanker(allow_group_split=False, device=args.device)
ltr.client = client
ltr = ltr.fit(
X_tr,
df_tr.y,
qid=df_tr.qid,
eval_set=[(X_tr, df_tr.y), (X_va, df_va.y)],
eval_qid=[df_tr.qid, df_va.qid],
verbose=True,
)

df_te = df_te.persist()
wait([df_te])

X_te = df_te[df_te.columns.difference(["y", "qid"])]
predt = ltr.predict(X_te)
y = client.compute(df_te.y)
wait([predt, y])


@contextmanager
def gen_client(device: str) -> Generator[Client, None, None]:
match device:
case "cuda":
from dask_cuda import LocalCUDACluster

with LocalCUDACluster() as cluster:
with Client(cluster) as client:
with dask.config.set(
{
"array.backend": "cupy",
"dataframe.backend": "cudf",
}
):
yield client
case "cpu":
with LocalCluster() as cluster:
with Client(cluster) as client:
yield client


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Demonstration of learning to rank using XGBoost."
)
parser.add_argument(
"--data",
type=str,
help="Root directory of the MSLR-WEB10K data.",
required=True,
)
parser.add_argument(
"--cache",
type=str,
help="Directory for caching processed data.",
required=True,
)
parser.add_argument("--device", choices=["cpu", "cuda"], default="cpu")
parser.add_argument(
"--no-split",
action="store_true",
help="Flag to indicate query groups should not be split.",
)
args = parser.parse_args()

with gen_client(args.device) as client:
if args.no_split:
ranking_wo_split_demo(client, args)
else:
ranking_demo(client, args)
10 changes: 5 additions & 5 deletions demo/guide-python/learning_to_rank.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
train on relevance degree, and the second part simulates click data and enable the
position debiasing training.
For an overview of learning to rank in XGBoost, please see
:doc:`Learning to Rank </tutorials/learning_to_rank>`.
For an overview of learning to rank in XGBoost, please see :doc:`Learning to Rank
</tutorials/learning_to_rank>`.
"""

from __future__ import annotations
Expand All @@ -31,7 +31,7 @@
from xgboost.testing.data import RelDataCV, simulate_clicks, sort_ltr_samples


def load_mlsr_10k(data_path: str, cache_path: str) -> RelDataCV:
def load_mslr_10k(data_path: str, cache_path: str) -> RelDataCV:
"""Load the MSLR10k dataset from data_path and cache a pickle object in cache_path.
Returns
Expand Down Expand Up @@ -89,7 +89,7 @@ def load_mlsr_10k(data_path: str, cache_path: str) -> RelDataCV:

def ranking_demo(args: argparse.Namespace) -> None:
"""Demonstration for learning to rank with relevance degree."""
data = load_mlsr_10k(args.data, args.cache)
data = load_mslr_10k(args.data, args.cache)

# Sort data according to query index
X_train, y_train, qid_train = data.train
Expand Down Expand Up @@ -123,7 +123,7 @@ def ranking_demo(args: argparse.Namespace) -> None:

def click_data_demo(args: argparse.Namespace) -> None:
"""Demonstration for learning to rank with click data."""
data = load_mlsr_10k(args.data, args.cache)
data = load_mslr_10k(args.data, args.cache)
train, test = simulate_clicks(data)
assert test is not None

Expand Down
62 changes: 53 additions & 9 deletions doc/tutorials/dask.rst
Original file line number Diff line number Diff line change
Expand Up @@ -355,15 +355,18 @@ Working with asyncio

.. versionadded:: 1.2.0

XGBoost's dask interface supports the new ``asyncio`` in Python and can be integrated into
asynchronous workflows. For using dask with asynchronous operations, please refer to
`this dask example <https://examples.dask.org/applications/async-await.html>`_ and document in
`distributed <https://distributed.dask.org/en/latest/asynchronous.html>`_. To use XGBoost's
dask interface asynchronously, the ``client`` which is passed as an argument for training and
prediction must be operating in asynchronous mode by specifying ``asynchronous=True`` when the
``client`` is created (example below). All functions (including ``DaskDMatrix``) provided
by the functional interface will then return coroutines which can then be awaited to retrieve
their result.
XGBoost's dask interface supports the new :py:mod:`asyncio` in Python and can be
integrated into asynchronous workflows. For using dask with asynchronous operations,
please refer to `this dask example
<https://examples.dask.org/applications/async-await.html>`_ and document in `distributed
<https://distributed.dask.org/en/latest/asynchronous.html>`_. To use XGBoost's Dask
interface asynchronously, the ``client`` which is passed as an argument for training and
prediction must be operating in asynchronous mode by specifying ``asynchronous=True`` when
the ``client`` is created (example below). All functions (including ``DaskDMatrix``)
provided by the functional interface will then return coroutines which can then be awaited
to retrieve their result. Please note that XGBoost is a compute-bounded application, where
parallelism is more important than concurrency. The support for `asyncio` is more about
compatibility instead of performance gain.

Functional interface:

Expand Down Expand Up @@ -526,6 +529,47 @@ See https://github.com/coiled/dask-xgboost-nyctaxi for a set of examples of usin
with dask and optuna.


.. _ltr-dask:

****************
Learning to Rank
****************

.. versionadded:: 3.0.0

.. note::

Position debiasing is not yet supported.

There are two operation modes in the Dask learning to rank for performance reasons. The
difference is whether a distributed global sort is needed. Please see :ref:`ltr-dist` for
how ranking works with distributed training in general. Below we will discuss some of the
Dask-specific features.

First, if you use the :py:class:`~xgboost.dask.DaskQuantileDMatrix` interface or the
:py:class:`~xgboost.dask.DaskXGBRanker` with ``allow_group_split`` set to ``True``,
XGBoost will try to sort and group the samples for each worker based on the query ID. This
mode tries to skip the global sort and sort only worker-local data, and hence no
inter-worker data shuffle. Please note that even worker-local sort is costly, particularly
in terms of memory usage as there's no spilling when
:py:meth:`~pandas.DataFrame.sort_values` is used, and we need to concatenate the
data. XGBoost first checks whether the QID is already sorted before actually performing
the sorting operation. One can choose this if the query groups are relatively consecutive,
meaning most of the samples within a query group are close to each other and are likely to
be resided to the same worker. Don't use this if you have performed a random shuffle on
your data.

If the input data is random, then there's no way we can guarantee most of data within the
same group being in the same worker. For large query groups, this might not be an
issue. But for small query groups, it's possible that each worker gets only one or two
samples from their group for all groups, which can lead to disastrous performance. In that
case, we can partition the data according to query group, which is the default behavior of
the :py:class:`~xgboost.dask.DaskXGBRanker` unless the ``allow_group_split`` is set to
``True``. This mode performs a sort and a groupby on the entire dataset in addition to an
encoding operation for the query group IDs. Along with partition fragmentation, this
option can lead to slow performance. See
:ref:`sphx_glr_python_dask-examples_dask_learning_to_rank.py` for a worked example.

.. _tracker-ip:

***************
Expand Down
18 changes: 17 additions & 1 deletion doc/tutorials/learning_to_rank.rst
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,26 @@ On the other hand, if you have comparatively small amount of training data:

For any method chosen, you can modify ``lambdarank_num_pair_per_sample`` to control the amount of pairs generated.

.. _ltr-dist:

********************
Distributed Training
********************
XGBoost implements distributed learning-to-rank with integration of multiple frameworks including Dask, Spark, and PySpark. The interface is similar to the single-node counterpart. Please refer to document of the respective XGBoost interface for details. Scattering a query group onto multiple workers is theoretically sound but can affect the model accuracy. For most of the use cases, the small discrepancy is not an issue, as the amount of training data is usually large when distributed training is used. As a result, users don't need to partition the data based on query groups. As long as each data partition is correctly sorted by query IDs, XGBoost can aggregate sample gradients accordingly.

XGBoost implements distributed learning-to-rank with integration of multiple frameworks
including :doc:`Dask </tutorials/dask>`, :doc:`Spark </jvm/xgboost4j_spark_tutorial>`, and
:doc:`PySpark </tutorials/spark_estimator>`. The interface is similar to the single-node
counterpart. Please refer to document of the respective XGBoost interface for details.

.. warning::

Position-debiasing is not yet supported for existing distributed interfaces.

XGBoost works with collective operations, which means data is scattered to multiple workers. We can divide the data partitions by query group and ensure no query group is split among workers. However, this requires a costly sort and groupby operation and might only be necessary for selected use cases. Splitting and scattering a query group to multiple workers is theoretically sound but can affect the model's accuracy. If there are only a small number of groups sitting at the boundaries of workers, the small discrepancy is not an issue, as the amount of training data is usually large when distributed training is used.

For a longer explanation, assuming the pairwise ranking method is used, we calculate the gradient based on relevance degree by constructing pairs within a query group. If a single query group is split among workers and we use worker-local data for gradient calculation, then we are simply sampling pairs from a smaller group for each worker to calculate the gradient and the evaluation metric. The comparison between each pair doesn't change because a group is split into sub-groups, what changes is the number of total and effective pairs and normalizers like `IDCG`. One can generate more pairs from a large group than it's from two smaller subgroups. As a result, the obtained gradient is still valid from a theoretical standpoint but might not be optimal. As long as each data partitions within a worker are correctly sorted by query IDs, XGBoost can aggregate sample gradients accordingly. And both the (Py)Spark interface and the Dask interface can sort the data according to query ID, please see respected tutorials for more information.

However, it's possible that a distributed framework shuffles the data during map reduce and splits every query group into multiple workers. In that case, the performance would be disastrous. As a result, it depends on the data and the framework for whether a sorted groupby is needed.

*******************
Reproducible Result
Expand Down
6 changes: 1 addition & 5 deletions python-package/xgboost/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3215,11 +3215,7 @@ def trees_to_dataframe(self, fmap: PathLike = "") -> DataFrame:
}
)

if callable(getattr(df, "sort_values", None)):
# pylint: disable=no-member
return df.sort_values(["Tree", "Node"]).reset_index(drop=True)
# pylint: disable=no-member
return df.sort(["Tree", "Node"]).reset_index(drop=True)
return df.sort_values(["Tree", "Node"]).reset_index(drop=True)

def _assign_dmatrix_features(self, data: DMatrix) -> None:
if data.num_row() == 0:
Expand Down
Loading

0 comments on commit bb2e701

Please sign in to comment.