Skip to content

Commit

Permalink
Merge pull request #1620 from Framstag/WorkerPool
Browse files Browse the repository at this point in the history
feat: ThreadedWorkerPool, issue #1035
  • Loading branch information
Framstag authored Nov 2, 2024
2 parents 0da3c57 + 484d141 commit 4db1a5d
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 122 deletions.
21 changes: 9 additions & 12 deletions Tests/src/WorkQueueTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,14 @@ class Worker
osmscout::WorkQueue<int> queue;

private:
int Work(int a, int b)
{
std::cout << "Doing task #" << a << std::endl;

return a+b;
}

void TaskLoop()
{
std::packaged_task<int()> task;

std::cout << "Starting TaskLoop()..." << std::endl;

while (queue.PopTask(task)) {
task();
while (!queue.Finished()) {
if (auto optionalTask=queue.PopTask(); optionalTask) {
optionalTask.value()();
}
}

std::cout << "Quit TaskLoop()" << std::endl;
Expand All @@ -65,7 +58,11 @@ class Worker

std::future<int> PushWork(int a, int b)
{
std::packaged_task<int()> task(std::bind(&Worker::Work,this,a,b));
std::packaged_task<int()> task([a,b] {
std::cout << "Doing task #" << a << std::endl;

return a+b;
});

std::future<int> future=task.get_future();

Expand Down
48 changes: 17 additions & 31 deletions libosmscout-import/src/osmscoutimport/GenMergeAreas.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,11 +333,10 @@ namespace osmscout {
{
progress.Info("Merging areas of type "+job.type->GetName());

AreaMergeResult result;
StopClock mergeStopClock;

result=MergeAreas(progress,
job);
AreaMergeResult result=MergeAreas(progress,
job);

mergeStopClock.Stop();
progress.Info(
Expand All @@ -350,18 +349,16 @@ namespace osmscout {

void ProcessingLoop() override
{
while (true) {
while (!inQueue.Finished()) {
std::optional<AreaMergeJob> value=inQueue.PopTask();

if (!value) {
break;
}
if (value) {
AreaMergeJob job=std::move(value.value());

AreaMergeJob job=std::move(value.value());
AreaMergeResult result=ProcessJob(job);

AreaMergeResult result=ProcessJob(job);

outQueue.PushTask(result);
outQueue.PushTask(result);
}
}

outQueue.Stop();
Expand Down Expand Up @@ -391,14 +388,10 @@ namespace osmscout {
std::vector<AreaMergeResult> mergeResult(typeConfig.GetTypeCount());
ProcessingQueue<AreaMergeJob> queue1(10000);
ProcessingQueue<AreaMergeResult> queue2(10000);
std::vector<std::shared_ptr<MergeWorker>> mergeWorkerPool;
std::vector<AreaMergeJob> mergeJobList;

for (size_t t=1; t<=std::thread::hardware_concurrency(); t++) {
mergeWorkerPool.push_back(std::make_shared<MergeWorker>(progress,
queue1,
queue2));
}
ThreadedWorkerPool<MergeWorker> workerPool(progress,
queue1,
queue2);

for (const auto& type : loadedTypes) {
if (!mergeJobs[type->GetIndex()].areas.empty()) {
Expand All @@ -407,26 +400,19 @@ namespace osmscout {
}

// Sort job list by area count (job with most areas first) to increase chance for usage of all threads
std::sort(mergeJobList.begin(),
mergeJobList.end(),
[](const AreaMergeJob& a,
const AreaMergeJob& b) {
return a.areas.size()>b.areas.size();
});
std::ranges::sort(mergeJobList,
[](const AreaMergeJob& a,
const AreaMergeJob& b) {
return a.areas.size()>b.areas.size();
});

// Push all jobs
for (auto& job : mergeJobList) {
queue1.PushTask(std::move(job));
}
queue1.Stop();

// Wait for all worker to finish
for (auto& worker : mergeWorkerPool) {
worker->Wait();
}

// Free workers
mergeWorkerPool.clear();
workerPool.Wait();

// Read worker results from queue until queue is empty
while (true) {
Expand Down
16 changes: 10 additions & 6 deletions libosmscout-import/src/osmscoutimport/Preprocess.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -657,10 +657,12 @@ namespace osmscout {

void Preprocess::Callback::BlockWorkerLoop()
{
std::packaged_task<ProcessedDataRef()> task;
while (!blockWorkerQueue.Finished()) {
auto optionalTask=blockWorkerQueue.PopTask();

while (blockWorkerQueue.PopTask(task)) {
task();
if (optionalTask) {
optionalTask.value()();
}
}
}

Expand Down Expand Up @@ -742,10 +744,12 @@ namespace osmscout {

void Preprocess::Callback::WriteWorkerLoop()
{
std::packaged_task<void()> task;
while (!writeWorkerQueue.Finished()) {
auto optionalTask=writeWorkerQueue.PopTask();

while (writeWorkerQueue.PopTask(task)) {
task();
if (optionalTask) {
optionalTask.value()();
}
}
}

Expand Down
36 changes: 12 additions & 24 deletions libosmscout-map/src/osmscoutmap/MapService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -616,65 +616,53 @@ namespace osmscout {
{
SetThreadName("NodeLoader");

std::packaged_task<bool()> task;

while (nodeWorkerQueue.PopTask(task)) {
task();
while (auto task=nodeWorkerQueue.PopTask()) {
task.value()();
}
}

void MapService::WayWorkerLoop()
{
SetThreadName("WayLoader");

std::packaged_task<bool()> task;

while (wayWorkerQueue.PopTask(task)) {
task();
while (auto task=wayWorkerQueue.PopTask()) {
task.value()();
}
}

void MapService::WayLowZoomWorkerLoop()
{
SetThreadName("WayLowZoomLoader");

std::packaged_task<bool()> task;

while (wayLowZoomWorkerQueue.PopTask(task)) {
task();
while (auto task=wayLowZoomWorkerQueue.PopTask()) {
task.value()();
}
}

void MapService::AreaWorkerLoop()
{
SetThreadName("AreaLoader");

std::packaged_task<bool()> task;

while (areaWorkerQueue.PopTask(task)) {
task();
while (auto task=areaWorkerQueue.PopTask()) {
task.value()();
}
}

void MapService::AreaLowZoomWorkerLoop()
{
SetThreadName("AreaLowZoomLoader");

std::packaged_task<bool()> task;

while (areaLowZoomWorkerQueue.PopTask(task)) {
task();
while (auto task=areaLowZoomWorkerQueue.PopTask()) {
task.value()();
}
}

void MapService::RouteWorkerLoop()
{
SetThreadName("RouteLoader");

std::packaged_task<bool()> task;

while (routeWorkerQueue.PopTask(task)) {
task();
while (auto task=routeWorkerQueue.PopTask()) {
task.value()();
}
}

Expand Down
20 changes: 17 additions & 3 deletions libosmscout/include/osmscout/async/ProcessingQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ namespace osmscout {
std::optional<T> PopTask();

void Stop();

bool Finished();
};

/**
Expand Down Expand Up @@ -157,8 +159,7 @@ namespace osmscout {

popCondition.wait(lock,[this]{return !tasks.empty() || !running;});

if (!running &&
tasks.empty()) {
if (tasks.empty()) {
return std::nullopt;
}

Expand All @@ -177,7 +178,6 @@ namespace osmscout {
*
* @tparam R
*/

template<class R>
void ProcessingQueue<R>::Stop()
{
Expand All @@ -189,6 +189,20 @@ namespace osmscout {

popCondition.notify_all();
}

/**
* Return true, if the queue is stopped and empty, else false.
* @tparam R
* @return true, if stopped and empty, else false
*/
template<class R>
bool ProcessingQueue<R>::Finished()
{
std::unique_lock lock(mutex);

return !running && tasks.empty();
}

}

#endif
44 changes: 1 addition & 43 deletions libosmscout/include/osmscout/async/WorkQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,54 +20,12 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/

#include <condition_variable>
#include <deque>
#include <future>
#include <memory>
#include <limits>
#include <thread>

#include <osmscout/lib/CoreImportExport.h>
#include <osmscout/async/ProcessingQueue.h>

namespace osmscout {

template<typename R>
class WorkQueue: public ProcessingQueue<std::packaged_task<R ()>>
{
private:
using Task = std::packaged_task<R ()>;

public:
WorkQueue() = default;

explicit WorkQueue(size_t queueLimit);
~WorkQueue() override = default;

bool PopTask(Task& task);
};


template<class R>
WorkQueue<R>::WorkQueue(size_t queueLimit)
: ProcessingQueue<Task>(queueLimit)
{
// no code
}

template<class R>
bool WorkQueue<R>::PopTask(Task& task)
{
auto taskOpt = ProcessingQueue<Task>::PopTask();

if (!taskOpt) {
return false;
}

task=std::move(taskOpt.value());

return true;
}
using WorkQueue = ProcessingQueue<std::packaged_task<R ()>>;
}

#endif
38 changes: 37 additions & 1 deletion libosmscout/include/osmscout/async/Worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/

#include <vector>
#include <thread>
#include <optional>

#include <osmscout/async/ProcessingQueue.h>

Expand Down Expand Up @@ -168,6 +168,42 @@ namespace osmscout {
}
};

template <typename W>
class ThreadedWorkerPool
{
private:
std::vector<std::shared_ptr<W>> workerPool;

public:
template <class ...Args>
explicit ThreadedWorkerPool(Args&&... args)
{
unsigned int size=std::max(1u,std::thread::hardware_concurrency());

for (unsigned int i = 0; i < size; i++) {
workerPool.push_back(std::make_shared<W>(std::forward<Args>(args)...));
}
}

template <class ...Args>
explicit ThreadedWorkerPool(size_t size, Args&&... args)
{
for (size_t i = 0; i < size; i++) {
workerPool.push_back(std::make_shared<W>(std::forward<Args>(args)...));
}
}

void Wait()
{
for (auto& worker : workerPool) {
worker->Wait();
}

workerPool.clear();
}
};


}

#endif
Loading

0 comments on commit 4db1a5d

Please sign in to comment.