diff --git a/cpp/src/arrow/gpu/cuda_test.cc b/cpp/src/arrow/gpu/cuda_test.cc index 4c450bf389919..4c9b961fa1465 100644 --- a/cpp/src/arrow/gpu/cuda_test.cc +++ b/cpp/src/arrow/gpu/cuda_test.cc @@ -679,6 +679,49 @@ TEST_F(TestCudaArrowIpc, BasicWriteRead) { CompareBatch(*batch, *cpu_batch); } +TEST_F(TestCudaArrowIpc, WriteIpcString) { + auto values = ArrayFromJSON(utf8(), R"(["foo", null, "quux"])"); + ASSERT_OK_AND_ASSIGN(auto values_device, values->CopyTo(mm_)); + auto batch = RecordBatch::Make(schema({field("vals", utf8())}), 3, + {values_device->data()}, DeviceAllocationType::kCUDA); + + ipc::IpcPayload payload; + ASSERT_OK( + ipc::GetRecordBatchPayload(*batch, ipc::IpcWriteOptions::Defaults(), &payload)); + + ASSERT_EQ(values_device->data()->buffers[0]->address(), + payload.body_buffers[0]->address()); + ASSERT_EQ(values_device->data()->buffers[1]->address(), + payload.body_buffers[1]->address()); +} + +TEST_F(TestCudaArrowIpc, WriteIpcList) { + auto values = + ArrayFromJSON(list(utf8()), R"([["foo", null], null, ["quux", "bar", "baz"]])"); + ASSERT_OK_AND_ASSIGN(auto values_device, values->CopyTo(mm_)); + auto batch = RecordBatch::Make(schema({field("vals", list(utf8()))}), 3, + {values_device->data()}, DeviceAllocationType::kCUDA); + + ipc::IpcPayload payload; + ASSERT_OK( + ipc::GetRecordBatchPayload(*batch, ipc::IpcWriteOptions::Defaults(), &payload)); + + ASSERT_EQ(values_device->data()->buffers[0]->address(), + payload.body_buffers[0]->address()); +} + +TEST_F(TestCudaArrowIpc, WriteIpcSlicedRecord) { + std::shared_ptr batch; + ASSERT_OK(ipc::test::MakeListRecordBatch(&batch)); + + ASSERT_OK_AND_ASSIGN(auto batch_device, batch->CopyTo(mm_)); + auto sliced_batch_device = batch_device->Slice(10); + + ipc::IpcPayload payload; + ASSERT_NOT_OK(ipc::GetRecordBatchPayload(*sliced_batch_device, + ipc::IpcWriteOptions::Defaults(), &payload)); +} + TEST_F(TestCudaArrowIpc, DictionaryWriteRead) { std::shared_ptr batch; ASSERT_OK(ipc::test::MakeDictionary(&batch)); diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 549fb34a2e880..f603e60c66555 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -154,6 +154,11 @@ class RecordBatchSerializer { return Status::CapacityError("Cannot write arrays larger than 2^31 - 1 in length"); } + if (arr.offset() != 0 && arr.device_type() != DeviceAllocationType::kCPU) { + // https://github.com/apache/arrow/issues/43029 + return Status::NotImplemented("Cannot compute null count for non-cpu sliced array"); + } + // push back all common elements field_nodes_.push_back({arr.length(), arr.null_count(), 0}); @@ -449,14 +454,22 @@ class RecordBatchSerializer { template enable_if_base_binary Visit(const T& array) { + using offset_type = typename T::offset_type; + std::shared_ptr value_offsets; RETURN_NOT_OK(GetZeroBasedValueOffsets(array, &value_offsets)); auto data = array.value_data(); int64_t total_data_bytes = 0; - if (value_offsets) { - total_data_bytes = array.value_offset(array.length()) - array.value_offset(0); + if (value_offsets && array.length() > 0) { + offset_type last_offset_value; + RETURN_NOT_OK(MemoryManager::CopyBufferSliceToCPU( + value_offsets, array.length() * sizeof(offset_type), sizeof(offset_type), + reinterpret_cast(&last_offset_value))); + + total_data_bytes = last_offset_value; } + if (NeedTruncate(array.offset(), data.get(), total_data_bytes)) { // Slice the data buffer to include only the range we need now const int64_t start_offset = array.value_offset(0); @@ -495,8 +508,15 @@ class RecordBatchSerializer { offset_type values_offset = 0; offset_type values_length = 0; if (value_offsets) { - values_offset = array.value_offset(0); - values_length = array.value_offset(array.length()) - values_offset; + RETURN_NOT_OK(MemoryManager::CopyBufferSliceToCPU( + array.value_offsets(), array.offset() * sizeof(offset_type), + sizeof(offset_type), reinterpret_cast(&values_offset))); + offset_type last_values_offset = 0; + RETURN_NOT_OK(MemoryManager::CopyBufferSliceToCPU( + array.value_offsets(), (array.offset() + array.length()) * sizeof(offset_type), + sizeof(offset_type), reinterpret_cast(&last_values_offset))); + + values_length = last_values_offset - values_offset; } if (array.offset() != 0 || values_length < values->length()) {