Skip to content

Commit

Permalink
Fix PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom-Newton committed Dec 9, 2024
1 parent 1dba128 commit 0a3832f
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 16 deletions.
5 changes: 2 additions & 3 deletions cpp/src/arrow/util/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,8 @@ struct QueuedTask {
// urgently.
bool operator<(const QueuedTask& other) const {
if (priority == other.priority) {
// Maintain spawn order for tasks with the same priority. TODO: Decide if this is
// really needed. Currently several test cases in arrow-acero-hash-aggregate-test
// depend on it.
// Maintain execution order for tasks with the same priority. Its preferable to keep
// the execution order of tasks deterministic.
return spawn_index > other.spawn_index;
}
return priority > other.priority;
Expand Down
32 changes: 19 additions & 13 deletions cpp/src/arrow/util/thread_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#endif

#include <algorithm>
#include <condition_variable>
#include <cstdio>
#include <cstdlib>
#include <functional>
Expand Down Expand Up @@ -583,20 +584,25 @@ TEST_F(TestThreadPool, TasksRunInPriorityOrder) {
constexpr int kNumTasks = 10;
auto recorded_times = std::vector<std::chrono::steady_clock::time_point>(kNumTasks);
auto futures = std::vector<Future<int>>(kNumTasks);
auto sleep_task = []() { SleepABit(); };
std::mutex mutex;

// Spawn a sleep task to block the pool while we add the other tasks. This
// ensures all the tasks are queued before any of them start running, so that
// their running order is fully determined by their priority.
ASSERT_OK(pool->Spawn(sleep_task));

for (int i = 0; i < kNumTasks; ++i) {
auto record_time = [&recorded_times, i]() {
recorded_times[i] = std::chrono::steady_clock::now();
return i;
};
// Spawn tasks in opposite order to urgency.
ASSERT_OK_AND_ASSIGN(futures[i], pool->Submit(TaskHints{kNumTasks - i}, record_time));
auto wait_task = [&mutex] { std::unique_lock<std::mutex> lock(mutex); };
{
std::unique_lock<std::mutex> lock(mutex);
// Spawn wait_task to block the pool while we add the other tasks. This
// ensures all the tasks are queued before any of them start running, so that
// their running order is fully determined by their priority.
ASSERT_OK(pool->Spawn(wait_task));

for (int i = 0; i < kNumTasks; ++i) {
auto record_time = [&recorded_times, i]() {
recorded_times[i] = std::chrono::steady_clock::now();
return i;
};
// Spawn tasks in opposite order to urgency.
ASSERT_OK_AND_ASSIGN(futures[i],
pool->Submit(TaskHints{kNumTasks - i}, record_time));
}
}

ASSERT_OK(pool->Shutdown());
Expand Down

0 comments on commit 0a3832f

Please sign in to comment.