From 89d6354068c11a66fcec2f34d0414daca327e2e0 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Wed, 19 Jun 2024 11:46:26 +0200 Subject: [PATCH] GH-40384: [Python] Expand the C Device Interface bindings to support import on CUDA device (#40385) ### Rationale for this change Follow-up on https://github.com/apache/arrow/issues/39979 which added `_export_to_c_device`/`_import_from_c_device` methods, but for now only for CPU devices. ### What changes are included in this PR? * Ensure `pyarrow.cuda` is imported before importing data through the C Interface, to ensure the CUDA device is registered * Add tests for exporting/importing with the device interface on CUDA ### Are these changes tested? Yes, added tests for CUDA. * GitHub Issue: #40384 Authored-by: Joris Van den Bossche Signed-off-by: Joris Van den Bossche --- python/pyarrow/array.pxi | 10 +- python/pyarrow/includes/libarrow.pxd | 7 +- python/pyarrow/lib.pyx | 19 ++++ python/pyarrow/table.pxi | 9 +- python/pyarrow/tests/test_cffi.py | 21 ++++ python/pyarrow/tests/test_cuda.py | 152 +++++++++++++++++++++++++++ 6 files changed, 210 insertions(+), 8 deletions(-) diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index efa9b814edf61..64a6ceaa6eaa4 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -1825,23 +1825,25 @@ cdef class Array(_PandasConvertible): This is a low-level function intended for expert users. 
""" cdef: - void* c_ptr = _as_c_pointer(in_ptr) + ArrowDeviceArray* c_device_array = _as_c_pointer(in_ptr) void* c_type_ptr shared_ptr[CArray] c_array + if c_device_array.device_type == ARROW_DEVICE_CUDA: + _ensure_cuda_loaded() + c_type = pyarrow_unwrap_data_type(type) if c_type == nullptr: # Not a DataType object, perhaps a raw ArrowSchema pointer c_type_ptr = _as_c_pointer(type) with nogil: c_array = GetResultValue( - ImportDeviceArray( c_ptr, - c_type_ptr) + ImportDeviceArray(c_device_array, c_type_ptr) ) else: with nogil: c_array = GetResultValue( - ImportDeviceArray( c_ptr, c_type) + ImportDeviceArray(c_device_array, c_type) ) return pyarrow_wrap_array(c_array) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 0d63ec6be38d8..53ad95f2430be 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2964,8 +2964,13 @@ cdef extern from "arrow/c/abi.h": cdef struct ArrowArrayStream: void (*release)(ArrowArrayStream*) noexcept nogil + ctypedef int32_t ArrowDeviceType + cdef ArrowDeviceType ARROW_DEVICE_CUDA + cdef struct ArrowDeviceArray: - pass + ArrowArray array + int64_t device_id + int32_t device_type cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil: CStatus ExportType(CDataType&, ArrowSchema* out) diff --git a/python/pyarrow/lib.pyx b/python/pyarrow/lib.pyx index 904e018ffddcc..e08021c62b5ae 100644 --- a/python/pyarrow/lib.pyx +++ b/python/pyarrow/lib.pyx @@ -125,6 +125,7 @@ UnionMode_DENSE = _UnionMode_DENSE __pc = None __pac = None +__cuda_loaded = None def _pc(): @@ -143,6 +144,24 @@ def _pac(): return __pac +def _ensure_cuda_loaded(): + # Try importing the cuda module to ensure libarrow_cuda gets loaded + # to register the CUDA device for the C Data Interface import + global __cuda_loaded + if __cuda_loaded is None: + try: + import pyarrow.cuda # no-cython-lint + __cuda_loaded = True + except ImportError as exc: + __cuda_loaded = str(exc) + + if __cuda_loaded is 
not True: + raise ImportError( + "Trying to import data on a CUDA device, but PyArrow is not built with " + f"CUDA support.\n(importing 'pyarrow.cuda' resulted in \"{__cuda_loaded}\")." + ) + + def _gdb_test_session(): GdbTestSession() diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 379bb82ea6ede..767e09004592c 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -3752,21 +3752,24 @@ cdef class RecordBatch(_Tabular): This is a low-level function intended for expert users. """ cdef: - void* c_ptr = _as_c_pointer(in_ptr) + ArrowDeviceArray* c_device_array = _as_c_pointer(in_ptr) void* c_schema_ptr shared_ptr[CRecordBatch] c_batch + if c_device_array.device_type == ARROW_DEVICE_CUDA: + _ensure_cuda_loaded() + c_schema = pyarrow_unwrap_schema(schema) if c_schema == nullptr: # Not a Schema object, perhaps a raw ArrowSchema pointer c_schema_ptr = _as_c_pointer(schema, allow_null=True) with nogil: c_batch = GetResultValue(ImportDeviceRecordBatch( - c_ptr, c_schema_ptr)) + c_device_array, c_schema_ptr)) else: with nogil: c_batch = GetResultValue(ImportDeviceRecordBatch( - c_ptr, c_schema)) + c_device_array, c_schema)) return pyarrow_wrap_batch(c_batch) diff --git a/python/pyarrow/tests/test_cffi.py b/python/pyarrow/tests/test_cffi.py index 45a3db9b66fc5..369ed9142824d 100644 --- a/python/pyarrow/tests/test_cffi.py +++ b/python/pyarrow/tests/test_cffi.py @@ -705,3 +705,24 @@ def test_roundtrip_chunked_array_capsule_requested_schema(): ValueError, match="Could not cast string to requested type int64" ): chunked.__arrow_c_stream__(requested_capsule) + + +def test_import_device_no_cuda(): + try: + import pyarrow.cuda # noqa + except ImportError: + pass + else: + pytest.skip("pyarrow.cuda is available") + + c_array = ffi.new("struct ArrowDeviceArray*") + ptr_array = int(ffi.cast("uintptr_t", c_array)) + arr = pa.array([1, 2, 3], type=pa.int64()) + arr._export_to_c_device(ptr_array) + + # patch the device type of the struct, this results 
in an invalid ArrowDeviceArray
+    # but this is just to test we raise an error before actually importing buffers
+    c_array.device_type = 2  # ARROW_DEVICE_CUDA
+
+    with pytest.raises(ImportError, match="Trying to import data on a CUDA device"):
+        pa.Array._import_from_c_device(ptr_array, arr.type)
diff --git a/python/pyarrow/tests/test_cuda.py b/python/pyarrow/tests/test_cuda.py
index 43cd16a3cf666..400db2643bd56 100644
--- a/python/pyarrow/tests/test_cuda.py
+++ b/python/pyarrow/tests/test_cuda.py
@@ -792,3 +792,155 @@ def test_IPC(size):
     p.start()
     p.join()
     assert p.exitcode == 0
+
+
+def _arr_copy_to_host(carr):
+    # TODO replace below with copy to device when exposed in python
+    buffers = []
+    for cbuf in carr.buffers():
+        if cbuf is None:
+            buffers.append(None)
+        else:
+            buf = global_context.foreign_buffer(
+                cbuf.address, cbuf.size, cbuf
+            ).copy_to_host()
+            buffers.append(buf)
+
+    child = pa.Array.from_buffers(carr.type.value_type, 3, buffers[2:])
+    new = pa.Array.from_buffers(carr.type, 2, buffers[:2], children=[child])
+    return new
+
+
+def test_device_interface_array():
+    cffi = pytest.importorskip("pyarrow.cffi")
+    ffi = cffi.ffi
+
+    c_schema = ffi.new("struct ArrowSchema*")
+    ptr_schema = int(ffi.cast("uintptr_t", c_schema))
+    c_array = ffi.new("struct ArrowDeviceArray*")
+    ptr_array = int(ffi.cast("uintptr_t", c_array))
+
+    typ = pa.list_(pa.int32())
+    arr = pa.array([[1], [2, 42]], type=typ)
+
+    # TODO replace below with copy to device when exposed in python
+    cbuffers = []
+    for buf in arr.buffers():
+        if buf is None:
+            cbuffers.append(None)
+        else:
+            cbuf = global_context.new_buffer(buf.size)
+            cbuf.copy_from_host(buf, position=0, nbytes=buf.size)
+            cbuffers.append(cbuf)
+
+    carr = pa.Array.from_buffers(typ, 2, cbuffers[:2], children=[
+        pa.Array.from_buffers(typ.value_type, 3, cbuffers[2:])
+    ])
+
+    # Type is known up front
+    carr._export_to_c_device(ptr_array)
+
+    # verify exported struct
+    assert c_array.device_type == 2  # ARROW_DEVICE_CUDA 2
+    
assert c_array.device_id == global_context.device_number
+    assert c_array.array.length == 2
+
+    # Delete and recreate C++ object from exported pointer
+    del carr
+    carr_new = pa.Array._import_from_c_device(ptr_array, typ)
+    assert carr_new.type == pa.list_(pa.int32())
+    arr_new = _arr_copy_to_host(carr_new)
+    assert arr_new.equals(arr)
+
+    del carr_new
+    # Now released
+    with pytest.raises(ValueError, match="Cannot import released ArrowArray"):
+        pa.Array._import_from_c_device(ptr_array, typ)
+
+    # Schema is exported and imported at the same time
+    carr = pa.Array.from_buffers(typ, 2, cbuffers[:2], children=[
+        pa.Array.from_buffers(typ.value_type, 3, cbuffers[2:])
+    ])
+    carr._export_to_c_device(ptr_array, ptr_schema)
+    # Delete and recreate C++ objects from exported pointers
+    del carr
+    carr_new = pa.Array._import_from_c_device(ptr_array, ptr_schema)
+    assert carr_new.type == pa.list_(pa.int32())
+    arr_new = _arr_copy_to_host(carr_new)
+    assert arr_new.equals(arr)
+
+    del carr_new
+    # Now released
+    with pytest.raises(ValueError, match="Cannot import released ArrowSchema"):
+        pa.Array._import_from_c_device(ptr_array, ptr_schema)
+
+
+def _batch_copy_to_host(cbatch):
+    # TODO replace below with copy to device when exposed in python
+    arrs = []
+    for col in cbatch.columns:
+        buffers = [
+            global_context.foreign_buffer(buf.address, buf.size, buf).copy_to_host()
+            if buf is not None else None
+            for buf in col.buffers()
+        ]
+        new = pa.Array.from_buffers(col.type, len(col), buffers)
+        arrs.append(new)
+
+    return pa.RecordBatch.from_arrays(arrs, schema=cbatch.schema)
+
+
+def test_device_interface_batch_array():
+    cffi = pytest.importorskip("pyarrow.cffi")
+    ffi = cffi.ffi
+
+    c_schema = ffi.new("struct ArrowSchema*")
+    ptr_schema = int(ffi.cast("uintptr_t", c_schema))
+    c_array = ffi.new("struct ArrowDeviceArray*")
+    ptr_array = int(ffi.cast("uintptr_t", c_array))
+
+    batch = make_recordbatch(10)
+    schema = batch.schema
+    cbuf = cuda.serialize_record_batch(batch, 
global_context)
+    cbatch = cuda.read_record_batch(cbuf, schema)
+
+    # Schema is known up front
+    cbatch._export_to_c_device(ptr_array)
+
+    # verify exported struct
+    assert c_array.device_type == 2  # ARROW_DEVICE_CUDA 2
+    assert c_array.device_id == global_context.device_number
+    assert c_array.array.length == 10
+
+    # Delete and recreate C++ object from exported pointer
+    del cbatch
+    cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, schema)
+    assert cbatch_new.schema == schema
+    batch_new = _batch_copy_to_host(cbatch_new)
+    assert batch_new.equals(batch)
+
+    del cbatch_new
+    # Now released
+    with pytest.raises(ValueError, match="Cannot import released ArrowArray"):
+        pa.RecordBatch._import_from_c_device(ptr_array, schema)
+
+    # Schema is exported and imported at the same time
+    cbatch = cuda.read_record_batch(cbuf, schema)
+    cbatch._export_to_c_device(ptr_array, ptr_schema)
+    # Delete and recreate C++ objects from exported pointers
+    del cbatch
+    cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema)
+    assert cbatch_new.schema == schema
+    batch_new = _batch_copy_to_host(cbatch_new)
+    assert batch_new.equals(batch)
+
+    del cbatch_new
+    # Now released
+    with pytest.raises(ValueError, match="Cannot import released ArrowSchema"):
+        pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema)
+
+    # Not a struct type
+    pa.int32()._export_to_c(ptr_schema)
+    with pytest.raises(ValueError,
+                       match="ArrowSchema describes non-struct type"):
+        pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema)