Merge pull request #1071 from dlyr/fix-tasks
Fix tasks
dlyr authored Oct 13, 2023
2 parents dd30e67 + a1f6aae commit 2e09817
Showing 3 changed files with 148 additions and 76 deletions.
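At a glance, the commit consolidates the two separate mutexes of the old implementation (m_taskQueueMutex guarding the queue, m_taskMutex guarding task registration) into a single std::shared_mutex taken through wlock/rlock aliases, turns m_shuttingDown into a std::atomic_bool, and replaces the polling loop in waitForTasks() with a blocking wait on a condition variable. The sketch below is not code from the diff; it is a minimal illustration of the reader/writer locking pattern the commit adopts, with names mirroring the diff.

#include <cstddef>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <utility>
#include <vector>

// Minimal sketch of the single reader/writer lock pattern adopted by this
// commit; illustrative only, not code from the diff.
class Registry {
    /// write lock: exclusive, used by mutating members
    using wlock = std::unique_lock<std::shared_mutex>;
    /// read lock: shared, used by const members
    using rlock = std::shared_lock<std::shared_mutex>;

  public:
    void add( std::string s ) {
        wlock lock( m_mutex ); // writers exclude readers and other writers
        m_items.push_back( std::move( s ) );
    }

    std::size_t count() const {
        rlock lock( m_mutex ); // concurrent readers may proceed together
        return m_items.size();
    }

  private:
    std::vector<std::string> m_items;
    // mutable so that const readers can still lock it, as in the diff below
    mutable std::shared_mutex m_mutex;
};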
145 changes: 83 additions & 62 deletions src/Core/Tasks/TaskQueue.cpp
@@ -12,23 +12,26 @@ namespace Ra {
namespace Core {

TaskQueue::TaskQueue( uint numThreads ) : m_processingTasks( 0 ), m_shuttingDown( false ) {
wlock lock( m_mutex );
m_workerThreads.reserve( numThreads );
for ( uint i = 0; i < numThreads; ++i ) {
m_workerThreads.emplace_back( &TaskQueue::runThread, this, i );
}
}

TaskQueue::~TaskQueue() {
{
wlock lock( m_mutex );
m_shuttingDown = true;
}
flushTaskQueue();
m_shuttingDown = true;
m_threadNotifier.notify_all();
for ( auto& t : m_workerThreads ) {
t.join();
}
}

TaskQueue::TaskId TaskQueue::registerTask( std::unique_ptr<Task> task ) {
std::lock_guard<std::mutex> lock( m_taskMutex );
wlock lock( m_mutex );
TimerData tdata;
// init tdata with task name before moving ownership
tdata.taskName = task->getName();
@@ -51,7 +54,7 @@ void TaskQueue::removeTask( TaskId taskId ) {
<< m_tasks.size();
return;
}
std::lock_guard<std::mutex> lock( m_taskMutex );
wlock lock( m_mutex );

// set task as dummy noop
m_tasks[taskId] = std::make_unique<FunctionTask>( []() {}, m_timerData[taskId].taskName );
@@ -62,6 +65,7 @@ void TaskQueue::removeTask( TaskId taskId ) {
}

TaskQueue::TaskId TaskQueue::getTaskId( const std::string& taskName ) const {
rlock lock( m_mutex );
auto itr = std::find_if( m_tasks.begin(), m_tasks.end(), [taskName]( const auto& task ) {
return task->getName() == taskName;
} );
@@ -71,7 +75,7 @@ TaskQueue::TaskId TaskQueue::getTaskId( const std::string& taskName ) const {
}

void TaskQueue::addDependency( TaskQueue::TaskId predecessor, TaskQueue::TaskId successor ) {
std::lock_guard<std::mutex> lock( m_taskMutex );
wlock lock( m_mutex );

CORE_ASSERT( predecessor.isValid() && ( predecessor < m_tasks.size() ),
"Invalid predecessor task" );
@@ -112,12 +116,12 @@ bool TaskQueue::addDependency( TaskQueue::TaskId predecessor, const std::string&

void TaskQueue::addPendingDependency( const std::string& predecessors,
TaskQueue::TaskId successor ) {
std::lock_guard<std::mutex> lock( m_taskMutex );
wlock lock( m_mutex );
m_pendingDepsSucc.emplace_back( predecessors, successor );
}

void TaskQueue::addPendingDependency( TaskId predecessor, const std::string& successors ) {
std::lock_guard<std::mutex> lock( m_taskMutex );
wlock lock( m_mutex );
m_pendingDepsPre.emplace_back( predecessor, successors );
}

@@ -134,9 +138,11 @@ void TaskQueue::resolveDependencies() {
"Pending dependency unresolved : (" << pre.first << ") -> "
<< m_tasks[pre.second]->getName() );
}
std::lock_guard<std::mutex> lock( m_taskMutex );
m_pendingDepsPre.clear();
m_pendingDepsSucc.clear();
{
wlock lock( m_mutex );
m_pendingDepsPre.clear();
m_pendingDepsSucc.clear();
}
}

// queueTask is always called with m_mutex write-locked
@@ -152,7 +158,7 @@ void TaskQueue::detectCycles() {
// Do a depth-first search of the nodes.
std::vector<bool> visited( m_tasks.size(), false );
std::stack<TaskId> pending;

rlock lock( m_mutex );
for ( uint id = 0; id < m_tasks.size(); ++id ) {
if ( m_dependencies[id].size() == 0 ) { pending.push( TaskId( id ) ); }
}
@@ -178,82 +184,91 @@

void TaskQueue::startTasks() {
using namespace Ra::Core::Utils;
if ( m_workerThreads.empty() ) {
LOG( logERROR ) << "TaskQueue as 0 threads, could not start tasks in parallel. Either "
"create a task queue with more threads, or use runTasksInThisThread";
return;
{
rlock lock( m_mutex );
if ( m_workerThreads.empty() ) {
LOG( logERROR ) << "TaskQueue as 0 threads, could not start tasks in parallel. Either "
"create a task queue with more threads, or use runTasksInThisThread";
return;
}
}

// Add pending dependencies.
resolveDependencies();

// Do a debug check
detectCycles();

// Enqueue all tasks with no dependencies.
for ( uint t = 0; t < m_tasks.size(); ++t ) {
// only queue non null m_tasks
if ( m_tasks[t] && m_remainingDependencies[t] == 0 ) { queueTask( TaskId { t } ); }
}
{
wlock lock( m_mutex );
for ( uint t = 0; t < m_tasks.size(); ++t ) {
// only queue non null m_tasks

if ( m_tasks[t] && m_remainingDependencies[t] == 0 ) { queueTask( TaskId { t } ); }
}
}
// Wake up all threads.
m_threadNotifier.notify_all();
}

void TaskQueue::runTasksInThisThread() {
// This method should not be called between startTasks/waitForTasks; we do not lock
// anything here.

// lock task queue so no other worker can start working while this thread do the job.
std::lock_guard<std::mutex> lock( m_taskQueueMutex );
// use a local task queue, preventing workers from picking up jobs.
std::deque<TaskId> taskQueue;

// Add pending dependencies.
resolveDependencies();

// Do a debug check
detectCycles();

// Enqueue all tasks with no dependencies.
for ( uint t = 0; t < m_tasks.size(); ++t ) {
// only queue non null m_tasks
if ( m_tasks[t] && m_remainingDependencies[t] == 0 ) { queueTask( TaskId { t } ); }
{
// Enqueue all tasks with no dependencies.
for ( uint t = 0; t < m_tasks.size(); ++t ) {
// only queue non null m_tasks
if ( m_tasks[t] && m_remainingDependencies[t] == 0 ) {
taskQueue.push_front( TaskId { t } );
}
}
}
while ( !m_taskQueue.empty() ) {
TaskId task { m_taskQueue.back() };
m_taskQueue.pop_back();
{
while ( !taskQueue.empty() ) {
TaskId task;
{
task = taskQueue.back();
taskQueue.pop_back();
}
// Run task
m_timerData[task].start = Utils::Clock::now();
m_timerData[task].threadId = 0;
m_tasks[task]->process();
m_timerData[task].end = Utils::Clock::now();

// Run task
m_timerData[task].start = Utils::Clock::now();
m_timerData[task].threadId = 0;
m_tasks[task]->process();
m_timerData[task].end = Utils::Clock::now();

for ( auto t : m_dependencies[task] ) {
uint& nDepends = m_remainingDependencies[t];
CORE_ASSERT( nDepends > 0, "Inconsistency in dependencies" );
--nDepends;
if ( nDepends == 0 ) { queueTask( t ); }
for ( auto t : m_dependencies[task] ) {
uint& nDepends = m_remainingDependencies[t];
CORE_ASSERT( nDepends > 0, "Inconsistency in dependencies" );
--nDepends;
if ( nDepends == 0 ) { taskQueue.push_front( TaskId { t } ); }
}
}
}
flushTaskQueue();
}

void TaskQueue::waitForTasks() {
bool isFinished = false;
while ( !isFinished ) {
// TODO : use a notifier for task queue empty.
{
std::lock_guard<std::mutex> lock( m_taskQueueMutex );
isFinished = ( m_taskQueue.empty() && m_processingTasks == 0 );
}
if ( !isFinished ) { std::this_thread::yield(); }
}
rlock lock( m_mutex );
m_waitForTasksNotifier.wait(
lock, [this]() { return ( m_taskQueue.empty() && m_processingTasks == 0 ); } );
}

const std::vector<TaskQueue::TimerData>& TaskQueue::getTimerData() {
return m_timerData;
}

void TaskQueue::flushTaskQueue() {
std::lock_guard<std::mutex> lock( m_taskMutex );
m_threadNotifier.notify_all();
wlock lock( m_mutex );

CORE_ASSERT( m_processingTasks == 0, "You have tasks still in process" );
CORE_ASSERT( m_taskQueue.empty(), " You have unprocessed tasks " );
@@ -270,8 +285,7 @@ void TaskQueue::runThread( uint id ) {

// Acquire mutex.
{
std::unique_lock<std::mutex> lock( m_taskQueueMutex );

wlock lock( m_mutex );
// Wait for a new task
m_threadNotifier.wait( lock,
[this]() { return m_shuttingDown || !m_taskQueue.empty(); } );
@@ -288,15 +302,22 @@ void TaskQueue::runThread( uint id ) {
// Release mutex.

// Run task
m_timerData[task].start = Utils::Clock::now();
m_timerData[task].threadId = id;
m_tasks[task]->process();
m_timerData[task].end = Utils::Clock::now();

{
{
rlock lock( m_mutex );
m_timerData[task].start = Utils::Clock::now();
m_timerData[task].threadId = id;
}
m_tasks[task]->process();
{
rlock lock( m_mutex );
m_timerData[task].end = Utils::Clock::now();
}
}
// Critical section : mark task as finished and en-queue dependencies.
uint newTasks = 0;
{
std::unique_lock<std::mutex> lock( m_taskQueueMutex );
wlock lock( m_mutex );
for ( auto t : m_dependencies[task] ) {
uint& nDepends = m_remainingDependencies[t];
CORE_ASSERT( nDepends > 0, "Inconsistency in dependencies" );
Expand All @@ -305,12 +326,12 @@ void TaskQueue::runThread( uint id ) {
queueTask( t );
++newTasks;
}
// TODO :Easy optimization : grab one of the new task and process it immediately.
}
--m_processingTasks;
if ( m_processingTasks == 0 ) { m_waitForTasksNotifier.notify_one(); }
}
// If we added new tasks, we wake up one thread to execute it.
if ( newTasks > 0 ) { m_threadNotifier.notify_one(); }
// If we added new tasks, we wake up threads to execute them.
if ( newTasks > 0 ) { m_threadNotifier.notify_all(); }
} // End of while(true)
}

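The most consequential change in TaskQueue.cpp is waitForTasks(): the old implementation polled m_taskQueue in a loop and called std::this_thread::yield() until the queue drained, while the new one blocks on m_waitForTasksNotifier. A std::condition_variable_any is needed here because the waiter holds a std::shared_lock over a std::shared_mutex; a plain std::condition_variable only accepts std::unique_lock<std::mutex>. The following self-contained sketch shows that wait/notify pairing under the same member names; the notify side is simplified relative to runThread() above.

#include <condition_variable>
#include <deque>
#include <mutex>
#include <shared_mutex>

// Sketch of the wait/notify pairing introduced by this commit; illustrative
// only, not code from the diff.
struct TaskWaiter {
    std::deque<int> m_taskQueue;        // pending task ids
    unsigned int m_processingTasks = 0; // tasks currently being run by workers
    mutable std::shared_mutex m_mutex;
    std::condition_variable_any m_waitForTasksNotifier;

    // Blocks until the queue is drained and no task is mid-flight. wait()
    // releases the shared lock while sleeping and re-acquires it to re-test
    // the predicate, so there is no busy spinning.
    void waitForTasks() {
        std::shared_lock<std::shared_mutex> lock( m_mutex );
        m_waitForTasksNotifier.wait(
            lock, [this]() { return m_taskQueue.empty() && m_processingTasks == 0; } );
    }

    // Called by a worker when it finishes a task (simplified from runThread()).
    void onTaskDone() {
        {
            std::unique_lock<std::shared_mutex> lock( m_mutex );
            --m_processingTasks;
        }
        m_waitForTasksNotifier.notify_one();
    }
};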
51 changes: 37 additions & 14 deletions src/Core/Tasks/TaskQueue.hpp
@@ -4,10 +4,12 @@
#include <Core/Utils/Index.hpp>
#include <Core/Utils/Timer.hpp> // Ra::Core::TimePoint

#include <atomic>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <thread>
#include <vector>
@@ -20,13 +22,27 @@ class Task;

namespace Ra {
namespace Core {
/** This class allows tasks to be registered and then executed in parallel on separate threads.
* it maintains an internal pool of threads. When instructed, it dispatches the tasks to the
/** @brief This class allows tasks to be registered and then executed in parallel on separate
* threads.
*
* It maintains an internal pool of threads. When instructed, it dispatches the tasks to the
* pooled threads.
* Tasks are allowed to have dependencies. A task will be executed only when all its dependencies
* are satisfied, i.e., all the tasks it depends on have finished.
* Note that most functions are not thread-safe and must not be called while the task queue is
* running.
* Typical usage:
\code
TaskQueue taskQueue( 4 );
auto task = std::make_unique<FunctionTask>( ... );
auto tid = taskQueue.registerTask( std::move( task ) );
// [...]
taskQueue.addDependency( tid, ... );
// [...]
taskQueue.startTasks();
taskQueue.waitForTasks();
taskQueue.flushTaskQueue();
\endcode
*/
class RA_CORE_API TaskQueue
{
@@ -84,14 +100,16 @@ class RA_CORE_API TaskQueue
// Task queue operations
//

/// Launches the execution of all the threads in the task queue.
/// Launches the execution of all the tasks in the task queue.
/// No more tasks should be added at this point.
void startTasks();

/// Launches the execution of all tasks in the thread of the caller.
/// Returns when all tasks are done. Useful for instance for OpenGL-related tasks that must run
/// in the context thread.
/// Once all tasks are processed, this method calls flushTaskQueue().
/// @warning Use either this method or the startTasks()/waitForTasks() pair; calling
/// runTasksInThisThread() between startTasks() and waitForTasks() may produce unexpected results.
void runTasksInThisThread();

/// Blocks until all tasks and dependencies are finished.
@@ -122,8 +140,18 @@
void resolveDependencies();

private:
/// Write lock, only one writer at a time.
using wlock = std::unique_lock<std::shared_mutex>;
/// Read lock, multiple concurrent readers allowed.
using rlock = std::shared_lock<std::shared_mutex>;

/// Threads working on tasks.
std::vector<std::thread> m_workerThreads;

//
// mutex protected variables.
//

/// Storage for the tasks (task will be deleted after flushQueue()).
std::vector<std::unique_ptr<Task>> m_tasks;
/// For each task, stores which tasks depend on it.
@@ -136,10 +164,6 @@
/// Stores the timings of each frame after execution.
std::vector<TimerData> m_timerData;

//
// mutex protected variables.
//

/// Number of tasks each task is waiting on.
std::vector<uint> m_remainingDependencies;
/// Queue holding the pending tasks.
@@ -148,14 +172,13 @@
uint m_processingTasks;

/// Flag to signal threads to quit.
bool m_shuttingDown;
std::atomic_bool m_shuttingDown;
/// Variable on which threads wait for new tasks.
std::condition_variable m_threadNotifier;
/// Global mutex over thread-sensitive variables.
std::mutex m_taskQueueMutex;
/// Mutex for task registration (m_tasks, m_dependencies, m_timerData ...), if tasks are
/// registered from multiple threads
std::mutex m_taskMutex;
std::condition_variable_any m_threadNotifier;
std::condition_variable_any m_waitForTasksNotifier;

/// Mutex for the protected variables above.
mutable std::shared_mutex m_mutex;
};

} // namespace Core
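The new documentation block in TaskQueue.hpp shows the intended call sequence. Assuming FunctionTask is a Task subclass constructed from a callable and a name, as suggested by the std::make_unique<FunctionTask>( []() {}, ... ) call in removeTask() above, a fuller, compilable version of that usage example might look as follows; the include paths are guesses based on the repository layout.

#include <Core/Tasks/Task.hpp> // assumed location of FunctionTask
#include <Core/Tasks/TaskQueue.hpp>

#include <iostream>
#include <memory>

int main() {
    using namespace Ra::Core;

    TaskQueue taskQueue( 4 );

    // FunctionTask( callable, name ) is assumed from the removeTask() body.
    auto produce = taskQueue.registerTask(
        std::make_unique<FunctionTask>( []() { std::cout << "produce\n"; }, "produce" ) );
    auto consume = taskQueue.registerTask(
        std::make_unique<FunctionTask>( []() { std::cout << "consume\n"; }, "consume" ) );

    // "consume" runs only after "produce" has finished.
    taskQueue.addDependency( produce, consume );

    taskQueue.startTasks();
    taskQueue.waitForTasks();   // now blocks on m_waitForTasksNotifier instead of spinning
    taskQueue.flushTaskQueue(); // tasks are deleted here
    return 0;
}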
