Skip to content

Commit

Permalink
delete _CPackedColumns
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt711 committed Nov 7, 2024
1 parent 3fee3fc commit 635595a
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 153 deletions.
8 changes: 0 additions & 8 deletions python/cudf/cudf/_lib/copying.pxd
Original file line number Diff line number Diff line change
@@ -1,9 +1 @@
# Copyright (c) 2021-2024, NVIDIA CORPORATION.

from pylibcudf.libcudf.contiguous_split cimport packed_columns

# Declaration of the extension type that holds a libcudf ``packed_columns``
# value together with three Python-object metadata slots.
cdef class _CPackedColumns:
# C++ ``packed_columns`` value (type cimported from
# pylibcudf.libcudf.contiguous_split above).
cdef packed_columns c_obj
# Untyped Python-object slots. Judging by the names they carry column
# labels, per-column dtypes and index labels -- confirm against the .pyx
# implementation.
cdef object column_names
cdef object column_dtypes
cdef object index_names
218 changes: 75 additions & 143 deletions python/cudf/cudf/_lib/copying.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@

import pickle

from libc.stdint cimport uint8_t, uintptr_t
from libcpp cimport bool
from libcpp.memory cimport unique_ptr
from libcpp.utility cimport move
from libcpp.vector cimport vector

from rmm.pylibrmm.device_buffer cimport DeviceBuffer

import pylibcudf

Expand All @@ -20,21 +16,20 @@ from cudf._lib.column cimport Column
from cudf._lib.scalar import as_device_scalar

from cudf._lib.scalar cimport DeviceScalar
from cudf._lib.utils cimport table_view_from_table

from cudf._lib.reduce import minmax
from cudf.core.abc import Serializable

from libcpp.memory cimport make_unique

cimport pylibcudf.libcudf.contiguous_split as cpp_contiguous_split
from pylibcudf.libcudf.column.column cimport column
from pylibcudf.libcudf.column.column_view cimport column_view
from pylibcudf.libcudf.scalar.scalar cimport scalar
from pylibcudf.libcudf.types cimport size_type

from cudf._lib.utils cimport columns_from_pylibcudf_table, data_from_table_view
cimport pylibcudf as plc
from cudf._lib.utils cimport columns_from_pylibcudf_table, data_from_pylibcudf_table
import pylibcudf as plc
from pylibcudf.contiguous_split import PackedColumns as PlcPackedColumns

# workaround for https://github.com/cython/cython/issues/3885
ctypedef const scalar constscalar
Expand Down Expand Up @@ -336,143 +331,17 @@ def get_element(Column input_column, size_type index):
)


# Extension type wrapping a libcudf ``packed_columns`` value (``c_obj``)
# plus the Python-level metadata (column names, index names, column dtypes)
# that the methods below use to rebuild a ``cudf.DataFrame``.
cdef class _CPackedColumns:

@staticmethod
def from_py_table(input_table, keep_index=True):
"""
Construct a ``PackedColumns`` object from a ``cudf.DataFrame``.
"""
import cudf.core.dtypes

# Allocate without running __init__; every attribute is filled in below.
cdef _CPackedColumns p = _CPackedColumns.__new__(_CPackedColumns)

# The index columns are packed only when keep_index is set AND the index is
# not the trivial RangeIndex(0, len(input_table), 1).
if keep_index and (
not isinstance(input_table.index, cudf.RangeIndex)
or input_table.index.start != 0
or input_table.index.stop != len(input_table)
or input_table.index.step != 1
):
# Index columns are placed ahead of the data columns.
columns = input_table._index._columns + input_table._columns
# index_names is assigned on this branch only.
p.index_names = input_table._index_names
else:
columns = input_table._columns

p.column_names = input_table._column_names
# Record the dtype only for columns whose dtype is a cudf ``_BaseDtype``
# instance; unpack() re-applies these after unpacking.
p.column_dtypes = {}
for name, col in input_table._column_labels_and_values:
if isinstance(col.dtype, cudf.core.dtypes._BaseDtype):
p.column_dtypes[name] = col.dtype

# Pack all selected columns through pylibcudf and move the underlying C++
# ``packed_columns`` out of the returned wrapper into this object.
p.c_obj = move(plc.contiguous_split.pack(
plc.Table(
[
col.to_pylibcudf(mode="read") for col in columns
]
)
).c_obj.get()[0])

return p

# Address of the packed GPU buffer's data, as a Python int.
@property
def gpu_data_ptr(self):
return int(<uintptr_t>self.c_obj.gpu_data.get()[0].data())

# Size reported by the packed GPU buffer, as a Python int.
@property
def gpu_data_size(self):
return int(<size_t>self.c_obj.gpu_data.get()[0].size())

# Build a ``(header, frames)`` pair: ``header`` is a dict of metadata and
# ``frames`` is the list of frames produced by the buffer and the dtypes.
def serialize(self):
header = {}
frames = []

# Wrap the packed GPU allocation as a buffer object without going through a
# new allocation here; ``owner=self`` is passed so the buffer refers back to
# this object (presumably to keep it alive -- confirm in ``as_buffer``).
gpu_data = as_buffer(
data=self.gpu_data_ptr,
size=self.gpu_data_size,
owner=self,
exposed=True
)
data_header, data_frames = gpu_data.serialize()
header["data"] = data_header
frames.extend(data_frames)

header["column-names"] = self.column_names
header["index-names"] = self.index_names
# The "metadata" key is written only when the metadata vector's data pointer
# is non-NULL; the bytes are copied into a Python list of ints.
if self.c_obj.metadata.get()[0].data() != NULL:
header["metadata"] = list(
<uint8_t[:self.c_obj.metadata.get()[0].size()]>
self.c_obj.metadata.get()[0].data()
)

# Each dtype contributes its own header plus the half-open (start, stop)
# range of positions its frames occupy in ``frames``.
column_dtypes = {}
for name, dtype in self.column_dtypes.items():
dtype_header, dtype_frames = dtype.serialize()
column_dtypes[name] = (
dtype_header,
(len(frames), len(frames) + len(dtype_frames)),
)
frames.extend(dtype_frames)
header["column-dtypes"] = column_dtypes

return header, frames

# Inverse of serialize(): rebuild an instance from ``header`` and ``frames``.
@staticmethod
def deserialize(header, frames):
cdef _CPackedColumns p = _CPackedColumns.__new__(_CPackedColumns)

gpu_data = Buffer.deserialize(header["data"], frames)

# NOTE(review): presumably DeviceBuffer(ptr=..., size=...) copies the bytes
# into a new allocation -- confirm against the rmm documentation.
dbuf = DeviceBuffer(
ptr=gpu_data.get_ptr(mode="write"),
size=gpu_data.nbytes
)

cdef cpp_contiguous_split.packed_columns data
# A missing "metadata" key yields an empty metadata vector.
data.metadata = move(
make_unique[vector[uint8_t]](
move(<vector[uint8_t]>header.get("metadata", []))
)
)
# Ownership of the device buffer's C++ object is moved into ``data``.
data.gpu_data = move(dbuf.c_obj)

p.c_obj = move(data)
p.column_names = header["column-names"]
p.index_names = header["index-names"]

# Rebuild each dtype from its header and its slice of ``frames``.
# NOTE(review): pickle.loads executes arbitrary code when fed untrusted
# bytes; only safe if ``header`` comes from a trusted source.
column_dtypes = {}
for name, dtype in header["column-dtypes"].items():
dtype_header, (start, stop) = dtype
column_dtypes[name] = pickle.loads(
dtype_header["type-serialized"]
).deserialize(dtype_header, frames[start:stop])
p.column_dtypes = column_dtypes

return p

# Unpack ``c_obj`` into a ``cudf.DataFrame`` and re-apply the stored dtypes.
def unpack(self):
# ``self`` is passed to data_from_table_view alongside the unpacked view
# (presumably as the owner of the underlying memory -- confirm in utils).
output_table = cudf.DataFrame._from_data(*data_from_table_view(
cpp_contiguous_split.unpack(self.c_obj),
self,
self.column_names,
self.index_names
))

# Restore the dtype metadata recorded in ``column_dtypes``.
for name, dtype in self.column_dtypes.items():
output_table._data[name] = (
output_table._data[name]._with_type_metadata(dtype)
)

return output_table


class PackedColumns(Serializable):
"""
A packed representation of a Frame, with all columns residing
in a single GPU memory buffer.
"""

def __init__(
    self, data, column_names=None, index_names=None, column_dtypes=None
):
    """
    Store the packed payload together with the frame metadata.

    Parameters
    ----------
    data
        The packed-columns object; kept as ``self._data``.
    column_names, index_names, column_dtypes : optional
        Stored unchanged on the instance. Each defaults to ``None``.
    """
    self._data = data
    self.column_names = column_names
    self.index_names = index_names
    self.column_dtypes = column_dtypes

def __reduce__(self):
    """
    Pickle support: return ``(callable, args)`` where the callable is
    ``self.deserialize`` and the args are the result of ``self.serialize()``.
    """
    # Look up the reconstructor first, then serialize, matching the
    # left-to-right evaluation order of a tuple display.
    rebuild = self.deserialize
    state = self.serialize()
    return rebuild, state
Expand All @@ -488,21 +357,84 @@ class PackedColumns(Serializable):
}

def serialize(self):
    """
    Serialize this object into a ``(header, frames)`` pair.

    The packed GPU data is wrapped as a buffer and handed, together with
    the stored column names, index names and column dtypes, to
    ``self._data.serialize``. The pickled type of ``self`` is then added
    to the header under ``"type-serialized"``.

    Returns
    -------
    tuple
        ``(header, frames)`` as returned by ``self._data.serialize``, with
        the extra ``"type-serialized"`` header entry.
    """
    # ``owner=self`` is passed so the buffer refers back to this object
    # (presumably to keep the packed allocation alive -- confirm in
    # ``as_buffer``).
    gpu_data = as_buffer(
        data=self._data.gpu_data_ptr,
        size=self._data.gpu_data_size,
        owner=self,
        exposed=True,
    )
    header, frames = self._data.serialize(
        gpu_data,
        self.column_names,
        self.index_names,
        self.column_dtypes,
    )
    # NOTE(review): this replaces the stored dtypes with whatever the
    # header holds under "column-dtypes". If those are serialized forms
    # rather than dtype objects, calling unpack() on this same instance
    # afterwards will misbehave -- confirm this write-back is intended.
    self.column_dtypes = header["column-dtypes"]
    header["type-serialized"] = pickle.dumps(type(self))

    return header, frames

@classmethod
def deserialize(cls, header, frames):
    """
    Rebuild an instance from a ``(header, frames)`` pair.

    The GPU buffer is deserialized from ``header["data"]`` and ``frames``.
    ``PlcPackedColumns.deserialize`` then returns the packed object along
    with the column names, index names and column dtypes, which are
    forwarded to the constructor.
    """
    gpu_data = Buffer.deserialize(header["data"], frames)
    packed, column_names, index_names, column_dtypes = (
        PlcPackedColumns.deserialize(gpu_data, header, frames)
    )
    return cls(packed, column_names, index_names, column_dtypes)

@classmethod
def from_py_table(cls, input_table, keep_index=True):
    """
    Pack ``input_table`` into a new ``PackedColumns`` instance.

    Parameters
    ----------
    input_table
        Frame-like object exposing ``index``, ``_index``, ``_columns``,
        ``_index_names``, ``_column_names`` and
        ``_column_labels_and_values``.
    keep_index : bool, default True
        When true, the index columns are packed ahead of the data columns
        unless the index is the trivial
        ``RangeIndex(0, len(input_table), 1)``.
    """
    if keep_index and (
        not isinstance(input_table.index, cudf.RangeIndex)
        or input_table.index.start != 0
        or input_table.index.stop != len(input_table)
        or input_table.index.step != 1
    ):
        columns = input_table._index._columns + input_table._columns
        index_names = input_table._index_names
    else:
        # Either the caller dropped the index or it can be regenerated,
        # so nothing about it needs to be stored.
        columns = input_table._columns
        index_names = None

    column_names = input_table._column_names
    # Only these dtype classes are remembered; unpack() re-applies them.
    tracked_dtypes = (
        cudf.core.dtypes._BaseDtype,
        cudf.core.dtypes.CategoricalDtype,
    )
    column_dtypes = {
        name: col.dtype
        for name, col in input_table._column_labels_and_values
        if isinstance(col.dtype, tracked_dtypes)
    }

    return cls(
        plc.contiguous_split.PackedColumns.from_plc_table(
            plc.Table([col.to_pylibcudf(mode="read") for col in columns])
        ),
        column_names=column_names,
        index_names=index_names,
        column_dtypes=column_dtypes,
    )

def unpack(self):
    """
    Unpack the stored data into a ``cudf.DataFrame``.

    The packed object is unpacked through ``plc.contiguous_split.unpack``
    and converted with ``data_from_pylibcudf_table`` using the stored
    column and index names. Each dtype recorded in ``self.column_dtypes``
    is then re-applied to its column via ``_with_type_metadata``.
    """
    output_table = cudf.DataFrame._from_data(*data_from_pylibcudf_table(
        plc.contiguous_split.unpack(self._data),
        self.column_names,
        self.index_names
    ))
    # ``column_dtypes`` defaults to None in __init__; treat that the same
    # as "no dtype metadata to restore" instead of raising.
    for name, dtype in (self.column_dtypes or {}).items():
        output_table._data[name] = (
            output_table._data[name]._with_type_metadata(dtype)
        )

    return output_table


def pack(input_table, keep_index=True):
Expand Down
1 change: 0 additions & 1 deletion python/cudf/cudf/tests/test_pack.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ def assert_packed_frame_equality(df):
packed = pack(df)
del df
unpacked = unpack(packed)

assert_eq(unpacked, pdf)


Expand Down
Loading

0 comments on commit 635595a

Please sign in to comment.