Skip to content

Commit

Permalink
GH-42198: [C++] Fix GetRecordBatchPayload crashes for device data (#4…
Browse files Browse the repository at this point in the history
…2199)

<!--
Thanks for opening a pull request!
If this is your first pull request you can find detailed information on
how
to contribute here:
* [New Contributor's
Guide](https://arrow.apache.org/docs/dev/developers/guide/step_by_step/pr_lifecycle.html#reviews-and-merge-of-the-pull-request)
* [Contributing
Overview](https://arrow.apache.org/docs/dev/developers/overview.html)


If this is not a [minor
PR](https://github.com/apache/arrow/blob/main/CONTRIBUTING.md#Minor-Fixes),
could you open an issue for this pull request on GitHub?
https://github.com/apache/arrow/issues/new/choose

Opening GitHub issues ahead of time contributes to the
[Openness](http://theapacheway.com/open/#:~:text=Openness%20allows%20new%20users%20the,must%20happen%20in%20the%20open.)
of the Apache Arrow project.

Then could you also rename the pull request title in the following
format?

    GH-${GITHUB_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}

or

    MINOR: [${COMPONENT}] ${SUMMARY}

In the case of PARQUET issues on JIRA the title also supports:

    PARQUET-${JIRA_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}

-->

### Rationale for this change
Ensuring that creating IPC payloads works correctly for non-CPU data by
utilizing `CopyBufferSliceToCPU`.

<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->

### What changes are included in this PR?
Adding calls to `CopyBufferSliceToCPU` to the IPC writer for base binary
types and for list types, to avoid calls to `value_offset` in those
cases.

<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->

### Are these changes tested?
Yes. Tests are added to cuda_test.cc

<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code

If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->

### Are there any user-facing changes?
No.

<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->

<!--
If there are any breaking changes to public APIs, please uncomment the
line below and explain which changes are breaking.
-->
<!-- **This PR includes breaking changes to public APIs.** -->

<!--
Please uncomment the line below (and provide explanation) if the changes
fix either (a) a security vulnerability, (b) a bug that caused incorrect
or invalid data to be produced, or (c) a bug that causes a crash (even
when the API contract is upheld). We use this to highlight fixes to
issues that may affect users without their knowledge. For this reason,
fixing bugs that cause errors doesn't count, since those are usually
obvious.
-->
<!-- **This PR contains a "Critical Fix".** -->
* GitHub Issue: #42198
  • Loading branch information
zeroshade authored and raulcd committed Jul 3, 2024
1 parent 459a486 commit b6bc183
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 4 deletions.
43 changes: 43 additions & 0 deletions cpp/src/arrow/gpu/cuda_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,49 @@ TEST_F(TestCudaArrowIpc, BasicWriteRead) {
CompareBatch(*batch, *cpu_batch);
}

TEST_F(TestCudaArrowIpc, WriteIpcString) {
  // Build a small utf8 array on the host, then copy it to the CUDA device.
  auto host_values = ArrayFromJSON(utf8(), R"(["foo", null, "quux"])");
  ASSERT_OK_AND_ASSIGN(auto device_values, host_values->CopyTo(mm_));

  auto device_batch =
      RecordBatch::Make(schema({field("vals", utf8())}), 3, {device_values->data()},
                        DeviceAllocationType::kCUDA);

  // Serializing the device-resident batch into an IPC payload must succeed
  // without touching device memory from the CPU.
  ipc::IpcPayload payload;
  ASSERT_OK(ipc::GetRecordBatchPayload(*device_batch, ipc::IpcWriteOptions::Defaults(),
                                       &payload));

  // The payload should reference the original device buffers (zero-copy),
  // not host-side copies.
  const auto& device_buffers = device_values->data()->buffers;
  ASSERT_EQ(device_buffers[0]->address(), payload.body_buffers[0]->address());
  ASSERT_EQ(device_buffers[1]->address(), payload.body_buffers[1]->address());
}

TEST_F(TestCudaArrowIpc, WriteIpcList) {
  // Build a list<utf8> array on the host, then copy it to the CUDA device.
  auto host_values =
      ArrayFromJSON(list(utf8()), R"([["foo", null], null, ["quux", "bar", "baz"]])");
  ASSERT_OK_AND_ASSIGN(auto device_values, host_values->CopyTo(mm_));

  auto device_batch =
      RecordBatch::Make(schema({field("vals", list(utf8()))}), 3,
                        {device_values->data()}, DeviceAllocationType::kCUDA);

  // Serializing the device-resident list batch into an IPC payload must
  // succeed without touching device memory from the CPU.
  ipc::IpcPayload payload;
  ASSERT_OK(ipc::GetRecordBatchPayload(*device_batch, ipc::IpcWriteOptions::Defaults(),
                                       &payload));

  // The payload should reference the original device validity buffer
  // (zero-copy), not a host-side copy.
  ASSERT_EQ(device_values->data()->buffers[0]->address(),
            payload.body_buffers[0]->address());
}

TEST_F(TestCudaArrowIpc, WriteIpcSlicedRecord) {
  // Create a list-typed record batch on the host and copy it to the device.
  std::shared_ptr<RecordBatch> host_batch;
  ASSERT_OK(ipc::test::MakeListRecordBatch(&host_batch));

  ASSERT_OK_AND_ASSIGN(auto device_batch, host_batch->CopyTo(mm_));
  auto sliced = device_batch->Slice(10);

  // Serializing a sliced (non-zero offset) non-CPU batch is not supported
  // yet; it should report an error rather than crash.
  ipc::IpcPayload payload;
  ASSERT_NOT_OK(
      ipc::GetRecordBatchPayload(*sliced, ipc::IpcWriteOptions::Defaults(), &payload));
}

TEST_F(TestCudaArrowIpc, DictionaryWriteRead) {
std::shared_ptr<RecordBatch> batch;
ASSERT_OK(ipc::test::MakeDictionary(&batch));
Expand Down
28 changes: 24 additions & 4 deletions cpp/src/arrow/ipc/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ class RecordBatchSerializer {
return Status::CapacityError("Cannot write arrays larger than 2^31 - 1 in length");
}

if (arr.offset() != 0 && arr.device_type() != DeviceAllocationType::kCPU) {
// https://github.com/apache/arrow/issues/43029
return Status::NotImplemented("Cannot compute null count for non-cpu sliced array");
}

// push back all common elements
field_nodes_.push_back({arr.length(), arr.null_count(), 0});

Expand Down Expand Up @@ -449,14 +454,22 @@ class RecordBatchSerializer {

template <typename T>
enable_if_base_binary<typename T::TypeClass, Status> Visit(const T& array) {
using offset_type = typename T::offset_type;

std::shared_ptr<Buffer> value_offsets;
RETURN_NOT_OK(GetZeroBasedValueOffsets<T>(array, &value_offsets));
auto data = array.value_data();

int64_t total_data_bytes = 0;
if (value_offsets) {
total_data_bytes = array.value_offset(array.length()) - array.value_offset(0);
if (value_offsets && array.length() > 0) {
offset_type last_offset_value;
RETURN_NOT_OK(MemoryManager::CopyBufferSliceToCPU(
value_offsets, array.length() * sizeof(offset_type), sizeof(offset_type),
reinterpret_cast<uint8_t*>(&last_offset_value)));

total_data_bytes = last_offset_value;
}

if (NeedTruncate(array.offset(), data.get(), total_data_bytes)) {
// Slice the data buffer to include only the range we need now
const int64_t start_offset = array.value_offset(0);
Expand Down Expand Up @@ -495,8 +508,15 @@ class RecordBatchSerializer {
offset_type values_offset = 0;
offset_type values_length = 0;
if (value_offsets) {
values_offset = array.value_offset(0);
values_length = array.value_offset(array.length()) - values_offset;
RETURN_NOT_OK(MemoryManager::CopyBufferSliceToCPU(
array.value_offsets(), array.offset() * sizeof(offset_type),
sizeof(offset_type), reinterpret_cast<uint8_t*>(&values_offset)));
offset_type last_values_offset = 0;
RETURN_NOT_OK(MemoryManager::CopyBufferSliceToCPU(
array.value_offsets(), (array.offset() + array.length()) * sizeof(offset_type),
sizeof(offset_type), reinterpret_cast<uint8_t*>(&last_values_offset)));

values_length = last_values_offset - values_offset;
}

if (array.offset() != 0 || values_length < values->length()) {
Expand Down

0 comments on commit b6bc183

Please sign in to comment.