Migrate CSV writer to pylibcudf #17163

Open · wants to merge 10 commits into base: branch-24.12 · showing changes from 2 commits
87 changes: 22 additions & 65 deletions python/cudf/cudf/_lib/csv.pyx
@@ -1,10 +1,6 @@
# Copyright (c) 2020-2024, NVIDIA CORPORATION.

from libcpp cimport bool
from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector

cimport pylibcudf.libcudf.types as libcudf_types

@@ -23,23 +19,18 @@ from cudf.core.buffer import acquire_spill_lock

from libcpp cimport bool

from pylibcudf.libcudf.io.csv cimport (
csv_writer_options,
write_csv as cpp_write_csv,
)
from pylibcudf.libcudf.io.data_sink cimport data_sink
from pylibcudf.libcudf.io.types cimport compression_type, sink_info
from pylibcudf.libcudf.table.table_view cimport table_view
from pylibcudf.libcudf.io.types cimport compression_type

from cudf._lib.io.utils cimport make_sink_info
from cudf._lib.utils cimport data_from_pylibcudf_io, table_view_from_table
from cudf._lib.utils cimport data_from_pylibcudf_io

import pylibcudf as plc

from cudf.api.types import is_hashable

from pylibcudf.types cimport DataType

from cudf._lib.json import _dtype_to_names_list

CSV_HEX_TYPE_MAP = {
"hex": np.dtype("int64"),
"hex64": np.dtype("int64"),
@@ -318,59 +309,25 @@ def write_csv(
--------
cudf.to_csv
"""
cdef table_view input_table_view = table_view_from_table(
table, not index
)
cdef bool include_header_c = header
cdef char delim_c = ord(sep)
cdef string line_term_c = lineterminator.encode()
cdef string na_c = na_rep.encode()
cdef int rows_per_chunk_c = rows_per_chunk
cdef vector[string] col_names
cdef string true_value_c = 'True'.encode()
cdef string false_value_c = 'False'.encode()
cdef unique_ptr[data_sink] data_sink_c
cdef sink_info sink_info_c = make_sink_info(path_or_buf, data_sink_c)

if header is True:
all_names = columns_apply_na_rep(table._column_names, na_rep)
if index is True:
all_names = table._index.names + all_names

if len(all_names) > 0:
col_names.reserve(len(all_names))
if len(all_names) == 1:
if all_names[0] in (None, ''):
col_names.push_back('""'.encode())
else:
col_names.push_back(
str(all_names[0]).encode()
)
else:
for idx, col_name in enumerate(all_names):
if col_name is None:
col_names.push_back(''.encode())
else:
col_names.push_back(
str(col_name).encode()
)

cdef csv_writer_options options = move(
csv_writer_options.builder(sink_info_c, input_table_view)
.names(col_names)
.na_rep(na_c)
.include_header(include_header_c)
.rows_per_chunk(rows_per_chunk_c)
.line_terminator(line_term_c)
.inter_column_delimiter(delim_c)
.true_value(true_value_c)
.false_value(false_value_c)
.build()
)

col_names = []
for name in table._column_names:
col_names.append((name, _dtype_to_names_list(table[name]._column)))
try:
with nogil:
cpp_write_csv(options)
plc.io.csv.write_csv(
plc.io.TableWithMetadata(
plc.Table([
col.to_pylibcudf(mode="read") for col in table._columns
]),
col_names
),
path_or_buf=path_or_buf,
sep=sep,
na_rep=na_rep,
header=header,
lineterminator=lineterminator,
rows_per_chunk=rows_per_chunk,
index=index,
)
except OverflowError:
raise OverflowError(
f"Writing CSV file with chunksize={rows_per_chunk} failed. "
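
The new wrapper hands pylibcudf a `TableWithMetadata` whose metadata pairs each column name with the names of its children (empty for flat columns, which is what `_dtype_to_names_list` yields for non-nested dtypes). A minimal standalone sketch of the same calling pattern, built via pyarrow instead of cudf columns (the pyarrow construction and `index=False` are assumptions of this sketch, not part of the diff):

```python
import pyarrow as pa
import pylibcudf as plc

# Build a pylibcudf Table; cudf instead collects
# col.to_pylibcudf(mode="read") for each of its columns.
tbl = plc.interop.from_arrow(pa.table({"a": [1, 2], "b": [3.0, 4.0]}))

# (name, children) pairs; children are empty for flat columns.
col_names = [("a", []), ("b", [])]

plc.io.csv.write_csv(
    plc.io.TableWithMetadata(tbl, col_names),
    path_or_buf="out.csv",
    sep=",",
    na_rep="",
    header=True,
    index=False,  # this sketch carries no index metadata
)
```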
181 changes: 181 additions & 0 deletions python/pylibcudf/pylibcudf/io/csv.pyx
@@ -1,15 +1,21 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from cpython.buffer cimport PyBUF_READ
from cpython.memoryview cimport PyMemoryView_FromMemory
from libcpp cimport bool
from libcpp.map cimport map
from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector
from pylibcudf.io.types cimport SourceInfo, TableWithMetadata
from pylibcudf.libcudf.io.csv cimport (
csv_reader_options,
csv_writer_options,
read_csv as cpp_read_csv,
write_csv as cpp_write_csv,
)
from pylibcudf.libcudf.io.data_sink cimport data_sink
from pylibcudf.libcudf.io.types cimport (
compression_type,
quote_style,
@@ -18,6 +24,13 @@ from pylibcudf.libcudf.io.types cimport (
from pylibcudf.libcudf.types cimport data_type, size_type
from pylibcudf.types cimport DataType

import io

from pylibcudf.libcudf.io.types cimport sink_info

import codecs
import os


cdef tuple _process_parse_dates_hex(list cols):
cdef vector[string] str_cols
@@ -80,6 +93,8 @@ def read_csv(
):
"""Reads a CSV file into a :py:class:`~.types.TableWithMetadata`.

For details, see :cpp:func:`read_csv`.

Parameters
----------
source_info : SourceInfo
@@ -261,3 +276,169 @@ def read_csv(
c_result = move(cpp_read_csv(options))

return TableWithMetadata.from_libcudf(c_result)


# Adapts a Python io.IOBase object as a libcudf IO data_sink. This lets you
# write from cudf to any Python file-like object (File/BytesIO/SocketIO, etc.).
cdef cppclass iobase_data_sink(data_sink):
object buf

iobase_data_sink(object buf_):
this.buf = buf_

void host_write(const void * data, size_t size) with gil:
if isinstance(buf, io.StringIO):
buf.write(PyMemoryView_FromMemory(<char*>data, size, PyBUF_READ)
.tobytes().decode())
else:
buf.write(PyMemoryView_FromMemory(<char*>data, size, PyBUF_READ))

void flush() with gil:
buf.flush()

size_t bytes_written() with gil:
return buf.tell()
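
In pure Python, the adapter above amounts to hiding any file-like object behind the three entry points libcudf's `data_sink` calls. A sketch (illustrative only, not the Cython class):

```python
import io

class PySinkSketch:
    """Pure-Python picture of iobase_data_sink above."""

    def __init__(self, buf):
        self.buf = buf

    def host_write(self, data: bytes):
        # StringIO only accepts str, so decode; binary sinks take bytes.
        if isinstance(self.buf, io.StringIO):
            self.buf.write(data.decode())
        else:
            self.buf.write(data)

    def flush(self):
        self.buf.flush()

    def bytes_written(self):
        return self.buf.tell()
```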


# Converts the Python sink input to libcudf IO sink_info.
cdef sink_info make_sinks_info(
list src, vector[unique_ptr[data_sink]] & sink
) except*:
cdef vector[data_sink *] data_sinks
cdef vector[string] paths
if isinstance(src[0], io.StringIO):
data_sinks.reserve(len(src))
for s in src:
sink.push_back(unique_ptr[data_sink](new iobase_data_sink(s)))
data_sinks.push_back(sink.back().get())
return sink_info(data_sinks)
elif isinstance(src[0], io.TextIOBase):
data_sinks.reserve(len(src))
for s in src:
# Files opened in text mode expect writes to be str rather than
# bytes, which requires conversion from utf-8. If the underlying
# buffer is utf-8, we can bypass this conversion by writing
# directly to it.
if codecs.lookup(s.encoding).name not in {"utf-8", "ascii"}:
raise NotImplementedError(f"Unsupported encoding {s.encoding}")
sink.push_back(
unique_ptr[data_sink](new iobase_data_sink(s.buffer))
)
data_sinks.push_back(sink.back().get())
return sink_info(data_sinks)
elif isinstance(src[0], io.IOBase):
data_sinks.reserve(len(src))
for s in src:
sink.push_back(unique_ptr[data_sink](new iobase_data_sink(s)))
data_sinks.push_back(sink.back().get())
return sink_info(data_sinks)
elif isinstance(src[0], (basestring, os.PathLike)):
paths.reserve(len(src))
for s in src:
paths.push_back(<string> os.path.expanduser(s).encode())
return sink_info(move(paths))
else:
raise TypeError("Unrecognized input type: {}".format(type(src)))
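
Branch order above matters: `io.StringIO` is itself a `TextIOBase`, so it must be tested first to take the str-decoding path. A small pure-Python classifier mirroring the chain (a sketch, not the actual helper):

```python
import io
import os

def classify_sink(s):
    # Mirrors the isinstance chain in make_sinks_info.
    if isinstance(s, io.StringIO):
        return "StringIO: decode written bytes to str"
    elif isinstance(s, io.TextIOBase):
        return "text file: write through the underlying utf-8 buffer"
    elif isinstance(s, io.IOBase):
        return "binary file-like: write bytes directly"
    elif isinstance(s, (str, os.PathLike)):
        return "filesystem path: libcudf opens the file itself"
    raise TypeError(f"Unrecognized input type: {type(s)}")

assert classify_sink(io.StringIO()).startswith("StringIO")
assert classify_sink("out.csv").startswith("filesystem path")
```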


cdef sink_info make_sink_info(src, unique_ptr[data_sink] & sink) except*:
cdef vector[unique_ptr[data_sink]] datasinks
cdef sink_info info = make_sinks_info([src], datasinks)
if not datasinks.empty():
sink.swap(datasinks[0])
return info


def columns_apply_na_rep(column_names, na_rep):
return tuple(
na_rep if str(col_name) == "<NA>"
else col_name
for col_name in column_names
)
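
Unnamed columns coming from pandas stringify to `"<NA>"` (that is `str(pd.NA)`), which the helper above replaces with the writer's `na_rep`. For example (pandas assumed only for its `NA` singleton):

```python
import pandas as pd

print(columns_apply_na_rep(["a", pd.NA, "b"], "NULL"))
# ('a', 'NULL', 'b')
```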


def write_csv(
TableWithMetadata table,
*,
object path_or_buf=None,
str sep=",",
str na_rep="",
bool header=True,
str lineterminator="\n",
int rows_per_chunk=8,
bool index=True,
):
"""
Writes a :py:class:`~pylibcudf.io.types.TableWithMetadata` to CSV format.

For details, see :cpp:func:`write_csv`.

Parameters
----------
table : TableWithMetadata
The TableWithMetadata object containing the Table to write.
path_or_buf : object, default None
The sink file-like object or path to write to (e.g. io.StringIO).
sep : str
Character to delimit column values.
na_rep : str
The string representation for null values.
header : bool, default True
Whether to write headers to the CSV. Includes the column names
and, optionally, the index names (see ``index`` argument).
lineterminator : str, default '\\n'
The character used to determine the end of a line.
rows_per_chunk : int, default 8
The maximum number of rows to write at a time.
index : bool, default True
Whether to include index names in headers.
"""
cdef bool include_header_c = header
cdef char delim_c = ord(sep)
cdef string line_term_c = lineterminator.encode()
cdef string na_c = na_rep.encode()
cdef int rows_per_chunk_c = rows_per_chunk
cdef vector[string] col_names
cdef string true_value_c = 'True'.encode()
cdef string false_value_c = 'False'.encode()
cdef unique_ptr[data_sink] data_sink_c
cdef sink_info sink_info_c = make_sink_info(path_or_buf, data_sink_c)

if header is True:
all_names = columns_apply_na_rep(table._column_names, na_rep)
if index is True:
all_names = table._index.names + all_names

if len(all_names) > 0:
col_names.reserve(len(all_names))
if len(all_names) == 1:
if all_names[0] in (None, ''):
col_names.push_back('""'.encode())
else:
col_names.push_back(
str(all_names[0]).encode()
)
else:
for idx, col_name in enumerate(all_names):
if col_name is None:
col_names.push_back(''.encode())
else:
col_names.push_back(
str(col_name).encode()
)

cdef csv_writer_options options = move(
csv_writer_options.builder(sink_info_c, table.tbl.view())
.names(col_names)
.na_rep(na_c)
.include_header(include_header_c)
.rows_per_chunk(rows_per_chunk_c)
.line_terminator(line_term_c)
.inter_column_delimiter(delim_c)
.true_value(true_value_c)
.false_value(false_value_c)
.build()
)

with nogil:
cpp_write_csv(options)
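
End to end, writing through the new function to an in-memory sink looks like the sketch below. The pyarrow construction and `index=False` are assumptions of this sketch; note that the revision shown still reaches for cudf-style index metadata (`table._index`) when `index=True`.

```python
import io
import pyarrow as pa
import pylibcudf as plc

tbl = plc.interop.from_arrow(pa.table({"x": [1, None, 3]}))
buf = io.StringIO()
plc.io.csv.write_csv(
    plc.io.TableWithMetadata(tbl, [("x", [])]),
    path_or_buf=buf,
    na_rep="<NA>",
    lineterminator="\n",
    index=False,
)
print(buf.getvalue())
# x
# 1
# <NA>
# 3
```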