Skip to content

Commit

Permalink
ThreadPool: get task status from the handler's return value in Enqueu…
Browse files Browse the repository at this point in the history
…eAsyncWork
  • Loading branch information
TheMostDiligent committed Oct 11, 2024
1 parent c0a31d2 commit 4f78d44
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 4 deletions.
8 changes: 7 additions & 1 deletion Common/interface/AsyncInitializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,13 @@ class AsyncInitializer
{
VERIFY_EXPR(pThreadPool != nullptr);
return std::unique_ptr<AsyncInitializer>{
new AsyncInitializer{EnqueueAsyncWork(pThreadPool, ppPrerequisites, NumPrerequisites, std::forward<HanlderType>(Handler))},
new AsyncInitializer{
EnqueueAsyncWork(pThreadPool, ppPrerequisites, NumPrerequisites,
[Handler = std::forward<HanlderType>(Handler)](Uint32 ThreadId) mutable {
Handler(ThreadId);
return ASYNC_TASK_STATUS_COMPLETE;
}),
},
};
}

Expand Down
7 changes: 4 additions & 3 deletions Common/interface/ThreadPool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ class AsyncTaskBase : public ObjectBase<IAsyncTask>
break;

case ASYNC_TASK_STATUS_NOT_STARTED:
DEV_ERROR("NOT_STARTED is only allowed as initial task status.");
DEV_CHECK_ERR(m_TaskStatus == ASYNC_TASK_STATUS_RUNNING,
"A task should only be moved to NOT_STARTED state from RUNNING state.");
break;

case ASYNC_TASK_STATUS_RUNNING:
Expand Down Expand Up @@ -184,8 +185,8 @@ RefCntAutoPtr<IAsyncTask> EnqueueAsyncWork(IThreadPool* pThreadPool,

virtual void DILIGENT_CALL_TYPE Run(Uint32 ThreadId) override final
{
m_Handler(ThreadId);
SetStatus(ASYNC_TASK_STATUS_COMPLETE);
ASYNC_TASK_STATUS TaskStatus = m_Handler(ThreadId);
SetStatus(TaskStatus);
}

private:
Expand Down
35 changes: 35 additions & 0 deletions Tests/DiligentCoreTest/src/Common/ThreadPoolTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ TEST(Common_ThreadPool, EnqueueTask)
f = std::sin(f + 1.f);
Results[i].store(f);
WorkComplete[i].store(true);

return ASYNC_TASK_STATUS_COMPLETE;
});
}

Expand Down Expand Up @@ -133,6 +135,8 @@ TEST(Common_ThreadPool, ProcessTask)
f = std::sin(f + 1.f);
Results[i].store(f);
WorkComplete[i].store(true);

return ASYNC_TASK_STATUS_COMPLETE;
});
}

Expand Down Expand Up @@ -326,6 +330,7 @@ TEST(Common_ThreadPool, Priorities)
[&CompletionOrder, i](Uint32 ThreadId) //
{
CompletionOrder.push_back(i);
return ASYNC_TASK_STATUS_COMPLETE;
});
}

Expand Down Expand Up @@ -396,6 +401,8 @@ TEST(Common_ThreadPool, Prerequisites)
}
if (CorrectOrder)
NumTasksCorrectlyOrdered.fetch_add(1);

return ASYNC_TASK_STATUS_COMPLETE;
},
static_cast<float>(task) // Inverse priority so that the thread pool fixes it
);
Expand All @@ -407,4 +414,32 @@ TEST(Common_ThreadPool, Prerequisites)
}
}


TEST(Common_ThreadPool, ReRunTasks)
{
auto pThreadPool = CreateThreadPool(ThreadPoolCreateInfo{4});
ASSERT_NE(pThreadPool, nullptr);

constexpr Uint32 NumTasks = 32;
std::vector<std::atomic<int>> ReRunCounters(NumTasks);

for (int i = 0; i < static_cast<int>(ReRunCounters.size()); ++i)
ReRunCounters[i] = 32 + i;

for (Uint32 task = 0; task < NumTasks; ++task)
{
EnqueueAsyncWork(
pThreadPool,
[task, &ReRunCounters](Uint32 ThreadId) //
{
int ReRunCounter = ReRunCounters[task].fetch_add(-1) - 1;
return ReRunCounter > 0 ? ASYNC_TASK_STATUS_NOT_STARTED : ASYNC_TASK_STATUS_COMPLETE;
});
}

pThreadPool->WaitForAllTasks();
for (size_t i = 0; i < ReRunCounters.size(); ++i)
EXPECT_EQ(ReRunCounters[i], 0) << i;
}

} // namespace

0 comments on commit 4f78d44

Please sign in to comment.