Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-38438: [C++] Dataset: Trying to fix the async bug in Parquet dataset #38466

Merged
merged 5 commits into from
Nov 17, 2023

Conversation

mapleFU
Copy link
Member

@mapleFU mapleFU commented Oct 25, 2023

Rationale for this change

Origin mentioned #38438

  1. When PreBuffer is default enabled, the code in RowGroupGenerator::FetchNext would switch to async mode. This make the state handling more complex
  2. In RowGroupGenerator::FetchNext, [this] is captured without shared_from_this. This is not bad, however, this->executor_ may point to a invalid address if this dtor.

This patch also fixes a lifetime issue I founded in CSV handling.

What changes are included in this PR?

  1. Fix handling in cpp/src/parquet/arrow/reader.cc as I talked above
  2. Fix a lifetime problem in CSV

Are these changes tested?

I test it locality. But don't know how to write unittest here. Fell free to help.

Are there any user-facing changes?

Bugfix

@mapleFU mapleFU requested a review from wgtmac as a code owner October 25, 2023 19:28
@mapleFU mapleFU requested review from pitrou and bkietz and removed request for wgtmac October 25, 2023 19:28
@github-actions
Copy link

⚠️ GitHub issue #38438 has been automatically assigned in GitHub to PR creator.

@mapleFU
Copy link
Member Author

mapleFU commented Oct 25, 2023

@pitrou @bkietz I've tried to fix #38438 here

Also cc @austin3dickey . I've reproduce this bug in my local-machine with ASAN, and fix it with this changing. I'm not so familiar with building Python, you can try to build an verify it here.

Copy link
Member

@bkietz bkietz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll give this a more thorough review tomorrow, for now:

cpp/src/arrow/csv/reader.cc Outdated Show resolved Hide resolved
cpp/src/parquet/arrow/reader.cc Outdated Show resolved Hide resolved
@github-actions github-actions bot added awaiting changes Awaiting changes awaiting change review Awaiting change review and removed awaiting review Awaiting review awaiting changes Awaiting changes labels Oct 26, 2023
@mapleFU mapleFU force-pushed the dataset/fixing-async-bug branch from ad5d79b to 843c575 Compare October 26, 2023 02:41
@mapleFU mapleFU requested a review from bkietz October 26, 2023 05:31
@austin3dickey
Copy link
Contributor

@ursabot please benchmark name=dataset-serialize lang=Python

@ursabot
Copy link

ursabot commented Oct 26, 2023

Benchmark runs are scheduled for commit 843c575. Watch https://buildkite.com/apache-arrow and https://conbench.ursa.dev for updates. A comment will be posted here when the runs are complete.

Copy link
Member

@bkietz bkietz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add your reproducer #38438 (comment) as a test?

@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting change review Awaiting change review labels Oct 26, 2023
@mapleFU
Copy link
Member Author

mapleFU commented Oct 26, 2023

Could you please add your reproducer #38438 (comment) as a test?

My code is far from the point we meet the segment fault. I build a Scanner and call Head to it multiple times.

Let me use Parquet to re-produce it.

@mapleFU
Copy link
Member Author

mapleFU commented Oct 26, 2023

@bkietz For testing. How can I mock a stream which Read returns a Future? BufferReader will read instantly. Should I create a mock-reader?

This test need issue an IO in RowGroup RecordBatch Generator and just return without requiring the value. And when IO complete, the io thread call the callback, which will trigger the segment fault. ( This is triggered because @austin3dickey uses Head, which trigger generating the batch, and some future will not be wait to read)

@bkietz
Copy link
Member

bkietz commented Oct 26, 2023

Should I create a mock-reader?

Yes, this pattern is used elsewhere in testing too. DelayedBufferReader should work here, I think
https://github.com/bkietz/arrow/blob/27981976f7dfd4c4ab7f60677f849439e038fc55/cpp/src/arrow/filesystem/mockfs.cc#L243-L254

And when IO complete, the io thread call the callback, which will trigger the segment fault.

Just adding the regression test to ensure future refactorings don't reintroduce the segfault should be sufficient

@austin3dickey
Copy link
Contributor

austin3dickey commented Oct 26, 2023

The benchmark passed, which is exactly what we're looking for. Thanks!

(The notification hasn't been posted here yet because the baseline commit hasn't finished its own benchmarks yet)

@mapleFU
Copy link
Member Author

mapleFU commented Oct 26, 2023

Just adding the regression test to ensure future refactorings don't reintroduce the segfault should be sufficient

Aha the regression test using a large local file

@mapleFU
Copy link
Member Author

mapleFU commented Oct 26, 2023

Do we have some way to break the stack 😅 I've re-produce the case but asan didn't report error.

{
  Generator();
}

// finish-io task, and call callback to trigger asan error

When finish IO task, ReadRowGroup doesn't report ASAN error here..

@mapleFU
Copy link
Member Author

mapleFU commented Oct 26, 2023

class DelayedBufferReader : public ::arrow::io::BufferReader {
 public:
  explicit DelayedBufferReader(const std::shared_ptr<::arrow::Buffer>& buffer)
      : ::arrow::io::BufferReader(buffer) {}

  ::arrow::Future<std::shared_ptr<Buffer>> ReadAsync(const ::arrow::io::IOContext& io_context, int64_t position,
                                            int64_t nbytes) override {
    read_async_count.fetch_add(1);
    std::this_thread::sleep_for(std::chrono::seconds(10));
    return ::arrow::io::BufferReader::ReadAsync(io_context, position, nbytes);
  }

  std::atomic<int> read_async_count{0};
};

TEST(TestArrowReadWrite, ScanContentsGracefulShutdown) {
  ArrowReaderProperties properties = default_arrow_reader_properties();
  properties.set_batch_size(256);
  properties.set_pre_buffer(true);
  properties.set_use_threads(true);
  auto cache_options = ::arrow::io::CacheOptions::LazyDefaults();
  cache_options.hole_size_limit = 1;
  cache_options.range_size_limit = 1;
  properties.set_cache_options(cache_options);
  const int num_rows = 1024;
  const int row_group_size = 16;
  const int num_columns = 1;

  std::shared_ptr<Table> table;
  ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));

  std::shared_ptr<Buffer> buffer;
  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
                                             default_arrow_writer_properties(), &buffer));

  auto mock_input_stream = std::make_shared<DelayedBufferReader>(buffer);
  std::vector<std::unique_ptr<::arrow::DelayedExecutor>> delayed_executors;
  delayed_executors.resize(3);
  // vector of futures
  std::vector<::arrow::Future<std::shared_ptr<::arrow::RecordBatch>>> futures;
  for (int idx = 0; idx < 3; ++idx) {
    delayed_executors[idx] = std::make_unique<::arrow::DelayedExecutor>();
    auto& delayed_executor = *delayed_executors[idx];
    std::shared_ptr<FileReader> reader;
    {
      std::unique_ptr<FileReader> unique_reader;
      FileReaderBuilder builder;
      ASSERT_OK(builder.Open(mock_input_stream));
      ASSERT_OK(builder.properties(properties)->Build(&unique_reader));
      reader = std::move(unique_reader);
    }
    {
      ASSERT_OK_AND_ASSIGN(
          auto batch_generator,
          reader->GetRecordBatchGenerator(reader, {0, 1}, {0}, &delayed_executor, 256));
      auto fut1 = batch_generator();
      auto fut2 = batch_generator();
      futures.push_back(fut1);
      futures.push_back(fut2);
    }
    // clear reader.
    reader = nullptr;
  }
  auto pool = ::arrow::internal::ThreadPool::Make(2).ValueOrDie();
  pool->Submit([&]() {
    for (int idx = 0; idx < 3; ++idx) {
      auto& delayed_executor = *delayed_executors[idx];
      while (!delayed_executor.captured_tasks.empty()) {
        std::cout << "count:" << delayed_executor.captured_tasks.size() << std::endl;
        auto callbacks = std::move(delayed_executor.captured_tasks);
        for (auto& callback : callbacks) {
          std::move(callback)();
        }
      }
    }
  }).ValueOrDie().Wait();
}

I've tried an ugly test like this, but though reader->GetRecordBatchGenerator has dtor, the this->executor_ didn't trigger the ASAN / UBSAN error. Maybe I need to take another ways later.

@conbench-apache-arrow
Copy link

Thanks for your patience. Conbench analyzed the 1 benchmarking run that has been run so far on PR commit 843c575.

There was 1 benchmark result indicating a performance regression:

The full Conbench report has more details.

@mapleFU
Copy link
Member Author

mapleFU commented Oct 27, 2023

Also cannot re-produce by:


class DelayedBufferReader : public ::arrow::io::BufferReader {
 public:
  explicit DelayedBufferReader(const std::shared_ptr<::arrow::Buffer>& buffer)
      : ::arrow::io::BufferReader(buffer) {}

  ::arrow::Future<std::shared_ptr<Buffer>> ReadAsync(const ::arrow::io::IOContext& io_context, int64_t position,
                                                     int64_t nbytes) override {
    read_async_count.fetch_add(1);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return ::arrow::io::BufferReader::ReadAsync(io_context, position, nbytes);
  }

  std::atomic<int> read_async_count{0};
};

TEST_F(TestParquetFileFormat, MultithreadedScanUnsafe) {
  auto reader = GetRecordBatchReader(schema({field("utf8", utf8())}));

  ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get()));
  auto buffer_reader = std::make_shared<DelayedBufferReader>(buffer);
  auto source = std::make_shared<FileSource>(std::move(buffer_reader), buffer->size());

  auto fragment = MakeFragment(*source);

  auto options = std::make_shared<ScanOptions>();
  auto fragment_scan_options = std::make_shared<ParquetFragmentScanOptions>();
  fragment_scan_options->arrow_reader_properties->set_pre_buffer(true);
  options->fragment_scan_options = fragment_scan_options;
  options->use_threads = true;
  ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment, options);

  ASSERT_OK(builder.UseThreads(true));
  ASSERT_OK(builder.BatchSize(128));
  ASSERT_OK_AND_ASSIGN(auto scanner, builder.Finish());

  ASSERT_OK_AND_ASSIGN(auto batch, scanner->Head(8000));
  ASSERT_OK_AND_ASSIGN(batch, scanner->Head(8000 * 10));
  ASSERT_OK_AND_ASSIGN(batch, scanner->Head(8000 * 10));
}

@mapleFU
Copy link
Member Author

mapleFU commented Oct 27, 2023

@bkietz I've tried but cannot reproduce it, but currently failed to reproduce without the sample file...Need some help here 😭

@mapleFU
Copy link
Member Author

mapleFU commented Nov 14, 2023

Sadly I've trigger another problem when trying to re-produce the bug. 😅

class DelayedBufferReader : public ::arrow::io::BufferReader {
 public:
  explicit DelayedBufferReader(const std::shared_ptr<::arrow::Buffer>& buffer)
      : ::arrow::io::BufferReader(buffer) {}

  ::arrow::Future<std::shared_ptr<Buffer>> ReadAsync(const ::arrow::io::IOContext& io_context, int64_t position,
                                                     int64_t nbytes) override {
    read_async_count.fetch_add(1);
    auto self = std::dynamic_pointer_cast<DelayedBufferReader>(shared_from_this());
    return DeferNotOk(::arrow::io::internal::SubmitIO(
        io_context, [self, position, nbytes, io_context] {
          std::this_thread::sleep_for(std::chrono::seconds(1));
          return self->::arrow::io::BufferReader::ReadAsync(io_context, position, nbytes);
        }));
  }

  std::atomic<int> read_async_count{0};
};

TEST_F(TestParquetFileFormat, MultithreadedScanUnsafe) {
  auto reader = MakeGeneratedRecordBatch(schema({field("utf8", utf8())}), 10000, 100);

  ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get()));

  for (int idx = 0; idx < 3; ++idx) {
    auto buffer_reader = std::make_shared<DelayedBufferReader>(buffer);
    auto source = std::make_shared<FileSource>(buffer_reader, buffer->size());
    auto fragment = MakeFragment(*source);
    std::shared_ptr<Scanner> scanner;

    {
      auto options = std::make_shared<ScanOptions>();
      auto fragment_scan_options = std::make_shared<ParquetFragmentScanOptions>();
      fragment_scan_options->arrow_reader_properties->set_pre_buffer(true);
      options->fragment_scan_options = fragment_scan_options;
      options->use_threads = true;
      ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment, options);

      ASSERT_OK(builder.UseThreads(true));
      ASSERT_OK(builder.BatchSize(128));
      ASSERT_OK_AND_ASSIGN(scanner, builder.Finish());
    }

    ASSERT_OK_AND_ASSIGN(auto batch, scanner->Head(10000));
    ASSERT_OK_AND_ASSIGN(batch, scanner->Head(80000 * 10));
    //  ASSERT_OK_AND_ASSIGN(batch, scanner->Head(8000 * 10));
    // Random ReadAsync calls
    for (int yy = 0; yy < 100; yy++) {
      [[maybe_unused]] auto ff = buffer_reader->ReadAsync(::arrow::io::IOContext(), 0, 1001);
    }
    scanner = nullptr;
  }
}

@bkietz When we have too many Future, the done might notify too many "done", which might cause stack overflow...

@mapleFU
Copy link
Member Author

mapleFU commented Nov 14, 2023

class DelayedBufferReader : public ::arrow::io::BufferReader {
 public:
  explicit DelayedBufferReader(const std::shared_ptr<::arrow::Buffer>& buffer)
      : ::arrow::io::BufferReader(buffer) {}

  ::arrow::Future<std::shared_ptr<Buffer>> ReadAsync(const ::arrow::io::IOContext& io_context, int64_t position,
                                                     int64_t nbytes) override {
    read_async_count.fetch_add(1);
    auto self = std::dynamic_pointer_cast<DelayedBufferReader>(shared_from_this());
    return DeferNotOk(::arrow::io::internal::SubmitIO(
        io_context, [self, position, nbytes]() -> Result<std::shared_ptr<Buffer>> {
          std::this_thread::sleep_for(std::chrono::seconds(1));
          return self->DoReadAt(position, nbytes);
        }));
  }

  std::atomic<int> read_async_count{0};
};

TEST_F(TestParquetFileFormat, MultithreadedScanUnsafe) {
  auto reader = MakeGeneratedRecordBatch(schema({field("utf8", utf8())}), 10000, 100);

  ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get()));

  std::vector<Future<>> completes;
  std::vector<std::shared_ptr<arrow::internal::ThreadPool>> pools;

  for (int idx = 0; idx < 3; ++idx) {
    auto buffer_reader = std::make_shared<DelayedBufferReader>(buffer);
    auto source = std::make_shared<FileSource>(buffer_reader, buffer->size());
    auto fragment = MakeFragment(*source);
    std::shared_ptr<Scanner> scanner;

    {
      auto options = std::make_shared<ScanOptions>();
      ASSERT_OK_AND_ASSIGN(auto thread_pool, arrow::internal::ThreadPool::Make(1));
      pools.emplace_back(thread_pool);
      options->io_context = ::arrow::io::IOContext(::arrow::default_memory_pool(), pools.back().get());
      auto fragment_scan_options = std::make_shared<ParquetFragmentScanOptions>();
      fragment_scan_options->arrow_reader_properties->set_pre_buffer(true);

      options->fragment_scan_options = fragment_scan_options;
      options->use_threads = true;
      ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment, options);

      ASSERT_OK(builder.UseThreads(true));
      ASSERT_OK(builder.BatchSize(10000));
      ASSERT_OK_AND_ASSIGN(scanner, builder.Finish());
    }

    ASSERT_OK_AND_ASSIGN(auto batch, scanner->Head(10000));
    auto fut = scanner->ScanBatchesUnorderedAsync();
    //  ASSERT_OK_AND_ASSIGN(batch, scanner->Head(8000 * 10));
    // Random ReadAsync calls
    for (int yy = 0; yy < 16; yy++) {
      completes.emplace_back(buffer_reader->ReadAsync(::arrow::io::IOContext(), 0, 1001));
    }
    scanner = nullptr;
  }

  for (auto& f: completes) {
    f.Wait();
  }
}

Finally I re-produce the issue successfully with same stack I meet, should I use this @bkietz ? (this test might runs for a long time...)

@mapleFU mapleFU requested a review from westonpace as a code owner November 14, 2023 15:51
@mapleFU mapleFU requested a review from bkietz November 14, 2023 15:51
std::vector<Future<>> completes;
std::vector<std::shared_ptr<arrow::internal::ThreadPool>> pools;

for (int idx = 0; idx < 2; ++idx) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using 2 to make the state machine a bit complex, also make we use another Executor

options->use_threads = true;
ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment, options);

ASSERT_OK(builder.UseThreads(true));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread is neccessary for testing

{
auto options = std::make_shared<ScanOptions>();
ASSERT_OK_AND_ASSIGN(auto thread_pool, arrow::internal::ThreadPool::Make(1));
pools.emplace_back(thread_pool);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pool is for case mentioned in the description

@mapleFU
Copy link
Member Author

mapleFU commented Nov 14, 2023

This is ready for review now :-|

@mapleFU mapleFU force-pushed the dataset/fixing-async-bug branch from 3223950 to a93ea70 Compare November 15, 2023 02:46
@bkietz
Copy link
Member

bkietz commented Nov 15, 2023

Thank you for continuing to work on this, @mapleFU ! I'll review soon

Copy link
Member

@bkietz bkietz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @mapleFU !

@github-actions github-actions bot added awaiting merge Awaiting merge and removed awaiting change review Awaiting change review labels Nov 17, 2023
@bkietz bkietz merged commit 951d92a into apache:main Nov 17, 2023
33 of 34 checks passed
@bkietz bkietz removed the awaiting merge Awaiting merge label Nov 17, 2023
Copy link

After merging your PR, Conbench analyzed the 5 benchmarking runs that have been run so far on merge-commit 951d92a.

There were no benchmark performance regressions. 🎉

The full Conbench report has more details. It also includes information about 12 possible false positives for unstable benchmarks that are known to sometimes produce them.

@mapleFU mapleFU deleted the dataset/fixing-async-bug branch November 19, 2023 07:32
raulcd pushed a commit that referenced this pull request Nov 28, 2023
…et (#38466)

### Rationale for this change

Origin mentioned #38438

1. When PreBuffer is default enabled, the code in `RowGroupGenerator::FetchNext` would switch to async mode. This make the state handling more complex
2. In `RowGroupGenerator::FetchNext`, `[this]` is captured without `shared_from_this`. This is not bad, however, `this->executor_` may point to a invalid address if this dtor.

This patch also fixes a lifetime issue I founded in CSV handling.

### What changes are included in this PR?

1. Fix handling in `cpp/src/parquet/arrow/reader.cc` as I talked above
2. Fix a lifetime problem in CSV

### Are these changes tested?

I test it locality. But don't know how to write unittest here. Fell free to help.

### Are there any user-facing changes?

Bugfix

* Closes: #38438

Authored-by: mwish <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
dgreiss pushed a commit to dgreiss/arrow that referenced this pull request Feb 19, 2024
… dataset (apache#38466)

### Rationale for this change

Origin mentioned apache#38438

1. When PreBuffer is default enabled, the code in `RowGroupGenerator::FetchNext` would switch to async mode. This make the state handling more complex
2. In `RowGroupGenerator::FetchNext`, `[this]` is captured without `shared_from_this`. This is not bad, however, `this->executor_` may point to a invalid address if this dtor.

This patch also fixes a lifetime issue I founded in CSV handling.

### What changes are included in this PR?

1. Fix handling in `cpp/src/parquet/arrow/reader.cc` as I talked above
2. Fix a lifetime problem in CSV

### Are these changes tested?

I test it locality. But don't know how to write unittest here. Fell free to help.

### Are there any user-facing changes?

Bugfix

* Closes: apache#38438

Authored-by: mwish <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[C++][Python] Segfault during pyarrow.dataset.write_dataset with dataset source read with pre_buffer=True
4 participants