Skip to content

Commit

Permalink
Trying to release resource
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Nov 26, 2023
1 parent eb5de18 commit 4739797
Showing 1 changed file with 19 additions and 3 deletions.
22 changes: 19 additions & 3 deletions cpp/src/arrow/dataset/dataset_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class Throttle {

private:
Future<> backpressure_ = Future<>::MakeFinished();
uint64_t max_value_;
const uint64_t max_value_;
uint64_t in_waiting_ = 0;
uint64_t current_value_ = 0;
std::mutex mutex_;
Expand Down Expand Up @@ -621,14 +621,29 @@ class DatasetWriter::DatasetWriterImpl {
backpressure = writer_state_.open_files_throttle.Acquire(1);
if (!backpressure.is_finished()) {
EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyOpenFiles");
writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows());
RETURN_NOT_OK(TryCloseLargestFile());
break;
}
}
RETURN_NOT_OK(dir_queue->StartWrite(next_chunk));
auto clean_up_back_pressure = [&]() {
writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows());
if (will_open_file) {
writer_state_.open_files_throttle.Release(1);
}
};
auto s = dir_queue->StartWrite(next_chunk);
if (!s.ok()) {
clean_up_back_pressure();
return s;
}
batch = std::move(remainder);
if (batch) {
RETURN_NOT_OK(dir_queue->FinishCurrentFile());
s = dir_queue->FinishCurrentFile();
if (!s.ok()) {
clean_up_back_pressure();
return s;
}
}
}

Expand All @@ -647,6 +662,7 @@ class DatasetWriter::DatasetWriterImpl {
DatasetWriterState writer_state_;
std::function<void()> pause_callback_;
std::function<void()> resume_callback_;
// Map from directory + prefix to the queue for that directory
std::unordered_map<std::string, std::shared_ptr<DatasetWriterDirectoryQueue>>
directory_queues_;
std::mutex mutex_;
Expand Down

0 comments on commit 4739797

Please sign in to comment.