diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc index a2096d691b4bf..e765fc2a25538 100644 --- a/cpp/src/arrow/dataset/dataset_writer.cc +++ b/cpp/src/arrow/dataset/dataset_writer.cc @@ -87,7 +87,7 @@ class Throttle { private: Future<> backpressure_ = Future<>::MakeFinished(); - uint64_t max_value_; + const uint64_t max_value_; uint64_t in_waiting_ = 0; uint64_t current_value_ = 0; std::mutex mutex_; @@ -621,14 +621,29 @@ class DatasetWriter::DatasetWriterImpl { backpressure = writer_state_.open_files_throttle.Acquire(1); if (!backpressure.is_finished()) { EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyOpenFiles"); + writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows()); RETURN_NOT_OK(TryCloseLargestFile()); break; } } - RETURN_NOT_OK(dir_queue->StartWrite(next_chunk)); + auto clean_up_back_pressure = [&]() { + writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows()); + if (will_open_file) { + writer_state_.open_files_throttle.Release(1); + } + }; + auto s = dir_queue->StartWrite(next_chunk); + if (!s.ok()) { + clean_up_back_pressure(); + return s; + } batch = std::move(remainder); if (batch) { - RETURN_NOT_OK(dir_queue->FinishCurrentFile()); + s = dir_queue->FinishCurrentFile(); + if (!s.ok()) { + clean_up_back_pressure(); + return s; + } } } @@ -647,6 +662,7 @@ class DatasetWriter::DatasetWriterImpl { DatasetWriterState writer_state_; std::function pause_callback_; std::function resume_callback_; + // Map from directory + prefix to the queue for that directory std::unordered_map> directory_queues_; std::mutex mutex_;