rapidsai/legate-dataframe

Legate-dataframe: a scalable dataframe library

A prototype of a Legate-enabled version of libcudf. It is not a drop-in replacement for Pandas; instead, it follows the lower-level API of libcudf.

In the future, we plan to introduce a high-level pure Python package that implements all the nice-to-have features known from Pandas using the low-level API's primitives.

Install

You can install the legate-dataframe packages from the conda legate channel using:

conda install -c legate -c rapidsai -c conda-forge legate-dataframe

To include development releases, add the legate/label/experimental channel.

Build

Legate-dataframe is built on the Legate C++ API from Legate-core and also uses cuPyNumeric. cuPyNumeric is only used in the Python tests and examples, so it isn't strictly necessary.

The currently tested versions are the 24.11 releases of legate and cuPyNumeric, available from the conda legate channel.

Legate-dataframe

First we clone legate-dataframe and install the dependencies:

git clone https://github.com/rapidsai/legate-dataframe.git
cd legate-dataframe
mamba env update --name legate-dev --file conda/environments/all_cuda-124_arch-x86_64.yaml

Then we can build, install, and test the project:

./build.sh
./build.sh test

Feature Status

Feature | Status | Limitations
Copy to/from cuDF DataFrame | |
Parquet read & write | |
CSV read & write | |
Zero-copy to/from cuPyNumeric arrays | |
Hash based inner join | |
Hash based left join | |
Hash based full/outer join | |
GroupBy Aggregation | | Basic aggs. like SUM and NUNIQUE
Numeric data types | |
Datetime data types | |
String data types | |
Null masked columns | |
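
For readers unfamiliar with the term, a hash-based join builds a hash table on the key column of one table and then probes it with the rows of the other. The pure-Python sketch below illustrates the inner, left, and full/outer variants on small lists of `(key, value)` tuples. It is a conceptual illustration only, not how legate-dataframe or libcudf implements joins; `hash_join` and its `how` parameter are hypothetical names chosen for this example.

```python
from collections import defaultdict


def hash_join(left, right, how="inner"):
    """Join two lists of (key, value) rows on the key.

    Hypothetical helper for illustration only. Missing sides are filled
    with None, loosely mimicking nulls in the output columns.
    """
    if how not in ("inner", "left", "outer"):
        raise ValueError(f"unsupported join type: {how}")

    # Build phase: hash the right-hand rows by key.
    table = defaultdict(list)
    for key, rval in right:
        table[key].append(rval)

    out = []
    matched = set()
    # Probe phase: look up every left-hand row in the hash table.
    for key, lval in left:
        if key in table:
            matched.add(key)
            for rval in table[key]:
                out.append((key, lval, rval))
        elif how in ("left", "outer"):
            # Left and full/outer joins keep unmatched left-hand rows.
            out.append((key, lval, None))

    # A full/outer join also emits right-hand rows that never matched.
    if how == "outer":
        for key, rvals in table.items():
            if key not in matched:
                for rval in rvals:
                    out.append((key, None, rval))
    return out


left = [(1, "a"), (2, "b"), (3, "c")]
right = [(2, "x"), (3, "y"), (4, "z")]
inner = hash_join(left, right, "inner")
left_join = hash_join(left, right, "left")
outer = hash_join(left, right, "outer")
```

With these inputs, the inner join keeps only keys 2 and 3, the left join additionally keeps key 1 with a missing right-hand value, and the full/outer join also keeps key 4 with a missing left-hand value.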

Example

Python

import tempfile
import cudf
import cupynumeric
from legate.core import get_legate_runtime
from legate_dataframe import LogicalColumn, LogicalTable
from legate_dataframe.lib.parquet import parquet_read, parquet_write

def main(tmpdir):
    # Let's start by creating a logical table from a cuDF dataframe.
    # This takes a local dataframe and distributes it between Legate nodes.
    df = cudf.DataFrame({"a": [1, 2, 3, 4], "b": [-1, -2, -3, -4]})
    tbl1 = LogicalTable.from_cudf(df)

    # We can write the logical table to disk using the Parquet file format.
    # The table is written into multiple files, one file per partition:
    #      /tmpdir/
    #          ├── part-0.parquet
    #          ├── part-1.parquet
    #          ├── part-2.parquet
    #          └── ...
    parquet_write(tbl1, path=tmpdir)

    # NB: since Legate executes tasks lazily, we issue a blocking fence
    #     in order to wait until all files have been written to disk.
    get_legate_runtime().issue_execution_fence(block=True)

    # Then we can read the Parquet files back into a logical table. We
    # provide a glob string that references all the Parquet files that
    # should go into the logical table.
    tbl2 = parquet_read(glob_string=f"{tmpdir}/*.parquet")

    # LogicalColumn implements the `__legate_data_interface__` interface,
    # which makes it possible for other Legate libraries, such as cuPyNumeric,
    # to operate on columns seamlessly.
    ary = cupynumeric.add(tbl1["a"], tbl2["b"])
    assert ary.sum() == 0
    ary[:] = [4, 3, 2, 1]

    # We can create a new logical column from any 1-D array-like object that
    # exposes the `__legate_data_interface__` interface.
    col = LogicalColumn(ary)

    # We can create a new logical table from existing logical columns.
    LogicalTable(columns=(col, tbl2["b"]), column_names=["a", "b"])

if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmpdir:
        main(tmpdir)
        # Since Legate executes tasks lazily, we issue a blocking fence here
        # to make sure all tasks have finished before `tmpdir` is removed.
        get_legate_runtime().issue_execution_fence(block=True)
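
The example above relies on two ideas: work is submitted asynchronously, so a blocking fence is needed before reading the files or deleting the directory, and the per-partition files are collected again with a glob string. The standard-library sketch below mimics that pattern without Legate. `ThreadPoolExecutor` and `wait` stand in for Legate's lazy task execution and execution fence, plain text files stand in for Parquet files, and `write_partition` and `write_then_read` are hypothetical helpers, so treat it as an analogy rather than a description of Legate's runtime.

```python
import glob
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor, wait


def write_partition(directory, index, rows):
    # Each worker writes one file, mimicking one file per partition.
    path = os.path.join(directory, f"part-{index}.txt")
    with open(path, "w") as f:
        f.write("\n".join(str(r) for r in rows))
    return path


def write_then_read(partitions):
    with tempfile.TemporaryDirectory() as tmpdir:
        with ThreadPoolExecutor(max_workers=4) as pool:
            # submit() returns immediately; the writes run in the background.
            futures = [
                pool.submit(write_partition, tmpdir, i, rows)
                for i, rows in enumerate(partitions)
            ]
            # Stand-in for the blocking fence: wait for every write to finish.
            wait(futures)
            for future in futures:
                future.result()  # re-raise any error from a worker

        # Only now is it safe to collect the files with a glob pattern.
        files = sorted(glob.glob(os.path.join(tmpdir, "*.txt")))
        values = []
        for name in files:
            with open(name) as f:
                values.extend(int(line) for line in f.read().splitlines())
        # The directory is removed when the outer `with` block exits, which
        # is why all writes and reads must have completed by this point.
        return [os.path.basename(name) for name in files], values


names, values = write_then_read([[1, 2], [3, 4], [5]])
```

Skipping the `wait` call would leave a window in which the glob could miss files that are still being written, which is the same hazard the blocking fence guards against in the Legate example.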

C++

#include <filesystem>
#include <stdexcept>
#include <string>
#include <legate.h>

#include <legate_dataframe/core/column.hpp>
#include <legate_dataframe/core/table.hpp>
#include <legate_dataframe/parquet.hpp>
#include <legate_dataframe/unaryop.hpp>

int main(int argc, char** argv)
{
  // First we initialize Legate
  int32_t errcode = legate::start(argc, argv);
  if (errcode != 0) {
    throw std::runtime_error("legate::start() errorcode: " + std::to_string(errcode));
  }

  // Then let's create a new logical column
  legate::dataframe::LogicalColumn col_a = legate::dataframe::sequence(20, -10);

  // Compute the absolute value of each row in `col_a`
  legate::dataframe::LogicalColumn col_b = unary_operation(col_a, cudf::unary_operator::ABS);

  // Create a new logical table that contains the two existing columns (zero-copy)
  legate::dataframe::LogicalTable tbl_a{{col_a, col_b}};

  // We can write the logical table to disk using the Parquet file format.
  // The table is written into multiple files, one file per partition:
  //      ./my_parquet_file/
  //          ├── part-0.parquet
  //          ├── part-1.parquet
  //          ├── part-2.parquet
  //          └── ...
  legate::dataframe::parquet_write(tbl_a, "./my_parquet_file");

  // NB: since Legate executes tasks lazily, we issue a blocking fence
  //     in order to wait until all files have been written to disk.
  legate::Runtime::get_runtime()->issue_execution_fence(true);

  // Then we can read the Parquet files back into a logical table. We
  // provide a glob string that references all the Parquet files that
  // should go into the logical table.
  auto tbl_b = legate::dataframe::parquet_read("./my_parquet_file/*.parquet");

  // Clean up
  std::filesystem::remove_all("./my_parquet_file");
  return 0;
}

Contributing

Please see our guide and the developer guide.