Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-43694: [C++] Add an ExecContext Option to arrow::dataset::ScanOptions #43698

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/range.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/tracing_internal.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/schema.h"
Expand Down Expand Up @@ -633,10 +634,15 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
kParquetTypeName, options.get(), default_fragment_scan_options));
int batch_readahead = options->batch_readahead;
int64_t rows_to_readahead = batch_readahead * options->batch_size;
ARROW_ASSIGN_OR_RAISE(auto generator,
reader->GetRecordBatchGenerator(
reader, row_groups, column_projection,
::arrow::internal::GetCpuThreadPool(), rows_to_readahead));
// Modified this to pass the executor in scan_options instead of always using the
// default CPU thread pool.
// XXX Should we get it from options->fragment_scan_options instead??
auto cpu_executor = options->exec_context.executor()
? options->exec_context.executor()
: ::arrow::internal::GetCpuThreadPool();
ARROW_ASSIGN_OR_RAISE(auto generator, reader->GetRecordBatchGenerator(
reader, row_groups, column_projection,
cpu_executor, rows_to_readahead));
RecordBatchGenerator sliced =
SlicingGenerator(std::move(generator), options->batch_size);
if (batch_readahead == 0) {
Expand Down
49 changes: 49 additions & 0 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <utility>
#include <vector>

#include "arrow/acero/exec_plan.h"
#include "arrow/compute/api_scalar.h"
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/parquet_encryption_config.h"
Expand Down Expand Up @@ -910,5 +911,53 @@ TEST_F(TestParquetFileFormat, MultithreadedScanRegression) {
}
}

TEST_F(TestParquetFileFormat, MultithreadedComputeRegression) {
// GH-43694: Test similar situation as MultithreadedScanRegression but with
// the exec context instead

auto reader = MakeGeneratedRecordBatch(schema({field("utf8", utf8())}), 10000, 100);
ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get()));

std::vector<Future<>> completes;
std::vector<std::shared_ptr<arrow::internal::ThreadPool>> pools;

for (int idx = 0; idx < 2; ++idx) {
auto buffer_reader = std::make_shared<DelayedBufferReader>(buffer);
auto source = std::make_shared<FileSource>(buffer_reader, buffer->size());
auto fragment = MakeFragment(*source);
std::shared_ptr<Scanner> scanner;

{
auto options = std::make_shared<ScanOptions>();
ASSERT_OK_AND_ASSIGN(auto thread_pool, arrow::internal::ThreadPool::Make(1));
pools.emplace_back(thread_pool);
options->exec_context =
::arrow::ExecContext(::arrow::default_memory_pool(), pools.back().get());
Comment on lines +934 to +935
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can we ensure this is being called?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to ensure that options->exec_context is set: a ScanOptions is initially constructed with a default ExecContext, which uses the default MemoryPool and a null executor. Consequently, every call site must check whether exec_context.executor() is null and fall back to the default CPU thread pool when it is.

auto fragment_scan_options = std::make_shared<ParquetFragmentScanOptions>();
fragment_scan_options->arrow_reader_properties->set_pre_buffer(true);

options->fragment_scan_options = fragment_scan_options;
ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment, options);

ASSERT_OK(builder.UseThreads(true));
ASSERT_OK(builder.BatchSize(10000));
ASSERT_OK_AND_ASSIGN(scanner, builder.Finish());
}

ASSERT_OK_AND_ASSIGN(auto batch, scanner->Head(10000));
[[maybe_unused]] auto fut = scanner->ScanBatchesUnorderedAsync();
// Random ReadAsync calls, generate some futures to make the state machine
// more complex.
for (int yy = 0; yy < 16; yy++) {
completes.emplace_back(buffer_reader->ReadAsync(::arrow::io::IOContext(), 0, 1001));
}
scanner = nullptr;
}

for (auto& f : completes) {
f.Wait();
}
}

} // namespace dataset
} // namespace arrow
16 changes: 9 additions & 7 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,10 @@ class OneShotFragment : public Fragment {
ARROW_ASSIGN_OR_RAISE(
auto background_gen,
MakeBackgroundGenerator(std::move(batch_it_), options->io_context.executor()));
return MakeTransferredGenerator(std::move(background_gen),
::arrow::internal::GetCpuThreadPool());
auto cpu_executor = options->exec_context.executor()
? options->exec_context.executor()
: ::arrow::internal::GetCpuThreadPool();
return MakeTransferredGenerator(std::move(background_gen), cpu_executor);
}
std::string type_name() const override { return "one-shot"; }

Expand All @@ -382,15 +384,15 @@ Result<TaggedRecordBatchIterator> AsyncScanner::ScanBatches() {
[this](::arrow::internal::Executor* executor) {
return ScanBatchesAsync(executor);
},
scan_options_->use_threads);
scan_options_->use_threads, this->async_cpu_executor());
}

// Blocking, unordered scan.
//
// Adapts ScanBatchesUnorderedAsync to an iterator via IterateSynchronously.
// The executor chosen by async_cpu_executor() is forwarded so that, when
// scan_options_->use_threads is true, the generator is created against the
// executor configured in ScanOptions::exec_context rather than always the
// process-wide CPU thread pool.
Result<EnumeratedRecordBatchIterator> AsyncScanner::ScanBatchesUnordered() {
  return ::arrow::internal::IterateSynchronously<EnumeratedRecordBatch>(
      [this](::arrow::internal::Executor* executor) {
        return ScanBatchesUnorderedAsync(executor);
      },
      scan_options_->use_threads, this->async_cpu_executor());
}

Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
Expand All @@ -400,7 +402,7 @@ Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
}

// Asynchronous, unordered scan on the scanner's CPU executor.
//
// Forwards to the executor-taking overload with the executor returned by
// async_cpu_executor() and without fragment sequencing.
Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync() {
  return ScanBatchesUnorderedAsync(this->async_cpu_executor(),
                                   /*sequence_fragments=*/false);
}

Expand Down Expand Up @@ -601,7 +603,7 @@ Result<std::shared_ptr<Table>> AsyncScanner::Head(int64_t num_rows) {
}

// Asynchronous scan on the scanner's CPU executor.
//
// Forwards to the executor-taking overload using async_cpu_executor(), so a
// user-supplied ScanOptions::exec_context executor is honored instead of
// unconditionally using the global CPU thread pool.
Result<TaggedRecordBatchGenerator> AsyncScanner::ScanBatchesAsync() {
  return ScanBatchesAsync(this->async_cpu_executor());
}

Result<TaggedRecordBatchGenerator> AsyncScanner::ScanBatchesAsync(
Expand Down Expand Up @@ -778,7 +780,7 @@ Future<int64_t> AsyncScanner::CountRowsAsync(Executor* executor) {
}

// Asynchronous row count on the scanner's CPU executor.
//
// Forwards to the executor-taking overload using async_cpu_executor(), so a
// user-supplied ScanOptions::exec_context executor is honored instead of
// unconditionally using the global CPU thread pool.
Future<int64_t> AsyncScanner::CountRowsAsync() {
  return CountRowsAsync(this->async_cpu_executor());
}

Result<int64_t> AsyncScanner::CountRows() {
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/arrow/dataset/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ struct ARROW_DS_EXPORT ScanOptions {
/// Note: The IOContext executor will be ignored if use_threads is set to false
io::IOContext io_context;

/// ExecContext for any CPU tasks
///
/// Note: The ExecContext executor will be ignored if use_threads is set to false
compute::ExecContext exec_context;

/// If true the scanner will scan in parallel
///
/// Note: If true, this will use threads from both the cpu_executor and the
Expand Down Expand Up @@ -442,6 +447,11 @@ class ARROW_DS_EXPORT Scanner {
TaggedRecordBatchIterator scan);

const std::shared_ptr<ScanOptions> scan_options_;

/// \brief Select the executor used for CPU work by the async scan entry points
///
/// \return the executor stored in scan_options_->exec_context when it is
/// non-null, otherwise the process-wide CPU thread pool
::arrow::internal::Executor* async_cpu_executor() const {
  auto* configured = scan_options_->exec_context.executor();
  if (configured != nullptr) {
    return configured;
  }
  // No executor was configured on the ExecContext: fall back to the default.
  return ::arrow::internal::GetCpuThreadPool();
}
};

/// \brief ScannerBuilder is a factory class to construct a Scanner. It is used
Expand Down
25 changes: 16 additions & 9 deletions cpp/src/arrow/util/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,21 @@ typename Fut::SyncType RunSynchronously(FnOnce<Fut(Executor*)> get_future,
}
}

/// \brief Potentially iterate an async generator serially, with an explicit executor
///
/// \param get_gen factory that, given an executor, produces the async generator
/// \param use_threads if false, `executor` is not used and `get_gen` is handed
///        to SerialExecutor::IterateGenerator
/// \param executor executor passed to `get_gen` when `use_threads` is true
/// \return an iterator over the generator's results; if `get_gen` fails, an
///         iterator that yields the failure status
template <typename T>
Iterator<T> IterateSynchronously(
    FnOnce<Result<std::function<Future<T>()>>(Executor*)> get_gen, bool use_threads,
    Executor* executor) {
  if (!use_threads) {
    return SerialExecutor::IterateGenerator(std::move(get_gen));
  }
  auto gen_result = std::move(get_gen)(executor);
  if (gen_result.ok()) {
    return MakeGeneratorIterator(*gen_result);
  }
  return MakeErrorIterator<T>(gen_result.status());
}

/// \brief Potentially iterate an async generator serially (if use_threads is false)
/// \see IterateGenerator
///
Expand All @@ -605,15 +620,7 @@ typename Fut::SyncType RunSynchronously(FnOnce<Fut(Executor*)> get_future,
template <typename T>
Iterator<T> IterateSynchronously(
    FnOnce<Result<std::function<Future<T>()>>(Executor*)> get_gen, bool use_threads) {
  // Convenience overload: same behavior as the executor-taking overload, with
  // the global CPU thread pool as the executor. Delegating keeps the
  // threaded/serial branching in a single place.
  return IterateSynchronously(std::move(get_gen), use_threads, GetCpuThreadPool());
}

} // namespace internal
Expand Down
Loading