Skip to content

Commit

Permalink
Start factoring out chunked reader
Browse files (browse the repository at this point in the history)
  • Loading branch information
mroeschke committed Nov 6, 2024
1 parent e8d2615 commit 22ba400
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 81 deletions.
113 changes: 58 additions & 55 deletions python/cudf/cudf/_lib/parquet.pyx
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright (c) 2019-2024, NVIDIA CORPORATION.

import io
import itertools

import pyarrow as pa

Expand Down Expand Up @@ -45,7 +46,6 @@ from pylibcudf.libcudf.io.parquet_metadata cimport (
from pylibcudf.libcudf.io.types cimport (
source_info,
sink_info,
column_in_metadata,
table_input_metadata,
partition_info,
statistics_freq,
Expand Down Expand Up @@ -424,44 +424,45 @@ def write_parquet(
"""

# Create the write options
cdef table_input_metadata tbl_meta

cdef table_view tv
cdef vector[unique_ptr[data_sink]] _data_sinks
cdef sink_info sink = make_sinks_info(
filepaths_or_buffers, _data_sinks
)
sink = plc.io.SinkInfo(filepaths_or_buffers)

if index is True or (
index is None and not isinstance(table._index, cudf.RangeIndex)
):
tv = table_view_from_table(table)
tbl_meta = table_input_metadata(tv)
for level, idx_name in enumerate(table._index.names):
plc_table = plc.Table(
[
col.to_pylibcudf(mode="read")
for col in itertools.chain(table.index._columns, table._columns)
]
)
tbl_meta = plc.io.types.TableInputMetadata(plc_table)
for level, idx_name in enumerate(table.index.names):
tbl_meta.column_metadata[level].set_name(
str.encode(
_index_level_name(idx_name, level, table._column_names)
)
_index_level_name(idx_name, level, table._column_names)
)
num_index_cols_meta = len(table._index.names)
num_index_cols_meta = level
else:
tv = table_view_from_table(table, ignore_index=True)
tbl_meta = table_input_metadata(tv)
plc_table = plc.Table(
[col.to_pylibcudf(mode="read") for col in table._columns]
)
tbl_meta = plc.io.types.TableInputMetadata(plc_table)
num_index_cols_meta = 0

for i, name in enumerate(table._column_names, num_index_cols_meta):
for i, (name, col) in enumerate(
table._column_labels_and_values, start=num_index_cols_meta
):
if not isinstance(name, str):
if cudf.get_option("mode.pandas_compatible"):
tbl_meta.column_metadata[i].set_name(str(name).encode())
tbl_meta.column_metadata[i].set_name(str(name))
else:
raise ValueError(
"Writing a Parquet file requires string column names"
)
else:
tbl_meta.column_metadata[i].set_name(name.encode())
tbl_meta.column_metadata[i].set_name(name)

_set_col_metadata(
table[name]._column,
col,
tbl_meta.column_metadata[i],
force_nullable_schema,
None,
Expand Down Expand Up @@ -490,11 +491,9 @@ def write_parquet(
"Valid values are '1.0' and '2.0'"
)

cdef vector[partition_info] partitions

# Perform write
options = (
plc.io.parquet.ParquetWriterOptions.builder(sink, tv)
plc.io.parquet.ParquetWriterOptions.builder(sink, plc_table)
.metadata(tbl_meta)
.key_value_metadata(user_data)
.compression(_get_comp_type(compression))
Expand All @@ -511,12 +510,12 @@ def write_parquet(
.build()
)
if partitions_info is not None:
partitions = [
plc.io.types.PartitionInfo(start, num)
for (start, num) in partitions_info
]
partitions.reserve(len(partitions_info))
for part in partitions_info:
partitions.push_back(
partition_info(part[0], part[1])
)
options.set_partitions(move(partitions))
options.set_partitions(partitions)
if metadata_file_path is not None:
if isinstance(metadata_file_path, str):
options.set_column_chunks_file_paths([metadata_file_path])
Expand Down Expand Up @@ -696,34 +695,38 @@ cdef class ParquetWriter:
def _initialize_chunked_state(self, table, num_partitions=1):
""" Prepares all the values required to build the
chunked_parquet_writer_options and creates a writer"""
cdef table_view tv

# Set the table_metadata
num_index_cols_meta = 0
self.tbl_meta = table_input_metadata(
table_view_from_table(table, ignore_index=True))
if self.index is not False:
if isinstance(table._index, cudf.core.multiindex.MultiIndex):
tv = table_view_from_table(table)
self.tbl_meta = table_input_metadata(tv)
for level, idx_name in enumerate(table._index.names):
plc_table = plc.Table(
[
col.to_pylibcudf(mode="read") for col in
itertools.chain(table.index._columns, table._columns)
]
)
self.tbl_meta = plc.io.types.TableInputMetadata(plc_table)
if isinstance(table.index, cudf.core.multiindex.MultiIndex):
for level, idx_name in enumerate(table.index.names):
self.tbl_meta.column_metadata[level].set_name(
(str.encode(idx_name))
)
num_index_cols_meta = len(table._index.names)
else:
if table._index.name is not None:
tv = table_view_from_table(table)
self.tbl_meta = table_input_metadata(tv)
self.tbl_meta.column_metadata[0].set_name(
str.encode(table._index.name)
idx_name
)
num_index_cols_meta = 1
num_index_cols_meta = level
elif table.index.name is not None:
self.tbl_meta.column_metadata[0].set_name(
table.index.name
)
num_index_cols_meta = 1
else:
self.tbl_meta = table_input_metadata(
table_view_from_table(table, ignore_index=True)
)
num_index_cols_meta = 0

for i, name in enumerate(table._column_names, num_index_cols_meta):
self.tbl_meta.column_metadata[i].set_name(name.encode())
for i, (name, col) in enumerate(
table._column_labels_and_values, start=num_index_cols_meta
):
self.tbl_meta.column_metadata[i].set_name(name)
_set_col_metadata(
table[name]._column,
col,
self.tbl_meta.column_metadata[i],
)

Expand Down Expand Up @@ -815,7 +818,7 @@ cdef compression_type _get_comp_type(object compression):

cdef _set_col_metadata(
Column col,
column_in_metadata& col_meta,
plc.io.types.ColumnInMetadata col_meta,
bool force_nullable_schema=False,
str path=None,
object skip_compression=None,
Expand All @@ -825,7 +828,7 @@ cdef _set_col_metadata(
):
need_path = (skip_compression is not None or column_encoding is not None or
column_type_length is not None or output_as_binary is not None)
name = col_meta.get_name().decode('UTF-8') if need_path else None
name = col_meta.get_name() if need_path else None
full_path = path + "." + name if path is not None else name

if force_nullable_schema:
Expand Down Expand Up @@ -858,7 +861,7 @@ cdef _set_col_metadata(
for i, (child_col, name) in enumerate(
zip(col.children, list(col.dtype.fields))
):
col_meta.child(i).set_name(name.encode())
col_meta.child(i).set_name(name)
_set_col_metadata(
child_col,
col_meta.child(i),
Expand All @@ -872,7 +875,7 @@ cdef _set_col_metadata(
elif isinstance(col.dtype, cudf.ListDtype):
if full_path is not None:
full_path = full_path + ".list"
col_meta.child(1).set_name("element".encode())
col_meta.child(1).set_name("element")
_set_col_metadata(
col.children[1],
col_meta.child(1),
Expand Down
2 changes: 1 addition & 1 deletion python/pylibcudf/pylibcudf/io/parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ cdef class ParquetWriterOptions:
-------
None
"""
self.options.set_partitions(partitions)
self.options.set_partitions([partition.c_obj for partition in partitions])

cpdef void set_column_chunks_file_paths(list file_paths):
"""
Expand Down
9 changes: 4 additions & 5 deletions python/pylibcudf/pylibcudf/io/types.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ from pylibcudf.libcudf.io.types cimport (
from pylibcudf.table cimport Table
from pylibcudf.io.types cimport ColumnEncoding

cdef class PartitionInfo:
cdef partition_info c_obj

cdef class ColumnInMetadata:
cdef column_in_metadata metadata
cdef column_in_metadata c_obj

cpdef set_name(self, str name)

Expand Down Expand Up @@ -51,10 +53,7 @@ cdef class ColumnInMetadata:

cdef class TableInputMetadata:
cdef public Table table
cdef table_input_metadata metadata

@property
cpdef list column_metadata(self)
cdef table_input_metadata c_obj

cdef class TableWithMetadata:
cdef public Table tbl
Expand Down
Loading

0 comments on commit 22ba400

Please sign in to comment.