From a923533280e286ea5570da4f1690006c083e70e4 Mon Sep 17 00:00:00 2001 From: Tim Teulings Date: Sat, 2 Nov 2024 19:50:58 +0100 Subject: [PATCH 1/2] feat: ThreadedWorkerPool, issue #1035 - Created ThreadedWorkerPool - Adapted GenMergeAreas::MergeAreas() --- .../src/osmscoutimport/GenMergeAreas.cpp | 34 ++++++----------- libosmscout/include/osmscout/async/Worker.h | 38 ++++++++++++++++++- 2 files changed, 48 insertions(+), 24 deletions(-) diff --git a/libosmscout-import/src/osmscoutimport/GenMergeAreas.cpp b/libosmscout-import/src/osmscoutimport/GenMergeAreas.cpp index d4e27df17..44995b699 100644 --- a/libosmscout-import/src/osmscoutimport/GenMergeAreas.cpp +++ b/libosmscout-import/src/osmscoutimport/GenMergeAreas.cpp @@ -333,11 +333,10 @@ namespace osmscout { { progress.Info("Merging areas of type "+job.type->GetName()); - AreaMergeResult result; StopClock mergeStopClock; - result=MergeAreas(progress, - job); + AreaMergeResult result=MergeAreas(progress, + job); mergeStopClock.Stop(); progress.Info( @@ -391,14 +390,10 @@ namespace osmscout { std::vector mergeResult(typeConfig.GetTypeCount()); ProcessingQueue queue1(10000); ProcessingQueue queue2(10000); - std::vector> mergeWorkerPool; std::vector mergeJobList; - - for (size_t t=1; t<=std::thread::hardware_concurrency(); t++) { - mergeWorkerPool.push_back(std::make_shared(progress, - queue1, - queue2)); - } + ThreadedWorkerPool workerPool(progress, + queue1, + queue2); for (const auto& type : loadedTypes) { if (!mergeJobs[type->GetIndex()].areas.empty()) { @@ -407,12 +402,11 @@ namespace osmscout { } // Sort job list by area count (job with most areas first) to increase chance for usage of all threads - std::sort(mergeJobList.begin(), - mergeJobList.end(), - [](const AreaMergeJob& a, - const AreaMergeJob& b) { - return a.areas.size()>b.areas.size(); - }); + std::ranges::sort(mergeJobList, + [](const AreaMergeJob& a, + const AreaMergeJob& b) { + return a.areas.size()>b.areas.size(); + }); // Push all jobs for (auto& job : mergeJobList) { @@ -420,13 +414,7 @@ namespace osmscout { } queue1.Stop(); - // Wait for all worker to finish - for (auto& worker : mergeWorkerPool) { - worker->Wait(); - } - - // Free workers - mergeWorkerPool.clear(); + workerPool.Wait(); // Read worker results from queue until queue is empty while (true) { diff --git a/libosmscout/include/osmscout/async/Worker.h b/libosmscout/include/osmscout/async/Worker.h index bfc472e52..d79af9807 100644 --- a/libosmscout/include/osmscout/async/Worker.h +++ b/libosmscout/include/osmscout/async/Worker.h @@ -20,8 +20,8 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#include #include -#include #include @@ -168,6 +168,42 @@ namespace osmscout { } }; + template + class ThreadedWorkerPool + { + private: + std::vector> workerPool; + + public: + template + explicit ThreadedWorkerPool(Args&&... args) + { + unsigned int size=std::thread::hardware_concurrency(); + + for (unsigned int i = 0; i < size; i++) { + workerPool.push_back(std::make_shared(std::forward(args)...)); + } + } + + template + explicit ThreadedWorkerPool(size_t size, Args&&... args) + { + for (size_t i = 0; i < size; i++) { + workerPool.push_back(std::make_shared(std::forward(args)...)); + } + } + + void Wait() + { + for (auto& worker : workerPool) { + worker->Wait(); + } + + workerPool.clear(); + } + }; + + } #endif From 484d141c0aae795c2d736f648c997b3f3ab657b7 Mon Sep 17 00:00:00 2001 From: Tim Teulings Date: Sat, 2 Nov 2024 23:26:37 +0100 Subject: [PATCH 2/2] feat: ThreadedWorkerPool, issue #1035 - Created ThreadedWorkerPool - Adapted GenMergeAreas::MergeAreas() - Some improvements to ProcessingQueue, avoiding race conditions - Cleanup of WorkQueue, which is now just a ProcessingQueue with special template parameter know --- Tests/src/WorkQueueTest.cpp | 21 ++++----- .../src/osmscoutimport/GenMergeAreas.cpp | 14 +++--- .../src/osmscoutimport/Preprocess.cpp | 16 ++++--- .../src/osmscoutmap/MapService.cpp | 36 +++++---------- .../include/osmscout/async/ProcessingQueue.h | 20 +++++++-- .../include/osmscout/async/WorkQueue.h | 44 +------------------ libosmscout/include/osmscout/async/Worker.h | 2 +- .../src/osmscout/async/AsyncWorker.cpp | 4 +- 8 files changed, 58 insertions(+), 99 deletions(-) diff --git a/Tests/src/WorkQueueTest.cpp b/Tests/src/WorkQueueTest.cpp index e5411699e..6caedec2a 100644 --- a/Tests/src/WorkQueueTest.cpp +++ b/Tests/src/WorkQueueTest.cpp @@ -31,21 +31,14 @@ class Worker osmscout::WorkQueue queue; private: - int Work(int a, int b) - { - std::cout << "Doing task #" << a << std::endl; - - return a+b; - } - void TaskLoop() { - std::packaged_task task; - std::cout << "Starting TaskLoop()..." << std::endl; - while (queue.PopTask(task)) { - task(); + while (!queue.Finished()) { + if (auto optionalTask=queue.PopTask(); optionalTask) { + optionalTask.value()(); + } } std::cout << "Quit TaskLoop()" << std::endl; @@ -65,7 +58,11 @@ class Worker std::future PushWork(int a, int b) { - std::packaged_task task(std::bind(&Worker::Work,this,a,b)); + std::packaged_task task([a,b] { + std::cout << "Doing task #" << a << std::endl; + + return a+b; + }); std::future future=task.get_future(); diff --git a/libosmscout-import/src/osmscoutimport/GenMergeAreas.cpp b/libosmscout-import/src/osmscoutimport/GenMergeAreas.cpp index 44995b699..16e45f9d2 100644 --- a/libosmscout-import/src/osmscoutimport/GenMergeAreas.cpp +++ b/libosmscout-import/src/osmscoutimport/GenMergeAreas.cpp @@ -349,18 +349,16 @@ namespace osmscout { void ProcessingLoop() override { - while (true) { + while (!inQueue.Finished()) { std::optional value=inQueue.PopTask(); - if (!value) { - break; - } + if (value) { + AreaMergeJob job=std::move(value.value()); - AreaMergeJob job=std::move(value.value()); + AreaMergeResult result=ProcessJob(job); - AreaMergeResult result=ProcessJob(job); - - outQueue.PushTask(result); + outQueue.PushTask(result); + } } outQueue.Stop(); diff --git a/libosmscout-import/src/osmscoutimport/Preprocess.cpp b/libosmscout-import/src/osmscoutimport/Preprocess.cpp index 744890e87..6b9c06153 100644 --- a/libosmscout-import/src/osmscoutimport/Preprocess.cpp +++ b/libosmscout-import/src/osmscoutimport/Preprocess.cpp @@ -657,10 +657,12 @@ namespace osmscout { void Preprocess::Callback::BlockWorkerLoop() { - std::packaged_task task; + while (!blockWorkerQueue.Finished()) { + auto optionalTask=blockWorkerQueue.PopTask(); - while (blockWorkerQueue.PopTask(task)) { - task(); + if (optionalTask) { + optionalTask.value()(); + } } } @@ -742,10 +744,12 @@ namespace osmscout { void Preprocess::Callback::WriteWorkerLoop() { - std::packaged_task task; + while (!writeWorkerQueue.Finished()) { + auto optionalTask=writeWorkerQueue.PopTask(); - while (writeWorkerQueue.PopTask(task)) { - task(); + if (optionalTask) { + optionalTask.value()(); + } } } diff --git a/libosmscout-map/src/osmscoutmap/MapService.cpp b/libosmscout-map/src/osmscoutmap/MapService.cpp index 55ead3dbb..aa98c19ff 100644 --- a/libosmscout-map/src/osmscoutmap/MapService.cpp +++ b/libosmscout-map/src/osmscoutmap/MapService.cpp @@ -616,10 +616,8 @@ namespace osmscout { { SetThreadName("NodeLoader"); - std::packaged_task task; - - while (nodeWorkerQueue.PopTask(task)) { - task(); + while (auto task=nodeWorkerQueue.PopTask()) { + task.value()(); } } @@ -627,10 +625,8 @@ namespace osmscout { { SetThreadName("WayLoader"); - std::packaged_task task; - - while (wayWorkerQueue.PopTask(task)) { - task(); + while (auto task=wayWorkerQueue.PopTask()) { + task.value()(); } } @@ -638,10 +634,8 @@ namespace osmscout { { SetThreadName("WayLowZoomLoader"); - std::packaged_task task; - - while (wayLowZoomWorkerQueue.PopTask(task)) { - task(); + while (auto task=wayLowZoomWorkerQueue.PopTask()) { + task.value()(); } } @@ -649,10 +643,8 @@ namespace osmscout { { SetThreadName("AreaLoader"); - std::packaged_task task; - - while (areaWorkerQueue.PopTask(task)) { - task(); + while (auto task=areaWorkerQueue.PopTask()) { + task.value()(); } } @@ -660,10 +652,8 @@ namespace osmscout { { SetThreadName("AreaLowZoomLoader"); - std::packaged_task task; - - while (areaLowZoomWorkerQueue.PopTask(task)) { - task(); + while (auto task=areaLowZoomWorkerQueue.PopTask()) { + task.value()(); } } @@ -671,10 +661,8 @@ namespace osmscout { { SetThreadName("RouteLoader"); - std::packaged_task task; - - while (routeWorkerQueue.PopTask(task)) { - task(); + while (auto task=routeWorkerQueue.PopTask()) { + task.value()(); } } diff --git a/libosmscout/include/osmscout/async/ProcessingQueue.h b/libosmscout/include/osmscout/async/ProcessingQueue.h index 8e159ec9a..1588d4b41 100644 --- a/libosmscout/include/osmscout/async/ProcessingQueue.h +++ b/libosmscout/include/osmscout/async/ProcessingQueue.h @@ -76,6 +76,8 @@ namespace osmscout { std::optional PopTask(); void Stop(); + + bool Finished(); }; /** @@ -157,8 +159,7 @@ namespace osmscout { popCondition.wait(lock,[this]{return !tasks.empty() || !running;}); - if (!running && - tasks.empty()) { + if (tasks.empty()) { return std::nullopt; } @@ -177,7 +178,6 @@ namespace osmscout { * * @tparam R */ - template void ProcessingQueue::Stop() { @@ -189,6 +189,20 @@ namespace osmscout { popCondition.notify_all(); } + + /** + * Return true, if the queue is stopped and empty, else false. + * @tparam R + * @return true, if stopped and empty, else false + */ + template + bool ProcessingQueue::Finished() + { + std::unique_lock lock(mutex); + + return !running && tasks.empty(); + } + } #endif diff --git a/libosmscout/include/osmscout/async/WorkQueue.h b/libosmscout/include/osmscout/async/WorkQueue.h index e02102e61..157009ff7 100644 --- a/libosmscout/include/osmscout/async/WorkQueue.h +++ b/libosmscout/include/osmscout/async/WorkQueue.h @@ -20,54 +20,12 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#include -#include #include -#include -#include -#include - -#include #include namespace osmscout { - template - class WorkQueue: public ProcessingQueue> - { - private: - using Task = std::packaged_task; - - public: - WorkQueue() = default; - - explicit WorkQueue(size_t queueLimit); - ~WorkQueue() override = default; - - bool PopTask(Task& task); - }; - - - template - WorkQueue::WorkQueue(size_t queueLimit) - : ProcessingQueue(queueLimit) - { - // no code - } - - template - bool WorkQueue::PopTask(Task& task) - { - auto taskOpt = ProcessingQueue::PopTask(); - - if (!taskOpt) { - return false; - } - - task=std::move(taskOpt.value()); - - return true; - } + using WorkQueue = ProcessingQueue>; } #endif diff --git a/libosmscout/include/osmscout/async/Worker.h b/libosmscout/include/osmscout/async/Worker.h index d79af9807..091a473fd 100644 --- a/libosmscout/include/osmscout/async/Worker.h +++ b/libosmscout/include/osmscout/async/Worker.h @@ -178,7 +178,7 @@ namespace osmscout { template explicit ThreadedWorkerPool(Args&&... args) { - unsigned int size=std::thread::hardware_concurrency(); + unsigned int size=std::max(1u,std::thread::hardware_concurrency()); for (unsigned int i = 0; i < size; i++) { workerPool.push_back(std::make_shared(std::forward(args)...)); diff --git a/libosmscout/src/osmscout/async/AsyncWorker.cpp b/libosmscout/src/osmscout/async/AsyncWorker.cpp index d0d4c4221..ddde8e2be 100644 --- a/libosmscout/src/osmscout/async/AsyncWorker.cpp +++ b/libosmscout/src/osmscout/async/AsyncWorker.cpp @@ -44,10 +44,10 @@ AsyncWorker::~AsyncWorker() void AsyncWorker::Loop() { - while (true) { + while (!queue.Finished()) { auto taskOpt = queue.PopTask(); if (!taskOpt) { - break; + continue; } taskOpt.value()();