Skip to content

Commit

Permalink
Make TestFileSystemDataset test produce out of order data
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Oct 18, 2024
1 parent df98801 commit 36243c3
Showing 1 changed file with 43 additions and 6 deletions.
49 changes: 43 additions & 6 deletions cpp/src/arrow/dataset/file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
#include <cstdint>
#include <memory>
#include <string>
#include <thread>
#include <tuple>
#include <vector>
#include <arrow/compute/function.h>
#include <arrow/compute/registry.h>

#include <gmock/gmock.h>
#include <gtest/gtest.h>
Expand Down Expand Up @@ -351,6 +354,31 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
}
}

Status delay(compute::KernelContext* ctx, const compute::ExecSpan& batch, compute::ExecResult* out) {
// Get the input array
const ArraySpan& input = batch[0].array;

// Prepare the output buffer
auto output = out->array_span();

// Get the input data as uint32_t array
const uint32_t* input_values = input.GetValues<uint32_t>(1);

// Get the mutable output buffer for writing
uint8_t* output_values = output->buffers[1].data;

// Boolean data is stored in 1 bit per value, so we need to set bits
for (int64_t i = 0; i < input.length; ++i) {
printf("%d\n", input_values[i]);
if (input_values[i] % 16 == 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
bit_util::SetBitTo(output_values, i, true); // Set the bit at index i
}

return Status::OK();
}

TEST_F(TestFileSystemDataset, WritePersistOrder) {
// Test for ARROW-26818
auto format = std::make_shared<IpcFileFormat>();
Expand All @@ -363,11 +391,20 @@ TEST_F(TestFileSystemDataset, WritePersistOrder) {
auto table = gen::Gen({gen::Step()})->FailOnError()->Table(2, 1024);
auto dataset = std::make_shared<InMemoryDataset>(table);

ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
ARROW_CHECK_OK(scanner_builder->UseThreads(true));
ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
auto delay_func = std::make_shared<compute::ScalarFunction>("delay", compute::Arity(1), compute::FunctionDoc());
compute::ScalarKernel delay_kernel;
delay_kernel.exec = delay;
delay_kernel.signature = compute::KernelSignature::Make({uint32()}, boolean());
ARROW_CHECK_OK(delay_func->AddKernel(delay_kernel));
ARROW_CHECK_OK(compute::GetFunctionRegistry()->AddFunction(delay_func));

for (bool preserve_order : {true, false}) {
printf("test %d\n", preserve_order);
ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
ARROW_CHECK_OK(scanner_builder->UseThreads(true));
ARROW_CHECK_OK(scanner_builder->Filter(compute::call("delay", {compute::field_ref("f0")})));
ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());

auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
write_options.filesystem = fs;
write_options.preserve_order = preserve_order;
Expand All @@ -386,15 +423,15 @@ TEST_F(TestFileSystemDataset, WritePersistOrder) {
std::shared_ptr<RecordBatch> batch;
ABORT_NOT_OK(reader.ReadNext(&batch));
int32_t prev = -1;
auto out_of_order = false;
int out_of_order = 0;
while (batch != nullptr) {
for (int row = 0; row < batch->num_rows(); ++row) {
auto scalar = batch->column(0)->GetScalar(row).ValueOrDie();
auto numeric_scalar =
std::static_pointer_cast<arrow::NumericScalar<arrow::Int32Type>>(scalar);
int32_t value = numeric_scalar->value;
if (value <= prev) {
out_of_order = true;
out_of_order++;
}
prev = value;
}
Expand All @@ -403,7 +440,7 @@ TEST_F(TestFileSystemDataset, WritePersistOrder) {
// TODO: this currently fails because out-of-order batches cannot be reproduced with
// this test how can we guarantee that table written with preserve_order = false is
// out of order? Other than SimpleWriteNodeTest.SequenceOutput in write_node_test.cc
ASSERT_EQ(out_of_order, !preserve_order);
ASSERT_EQ(out_of_order > 0, !preserve_order);
}
}

Expand Down

0 comments on commit 36243c3

Please sign in to comment.