Skip to content

Commit

Permalink
tasks without priorities
Browse files Browse the repository at this point in the history
  • Loading branch information
malytomas committed Jan 25, 2024
1 parent 5534513 commit 586b6e7
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 143 deletions.
1 change: 0 additions & 1 deletion sources/include/cage-core/assetManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ namespace cage
uint32 diskLoadingThreads = 2;
uint32 customProcessingThreads = 5;
uint32 schemesMaxCount = 100; // 0..49 for engine and 50..99 for the game
sint32 tasksPriority = 0;
};

CAGE_CORE_API Holder<AssetManager> newAssetManager(const AssetManagerCreateConfig &config);
Expand Down
53 changes: 22 additions & 31 deletions sources/include/cage-core/tasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ namespace cage
TaskRunner runner = nullptr;
uint32 elements = 0;
uint32 invocations = 0;
sint32 priority = 0;
};

template<uint32 Aggregation>
Expand Down Expand Up @@ -74,7 +73,7 @@ namespace cage
}

template<class T, uint32 Aggregation, bool Async>
auto tasksRunImpl(StringPointer name, Delegate<void(T &)> function, Holder<PointerRange<T>> data, sint32 priority)
auto tasksRunImpl(StringPointer name, Delegate<void(T &)> function, Holder<PointerRange<T>> data)
{
static_assert(sizeof(TaskCreateConfig::function) == sizeof(function));
TaskCreateConfig tsk;
Expand All @@ -90,13 +89,12 @@ namespace cage
};
tsk.elements = numeric_cast<uint32>(data->size());
tsk.invocations = aggInvocations<Aggregation>(tsk.elements);
tsk.priority = priority;
tsk.data = std::move(data).template cast<void>();
return tasksRunImpl<Async>(std::move(tsk));
}

template<class T, uint32 Aggregation, bool Async>
auto tasksRunImpl(StringPointer name, Holder<PointerRange<T>> data, sint32 priority)
auto tasksRunImpl(StringPointer name, Holder<PointerRange<T>> data)
{
TaskCreateConfig tsk;
tsk.name = name;
Expand All @@ -109,13 +107,12 @@ namespace cage
};
tsk.elements = numeric_cast<uint32>(data->size());
tsk.invocations = aggInvocations<Aggregation>(tsk.elements);
tsk.priority = priority;
tsk.data = std::move(data).template cast<void>();
return tasksRunImpl<Async>(std::move(tsk));
}

template<class T, bool Async>
auto tasksRunImpl(StringPointer name, Delegate<void(T &, uint32)> function, Holder<T> data, uint32 invocations, sint32 priority)
auto tasksRunImpl(StringPointer name, Delegate<void(T &, uint32)> function, Holder<T> data, uint32 invocations)
{
static_assert(sizeof(privat::TaskCreateConfig::function) == sizeof(function));
TaskCreateConfig tsk;
Expand All @@ -128,13 +125,12 @@ namespace cage
function(*data, idx);
};
tsk.invocations = invocations;
tsk.priority = priority;
tsk.data = std::move(data).template cast<void>();
return tasksRunImpl<Async>(std::move(tsk));
}

template<class T, bool Async>
auto tasksRunImpl(StringPointer name, Holder<T> data, uint32 invocations, sint32 priority)
auto tasksRunImpl(StringPointer name, Holder<T> data, uint32 invocations)
{
TaskCreateConfig tsk;
tsk.name = name;
Expand All @@ -144,67 +140,62 @@ namespace cage
(*data)(idx);
};
tsk.invocations = invocations;
tsk.priority = priority;
tsk.data = std::move(data).template cast<void>();
return tasksRunImpl<Async>(std::move(tsk));
}
}

// allows running higher priority tasks from inside long running task
CAGE_CORE_API void tasksYield();
CAGE_CORE_API sint32 tasksCurrentPriority();

// invoke the function once for each element of the range
template<class T, uint32 Aggregation = 1>
CAGE_FORCE_INLINE void tasksRunBlocking(StringPointer name, Delegate<void(T &)> function, PointerRange<T> data, sint32 priority = tasksCurrentPriority())
CAGE_FORCE_INLINE void tasksRunBlocking(StringPointer name, Delegate<void(T &)> function, PointerRange<T> data)
{
return privat::tasksRunImpl<T, Aggregation, false>(name, function, Holder<PointerRange<T>>(&data, nullptr), priority);
return privat::tasksRunImpl<T, Aggregation, false>(name, function, Holder<PointerRange<T>>(&data, nullptr));
}
template<class T, uint32 Aggregation = 1>
CAGE_FORCE_INLINE Holder<AsyncTask> tasksRunAsync(StringPointer name, Delegate<void(T &)> function, Holder<PointerRange<T>> data, sint32 priority = tasksCurrentPriority())
CAGE_FORCE_INLINE Holder<AsyncTask> tasksRunAsync(StringPointer name, Delegate<void(T &)> function, Holder<PointerRange<T>> data)
{
return privat::tasksRunImpl<T, Aggregation, true>(name, function, std::move(data), priority);
return privat::tasksRunImpl<T, Aggregation, true>(name, function, std::move(data));
}

// invoke operator()() once for each element of the range
template<class T, uint32 Aggregation = 1>
CAGE_FORCE_INLINE void tasksRunBlocking(StringPointer name, PointerRange<T> data, sint32 priority = tasksCurrentPriority())
CAGE_FORCE_INLINE void tasksRunBlocking(StringPointer name, PointerRange<T> data)
{
return privat::tasksRunImpl<T, Aggregation, false>(name, Holder<PointerRange<T>>(&data, nullptr), priority);
return privat::tasksRunImpl<T, Aggregation, false>(name, Holder<PointerRange<T>>(&data, nullptr));
}
template<class T, uint32 Aggregation = 1>
CAGE_FORCE_INLINE Holder<AsyncTask> tasksRunAsync(StringPointer name, Holder<PointerRange<T>> data, sint32 priority = tasksCurrentPriority())
CAGE_FORCE_INLINE Holder<AsyncTask> tasksRunAsync(StringPointer name, Holder<PointerRange<T>> data)
{
return privat::tasksRunImpl<T, Aggregation, true>(name, std::move(data), priority);
return privat::tasksRunImpl<T, Aggregation, true>(name, std::move(data));
}

// invoke the function invocations time, each time with the same data
template<class T>
CAGE_FORCE_INLINE void tasksRunBlocking(StringPointer name, Delegate<void(T &, uint32)> function, T &data, uint32 invocations, sint32 priority = tasksCurrentPriority())
CAGE_FORCE_INLINE void tasksRunBlocking(StringPointer name, Delegate<void(T &, uint32)> function, T &data, uint32 invocations)
{
return privat::tasksRunImpl<T, false>(name, function, Holder<T>(&data, nullptr), invocations, priority);
return privat::tasksRunImpl<T, false>(name, function, Holder<T>(&data, nullptr), invocations);
}
template<class T>
CAGE_FORCE_INLINE Holder<AsyncTask> tasksRunAsync(StringPointer name, Delegate<void(T &, uint32)> function, Holder<T> data, uint32 invocations = 1, sint32 priority = tasksCurrentPriority())
CAGE_FORCE_INLINE Holder<AsyncTask> tasksRunAsync(StringPointer name, Delegate<void(T &, uint32)> function, Holder<T> data, uint32 invocations = 1)
{
return privat::tasksRunImpl<T, true>(name, function, std::move(data), invocations, priority);
return privat::tasksRunImpl<T, true>(name, function, std::move(data), invocations);
}

// invoke operator()(uint32) invocations time, each time with the same data
template<class T>
CAGE_FORCE_INLINE void tasksRunBlocking(StringPointer name, T &data, uint32 invocations, sint32 priority = tasksCurrentPriority())
CAGE_FORCE_INLINE void tasksRunBlocking(StringPointer name, T &data, uint32 invocations)
{
return privat::tasksRunImpl<T, false>(name, Holder<T>(&data, nullptr), invocations, priority);
return privat::tasksRunImpl<T, false>(name, Holder<T>(&data, nullptr), invocations);
}
template<class T>
CAGE_FORCE_INLINE Holder<AsyncTask> tasksRunAsync(StringPointer name, Holder<T> data, uint32 invocations = 1, sint32 priority = tasksCurrentPriority())
CAGE_FORCE_INLINE Holder<AsyncTask> tasksRunAsync(StringPointer name, Holder<T> data, uint32 invocations = 1)
{
return privat::tasksRunImpl<T, true>(name, std::move(data), invocations, priority);
return privat::tasksRunImpl<T, true>(name, std::move(data), invocations);
}

// invoke the function invocations time
CAGE_CORE_API void tasksRunBlocking(StringPointer name, Delegate<void(uint32)> function, uint32 invocations, sint32 priority = tasksCurrentPriority());
CAGE_CORE_API Holder<AsyncTask> tasksRunAsync(StringPointer name, Delegate<void(uint32)> function, uint32 invocations = 1, sint32 priority = tasksCurrentPriority());
CAGE_CORE_API void tasksRunBlocking(StringPointer name, Delegate<void(uint32)> function, uint32 invocations);
CAGE_CORE_API Holder<AsyncTask> tasksRunAsync(StringPointer name, Delegate<void(uint32)> function, uint32 invocations = 1);

// divide tasks into groups - find begin/end indices for a particular group
CAGE_CORE_API std::pair<uint32, uint32> tasksSplit(uint32 groupIndex, uint32 groupsCount, uint32 tasksCount);
Expand Down
9 changes: 4 additions & 5 deletions sources/libcore/assets/assetManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ namespace cage
{
public:
const String path;
const sint32 tasksPriority = 0;
Holder<Mutex> privateMutex = newMutex(); // protects generateName, privateIndex
Holder<RwMutex> publicMutex = newRwMutex(); // protects publicIndex, waitingIndex
KeepOpen keepOpen;
Expand Down Expand Up @@ -200,7 +199,7 @@ namespace cage
}
}

AssetManagerImpl(const AssetManagerCreateConfig &config) : path(findAssetsFolderPath(config)), tasksPriority(config.tasksPriority)
AssetManagerImpl(const AssetManagerCreateConfig &config) : path(findAssetsFolderPath(config))
{
CAGE_LOG(SeverityEnum::Info, "assetManager", Stringizer() + "using assets path: '" + path + "'");
schemes.resize(config.schemesMaxCount);
Expand Down Expand Up @@ -248,7 +247,7 @@ namespace cage
return;
Holder<TskDecompress> tsk = systemMemory().createHolder<TskDecompress>();
tsk->loading = std::move(loading);
tasksRunAsync<TskDecompress>("asset decompress", std::move(tsk), 1, tasksPriority);
tasksRunAsync<TskDecompress>("asset decompress", std::move(tsk));
}

void enqueueProcess(Holder<Loading> &&loading)
Expand All @@ -261,7 +260,7 @@ namespace cage
return;
const uint32 thr = schemes[scheme].threadIndex;
if (thr == m)
tasksRunAsync<Loading>("asset loading process", std::move(loading), 1, tasksPriority);
tasksRunAsync<Loading>("asset loading process", std::move(loading));
else
customProcessingQueues[thr]->push(std::move(loading).cast<CustomProcessing>());
}
Expand All @@ -272,7 +271,7 @@ namespace cage
return;
const uint32 thr = schemes[unloading->asset->scheme].threadIndex;
if (thr == m)
tasksRunAsync<Unloading>("asset unloading process", std::move(unloading), 1, tasksPriority);
tasksRunAsync<Unloading>("asset unloading process", std::move(unloading));
else
customProcessingQueues[thr]->push(std::move(unloading).cast<CustomProcessing>());
}
Expand Down
70 changes: 13 additions & 57 deletions sources/libcore/concurrent/tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,11 @@ namespace cage
{
struct ThreadData
{
sint32 currentPriority = std::numeric_limits<sint32>().min();
bool runningTask = false;
bool taskingThread = false;
};

thread_local ThreadData threadData;

struct PrioritySwapper : private Immovable
{
sint32 origPriority = m;
bool origTask = true;

explicit PrioritySwapper(sint32 taskPriority) : origPriority(taskPriority)
{
ThreadData &thr = threadData;
std::swap(thr.currentPriority, origPriority);
std::swap(thr.runningTask, origTask);
}

~PrioritySwapper()
{
ThreadData &thr = threadData;
thr.currentPriority = origPriority;
thr.runningTask = origTask;
}
};

struct TaskImpl : public AsyncTask
{
const privat::TaskCreateConfig config;
Expand All @@ -62,10 +40,9 @@ namespace cage
if (completed)
return;
ProfilingScope profiling(config.name);
PrioritySwapper prioritySwapper(config.priority);
const uint32 idx = executing++;
CAGE_ASSERT(idx < config.invocations);
profiling.set(Stringizer() + "task invocation: " + idx + " / " + config.invocations + ", priority: " + config.priority);
profiling.set(Stringizer() + "task invocation: " + idx + " / " + config.invocations);
try
{
config.runner(config, idx);
Expand Down Expand Up @@ -111,20 +88,18 @@ namespace cage

struct TasksQueue : public ConcurrentQueue<Holder<TaskImpl>>
{
bool tryPopFilter(Holder<TaskImpl> &value, sint32 requiredPriority)
bool tryPopFilter(Holder<TaskImpl> &value)
{
ScopeLock sl(mut);
if (stop)
CAGE_THROW_SILENT(ConcurrentQueueTerminated, "concurrent queue terminated");
for (auto it = items.begin(); it != items.end(); it++)
{
if ((*it)->config.priority >= requiredPriority)
{
value = std::move(*it);
items.erase(it);
writer->signal();
return true;
}
// todo add filters here
value = std::move(*it);
items.erase(it);
writer->signal();
return true;
}
return false;
}
Expand Down Expand Up @@ -153,7 +128,7 @@ namespace cage
while (!completed)
{
Holder<TaskImpl> t;
if (q.tryPopFilter(t, config.priority)) // allow same priority
if (q.tryPopFilter(t))
{
enqueue(t.share());
t->execute();
Expand Down Expand Up @@ -266,7 +241,7 @@ namespace cage
}

template<bool Async>
auto tasksRunImpl(StringPointer name, Delegate<void(uint32)> function, uint32 invocations, sint32 priority)
auto tasksRunImpl(StringPointer name, Delegate<void(uint32)> function, uint32 invocations)
{
static_assert(sizeof(TaskCreateConfig::function) == sizeof(function));
TaskCreateConfig tsk;
Expand All @@ -278,37 +253,18 @@ namespace cage
function(idx);
};
tsk.invocations = invocations;
tsk.priority = priority;
return tasksRunImpl<Async>(std::move(tsk));
}
}

void tasksYield()
{
const sint32 requiredPriority = threadData.currentPriority + 1; // requires strictly higher priority
while (true)
{
Holder<TaskImpl> t;
if (!queue().tryPopFilter(t, requiredPriority))
break;
enqueue(t.share());
t->execute();
}
}

sint32 tasksCurrentPriority()
{
return threadData.runningTask ? threadData.currentPriority : 0;
}

void tasksRunBlocking(StringPointer name, Delegate<void(uint32)> function, uint32 invocations, sint32 priority)
void tasksRunBlocking(StringPointer name, Delegate<void(uint32)> function, uint32 invocations)
{
return privat::tasksRunImpl<false>(name, function, invocations, priority);
return privat::tasksRunImpl<false>(name, function, invocations);
}

Holder<AsyncTask> tasksRunAsync(StringPointer name, Delegate<void(uint32)> function, uint32 invocations, sint32 priority)
Holder<AsyncTask> tasksRunAsync(StringPointer name, Delegate<void(uint32)> function, uint32 invocations)
{
return privat::tasksRunImpl<true>(name, function, invocations, priority);
return privat::tasksRunImpl<true>(name, function, invocations);
}

std::pair<uint32, uint32> tasksSplit(uint32 groupIndex, uint32 groupsCount, uint32 tasksCount)
Expand Down
4 changes: 2 additions & 2 deletions sources/libengine/graphics/renderPipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1291,7 +1291,7 @@ namespace cage
uni.iparams[0]++; // shadowed light type
}

return tasksRunAsync<ShadowmapData>("render shadowmap task", Delegate<void(ShadowmapData &, uint32)>().bind<RenderPipelineImpl, &RenderPipelineImpl::taskShadowmap>(this), Holder<ShadowmapData>(&data, nullptr), 1, tasksCurrentPriority() + 9);
return tasksRunAsync<ShadowmapData>("render shadowmap task", Delegate<void(ShadowmapData &, uint32)>().bind<RenderPipelineImpl, &RenderPipelineImpl::taskShadowmap>(this), Holder<ShadowmapData>(&data, nullptr));
}

RenderPipelineResult prepareCamera(const RenderPipelineCamera &camera) const
Expand All @@ -1310,7 +1310,7 @@ namespace cage
tasks.push_back(prepareShadowmap(data, e, lc, sc));
},
+scene, false);
tasks.push_back(tasksRunAsync<CameraData>("render camera task", Delegate<void(CameraData &, uint32)>().bind<RenderPipelineImpl, &RenderPipelineImpl::taskCamera>(this), Holder<CameraData>(&data, nullptr), 1, tasksCurrentPriority() + 10));
tasks.push_back(tasksRunAsync<CameraData>("render camera task", Delegate<void(CameraData &, uint32)>().bind<RenderPipelineImpl, &RenderPipelineImpl::taskCamera>(this), Holder<CameraData>(&data, nullptr)));
for (const auto &it : tasks)
it->wait();

Expand Down
Loading

0 comments on commit 586b6e7

Please sign in to comment.