Skip to content

Commit

Permalink
Fix with threading disabled
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom-Newton committed Dec 16, 2024
1 parent 7573d11 commit 5774bf2
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions cpp/src/arrow/util/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,9 @@ Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce<void()> task,
"been abandoned");
}

state_->task_queue.push_back(
Task{std::move(task), std::move(stop_token), std::move(stop_callback)});
state_->task_queue.push(QueuedTask{std::move(task), std::move(stop_token),
std::move(stop_callback), hints.priority,
state_->spawned_tasks_count++});

return Status::OK();
}
Expand Down Expand Up @@ -328,8 +329,8 @@ bool SerialExecutor::RunTasksOnAllExecutors() {
if (exe->state_->paused == false && exe->state_->task_queue.empty() == false) {
SerialExecutor* old_exe = globalState->current_executor;
globalState->current_executor = exe;
Task task = std::move(exe->state_->task_queue.front());
exe->state_->task_queue.pop_front();
Task task = std::move(const_cast<Task&>(exe->state_->task_queue.top().task));
exe->state_->task_queue.pop();
run_task = true;
exe->state_->tasks_running += 1;
if (!task.stop_token.IsStopRequested()) {
Expand Down Expand Up @@ -363,8 +364,8 @@ void SerialExecutor::RunLoop() {
// we can't run any more until something else drops off the queue
if (state_->tasks_running <= state_->max_tasks_running) {
while (!state_->paused && !state_->task_queue.empty()) {
Task task = std::move(state_->task_queue.front());
state_->task_queue.pop_front();
Task task = std::move(const_cast<Task&>(state_->task_queue.top().task));
state_->task_queue.pop();
auto last_executor = globalState->current_executor;
globalState->current_executor = this;
state_->tasks_running += 1;
Expand Down Expand Up @@ -760,7 +761,8 @@ Status ThreadPool::Shutdown(bool wait) {
} else {
// clear any pending tasks so that we behave
// the same as threadpool on fast shutdown
state_->task_queue.clear();
std::priority_queue<QueuedTask> empty;
std::swap(state_->task_queue, empty);
}
return Status::OK();
}
Expand Down Expand Up @@ -800,7 +802,8 @@ Result<std::shared_ptr<ThreadPool>> ThreadPool::MakeEternal(int threads) {
ThreadPool::~ThreadPool() {
// clear threadpool, otherwise ~SerialExecutor will
// run any tasks left (which isn't threadpool behaviour)
state_->task_queue.clear();
std::priority_queue<QueuedTask> empty;
std::swap(state_->task_queue, empty);
}

#endif // ARROW_ENABLE_THREADING
Expand Down

0 comments on commit 5774bf2

Please sign in to comment.