diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index 73a94619808e3..a2c8cd8641bb3 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -208,8 +208,9 @@ Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce task, "been abandoned"); } - state_->task_queue.push_back( - Task{std::move(task), std::move(stop_token), std::move(stop_callback)}); + state_->task_queue.push(QueuedTask{std::move(task), std::move(stop_token), + std::move(stop_callback), hints.priority, + state_->spawned_tasks_count++}); return Status::OK(); } @@ -328,8 +329,8 @@ bool SerialExecutor::RunTasksOnAllExecutors() { if (exe->state_->paused == false && exe->state_->task_queue.empty() == false) { SerialExecutor* old_exe = globalState->current_executor; globalState->current_executor = exe; - Task task = std::move(exe->state_->task_queue.front()); - exe->state_->task_queue.pop_front(); + Task task = std::move(const_cast(exe->state_->task_queue.top().task)); + exe->state_->task_queue.pop(); run_task = true; exe->state_->tasks_running += 1; if (!task.stop_token.IsStopRequested()) { @@ -363,8 +364,8 @@ void SerialExecutor::RunLoop() { // we can't run any more until something else drops off the queue if (state_->tasks_running <= state_->max_tasks_running) { while (!state_->paused && !state_->task_queue.empty()) { - Task task = std::move(state_->task_queue.front()); - state_->task_queue.pop_front(); + Task task = std::move(const_cast(state_->task_queue.top().task)); + state_->task_queue.pop(); auto last_executor = globalState->current_executor; globalState->current_executor = this; state_->tasks_running += 1; @@ -760,7 +761,8 @@ Status ThreadPool::Shutdown(bool wait) { } else { // clear any pending tasks so that we behave // the same as threadpool on fast shutdown - state_->task_queue.clear(); + std::priority_queue empty; + std::swap(state_->task_queue, empty); } return Status::OK(); } @@ -800,7 +802,8 @@ Result> ThreadPool::MakeEternal(int threads) { ThreadPool::~ThreadPool() { // clear threadpool, otherwise ~SerialExecutor will // run any tasks left (which isn't threadpool behaviour) - state_->task_queue.clear(); + std::priority_queue empty; + std::swap(state_->task_queue, empty); } #endif // ARROW_ENABLE_THREADING