Skip to content

Commit

Permalink
[run ci] add a test
Browse files Browse the repository at this point in the history
  • Loading branch information
srilman committed Aug 14, 2024
1 parent acb8c57 commit b9fe4cd
Showing 1 changed file with 49 additions and 0 deletions.
49 changes: 49 additions & 0 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <utility>
#include <vector>

#include "arrow/acero/exec_plan.h"
#include "arrow/compute/api_scalar.h"
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/parquet_encryption_config.h"
Expand Down Expand Up @@ -910,5 +911,53 @@ TEST_F(TestParquetFileFormat, MultithreadedScanRegression) {
}
}

TEST_F(TestParquetFileFormat, MultithreadedComputeRegression) {
// GH-43694: Test similar situation as MultithreadedScanRegression but with
// the exec context instead

auto reader = MakeGeneratedRecordBatch(schema({field("utf8", utf8())}), 10000, 100);
ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get()));

std::vector<Future<>> completes;
std::vector<std::shared_ptr<arrow::internal::ThreadPool>> pools;

for (int idx = 0; idx < 2; ++idx) {
auto buffer_reader = std::make_shared<DelayedBufferReader>(buffer);
auto source = std::make_shared<FileSource>(buffer_reader, buffer->size());
auto fragment = MakeFragment(*source);
std::shared_ptr<Scanner> scanner;

{
auto options = std::make_shared<ScanOptions>();
ASSERT_OK_AND_ASSIGN(auto thread_pool, arrow::internal::ThreadPool::Make(1));
pools.emplace_back(thread_pool);
options->exec_context =
::arrow::ExecContext(::arrow::default_memory_pool(), pools.back().get());
auto fragment_scan_options = std::make_shared<ParquetFragmentScanOptions>();
fragment_scan_options->arrow_reader_properties->set_pre_buffer(true);

options->fragment_scan_options = fragment_scan_options;
ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment, options);

ASSERT_OK(builder.UseThreads(true));
ASSERT_OK(builder.BatchSize(10000));
ASSERT_OK_AND_ASSIGN(scanner, builder.Finish());
}

ASSERT_OK_AND_ASSIGN(auto batch, scanner->Head(10000));
[[maybe_unused]] auto fut = scanner->ScanBatchesUnorderedAsync();
// Random ReadAsync calls, generate some futures to make the state machine
// more complex.
for (int yy = 0; yy < 16; yy++) {
completes.emplace_back(buffer_reader->ReadAsync(::arrow::io::IOContext(), 0, 1001));
}
scanner = nullptr;
}

for (auto& f : completes) {
f.Wait();
}
}

} // namespace dataset
} // namespace arrow

0 comments on commit b9fe4cd

Please sign in to comment.