Skip to content

Commit

Permalink
Add test of interoperability of cuDF and arrow BYTE_STREAM_SPLIT enco…
Browse files Browse the repository at this point in the history
…ders (#15832)

Support for BYTE_STREAM_SPLIT encoding was recently added to cuDF (#15311). Since then, the Parquet specification has been changed (apache/parquet-format#229) to extend the set of data types that can be encoded as BYTE_STREAM_SPLIT, and Arrow has only just implemented that extension (apache/arrow#40094). This PR adds a test that checks cuDF and Arrow can read each other's files written with BYTE_STREAM_SPLIT encoding.

Authors:
  - Ed Seidl (https://github.com/etseidl)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #15832
  • Loading branch information
etseidl authored Jun 24, 2024
1 parent ac3c8dd commit ed41668
Showing 1 changed file with 55 additions and 0 deletions.
55 changes: 55 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2947,6 +2947,61 @@ def test_per_column_options_string_col(tmpdir, encoding):
assert encoding in fmd.row_group(0).column(0).encodings


@pytest.mark.parametrize("num_rows", [200, 10000])
def test_parquet_bss_round_trip(tmpdir, num_rows):
    """Round-trip BYTE_STREAM_SPLIT encoded data between pyarrow and cuDF.

    A table with one fixed-length binary column and int32/int64/float32/
    float64 columns is written by pyarrow with BYTE_STREAM_SPLIT encoding,
    read by cuDF, written again by cuDF with the same encoding requested
    for every column, and finally both files are read back with pyarrow
    and compared.
    """
    bss = "BYTE_STREAM_SPLIT"
    # Both writers are capped at this many rows per row group.
    max_rowgroup_rows = 5000

    def sha256_of(value):
        # 32-byte digest of the 4-byte little-endian form of ``value``;
        # used as the payload of the fixed-length binary column.
        return hashlib.sha256(value.to_bytes(4, "little")).digest()

    # Build one pyarrow array per type under test, keyed by column name.
    ints = list(range(num_rows))
    floats = [float(v) for v in ints]
    columns = {
        "flba": pa.array([sha256_of(v) for v in ints], type=pa.binary(32)),
        "i32": pa.array(ints, type=pa.int32()),
        "i64": pa.array(ints, type=pa.int64()),
        "f32": pa.array(floats, type=pa.float32()),
        "f64": pa.array(floats, type=pa.float64()),
    }
    arrow_table = pa.Table.from_arrays(
        list(columns.values()), names=list(columns)
    )

    # Reference file: written by pyarrow, dictionary encoding disabled.
    arrow_path = tmpdir.join("padf.parquet")
    pq.write_table(
        arrow_table,
        arrow_path,
        column_encoding=bss,
        use_dictionary=False,
        row_group_size=max_rowgroup_rows,
    )

    # Read the pyarrow file with cuDF and write it straight back out,
    # requesting the same encoding for every column.
    cudf_path = tmpdir.join("cdf.parquet")
    cudf.read_parquet(arrow_path).to_parquet(
        cudf_path,
        column_type_length={"flba": 32},
        column_encoding={name: bss for name in columns},
        row_group_size_rows=max_rowgroup_rows,
    )

    # pyarrow must see the same data, and the same type for the first
    # ("flba") column, in the cuDF-written file as in its own file.
    from_arrow = pq.read_table(arrow_path)
    from_cudf = pq.read_table(cudf_path)
    assert_eq(from_arrow, from_cudf)
    assert_eq(from_arrow.schema[0].type, from_cudf.schema[0].type)


def test_parquet_reader_rle_boolean(datadir):
fname = datadir / "rle_boolean_encoding.parquet"

Expand Down

0 comments on commit ed41668

Please sign in to comment.