From 2ae9b4d4ac75ff06274f0af39f7b24722754bdb7 Mon Sep 17 00:00:00 2001 From: janbar Date: Tue, 30 Jan 2024 19:13:37 +0100 Subject: [PATCH] add testing for latch (readwritelock) It includes testing for meson build and cmake build. --- Tests/CMakeLists.txt | 3 + Tests/meson.build | 10 + Tests/src/Latch.cpp | 584 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 597 insertions(+) create mode 100644 Tests/src/Latch.cpp diff --git a/Tests/CMakeLists.txt b/Tests/CMakeLists.txt index f29af58e5..041f0ead0 100644 --- a/Tests/CMakeLists.txt +++ b/Tests/CMakeLists.txt @@ -34,6 +34,9 @@ osmscout_test_project(NAME ColorParse SOURCES src/ColorParse.cpp) #---- CoordinateEncoding osmscout_test_project(NAME CoordinateEncoding SOURCES src/CoordinateEncoding.cpp COMMAND "${CMAKE_CURRENT_SOURCE_DIR}/data/testregion") +#---- Latch +osmscout_test_project(NAME Latch SOURCES src/Latch.cpp) + #---- LocationLookup osmscout_test_project(NAME LocationLookupTest SOURCES src/LocationServiceTest.cpp src/SearchForLocationByStringTest.cpp src/SearchForLocationByFormTest.cpp src/SearchForPOIByFormTest.cpp TARGET OSMScout::Test OSMScout::Import) set_source_files_properties(src/SearchForLocationByStringTest.cpp src/SearchForLocationByFormTest.cpp src/SearchForPOIByFormTest.cpp src/LocationServiceTest.cpp PROPERTIES SKIP_UNITY_BUILD_INCLUSION TRUE) diff --git a/Tests/meson.build b/Tests/meson.build index c08de8f43..2fd7f5752 100644 --- a/Tests/meson.build +++ b/Tests/meson.build @@ -225,6 +225,16 @@ HeaderCheck = executable('HeaderCheck', test('Check use of \'<\'...\'>\' for includes', HeaderCheck, env: headerCheckEnv) +Latch = executable('Latch', + 'src/Latch.cpp', + include_directories: [testIncDir, osmscoutIncDir], + dependencies: [mathDep, threadDep, openmpDep], + link_with: [osmscout], + install: true, + install_dir: testInstallDir) + +test('Check latch consistency', Latch) + if buildImport LocationServiceTest = executable('LocationServiceTest', [ diff --git a/Tests/src/Latch.cpp b/Tests/src/Latch.cpp new file mode 100644 index 000000000..ac2d54a7b --- /dev/null +++ b/Tests/src/Latch.cpp @@ -0,0 +1,584 @@ +/* + Latch - a test program for libosmscout + Copyright (C) 2024 Jean-Luc Barriere + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + asize_t with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +using namespace std::chrono_literals; + +static size_t iterationCount=250; +static auto taskDuration=1ms; + +static size_t refCounter = 0; +static osmscout::Latch latch; + +class ReaderWorker +{ +private: + osmscout::ProcessingQueue& queue; + std::thread thread; + +public: + size_t processedCount; + +private: + void ProcessorLoop() + { + std::cout << "Processing..." << std::endl; + + while (true) { + std::optional value=queue.PopTask(); + + if (!value) { + std::cout << "Queue empty!" << std::endl; + break; + } + + if (value<0) { + std::cout << "Stop signal fetched!" << std::endl; + break; + } + + { + osmscout::ReadLock locker(latch); + [[maybe_unused]] size_t c = refCounter; + std::this_thread::sleep_for(taskDuration); + } + + processedCount++; + } + + std::cout << "Processing...done" << std::endl; + } + +public: + explicit ReaderWorker(osmscout::ProcessingQueue& queue) + : queue(queue), + thread(&ReaderWorker::ProcessorLoop,this), + processedCount(0) + { + } + + void Wait() { + thread.join(); + } +}; + +class WriterWorker +{ +private: + osmscout::ProcessingQueue& queue; + std::thread thread; + +public: + size_t processedCount; + +private: + void ProcessorLoop() + { + std::cout << "Processing..." << std::endl; + + while (true) { + std::optional value=queue.PopTask(); + + if (!value) { + std::cout << "Queue empty!" << std::endl; + break; + } + + if (value<0) { + std::cout << "Stop signal fetched!" << std::endl; + break; + } + + { + osmscout::WriteLock locker(latch); + ++refCounter; + std::this_thread::sleep_for(taskDuration); + } + + processedCount++; + } + + std::cout << "Processing...done" << std::endl; + } + +public: + explicit WriterWorker(osmscout::ProcessingQueue& queue) + : queue(queue), + thread(&WriterWorker::ProcessorLoop,this), + processedCount(0) + { + } + + void Wait() { + thread.join(); + } +}; + +class ReaderReaderWorker +{ +private: + osmscout::ProcessingQueue& queue; + std::thread thread; + +public: + size_t processedCount; + +private: + void ProcessorLoop() + { + std::cout << "Processing..." << std::endl; + + while (true) { + std::optional value=queue.PopTask(); + + if (!value) { + std::cout << "Queue empty!" << std::endl; + break; + } + + if (value<0) { + std::cout << "Stop signal fetched!" << std::endl; + break; + } + + { + osmscout::ReadLock rl1(latch); + { + osmscout::ReadLock rl2(latch); + { + osmscout::ReadLock rl3(latch); + [[maybe_unused]] size_t c = refCounter; + std::this_thread::sleep_for(taskDuration); + } + } + } + + processedCount++; + } + + std::cout << "Processing...done" << std::endl; + } + +public: + explicit ReaderReaderWorker(osmscout::ProcessingQueue& queue) + : queue(queue), + thread(&ReaderReaderWorker::ProcessorLoop,this), + processedCount(0) + { + } + + void Wait() { + thread.join(); + } +}; + +class WriterReaderWorker +{ +private: + osmscout::ProcessingQueue& queue; + std::thread thread; + +public: + size_t processedCount; + +private: + void ProcessorLoop() + { + std::cout << "Processing..." << std::endl; + + while (true) { + std::optional value=queue.PopTask(); + + if (!value) { + std::cout << "Queue empty!" << std::endl; + break; + } + + if (value<0) { + std::cout << "Stop signal fetched!" << std::endl; + break; + } + + { + osmscout::WriteLock wr1(latch); + { + osmscout::WriteLock wr2(latch); + { + osmscout::ReadLock rl1(latch); + { + osmscout::ReadLock rl2(latch); + [[maybe_unused]] size_t c = refCounter; + } + } + ++refCounter; + } + std::this_thread::sleep_for(taskDuration); + } + + processedCount++; + } + + std::cout << "Processing...done" << std::endl; + } + +public: + explicit WriterReaderWorker(osmscout::ProcessingQueue& queue) + : queue(queue), + thread(&WriterReaderWorker::ProcessorLoop,this), + processedCount(0) + { + } + + void Wait() { + thread.join(); + } +}; + +TEST_CASE("Multi Reader Worker") { + osmscout::StopClock stopClock; + osmscout::ProcessingQueue queue(1000); + ReaderWorker worker1(queue); + ReaderWorker worker2(queue); + ReaderWorker worker3(queue); + ReaderWorker worker4(queue); + + std::cout << "Pushing tasks..." << std::endl; + + for (size_t i=1; i<=iterationCount; i++) { + queue.PushTask(i); + } + + queue.PushTask(-1); + + queue.Stop(); + + std::cout << "Pushing tasks...done, waiting..." << std::endl; + + worker1.Wait(); + worker2.Wait(); + worker3.Wait(); + worker4.Wait(); + + stopClock.Stop(); + + size_t pc = worker1.processedCount+worker2.processedCount+worker3.processedCount+ + worker4.processedCount; + + std::cout << "#processed: " + << pc << " [" + << worker1.processedCount << "," + << worker2.processedCount << "," + << worker3.processedCount << "," + << worker4.processedCount << "]" + << std::endl; + std::cout << "<<< MultiReaderWorker...done: " << stopClock.ResultString() << std::endl; + std::cout << std::endl; + + REQUIRE(pc == iterationCount); +} + +TEST_CASE("Multi Writer Worker") { + osmscout::StopClock stopClock; + osmscout::ProcessingQueue queue(1000); + WriterWorker worker1(queue); + WriterWorker worker2(queue); + WriterWorker worker3(queue); + WriterWorker worker4(queue); + + std::cout << "Pushing tasks..." << std::endl; + + for (size_t i=1; i<=iterationCount; i++) { + queue.PushTask(i); + } + + queue.PushTask(-1); + + queue.Stop(); + + worker1.Wait(); + worker2.Wait(); + worker3.Wait(); + worker4.Wait(); + + std::cout << "Pushing tasks...done, waiting..." << std::endl; + + + stopClock.Stop(); + + size_t pc = worker1.processedCount+worker2.processedCount+worker3.processedCount+ + worker4.processedCount; + + std::cout << "#processed: (" << refCounter << ") " + << pc << " [" + << worker1.processedCount << "," + << worker2.processedCount << "," + << worker3.processedCount << "," + << worker4.processedCount << "]" + << std::endl; + std::cout << "<<< MultiWriterWorker...done: " << stopClock.ResultString() << std::endl; + std::cout << std::endl; + + REQUIRE(pc == iterationCount); + REQUIRE(refCounter == pc); +} + +TEST_CASE("Multi Reader Worker One Writer worker") { + osmscout::StopClock stopClock; + osmscout::ProcessingQueue queue(1000); + WriterWorker worker1(queue); + ReaderWorker worker2(queue); + ReaderWorker worker3(queue); + ReaderWorker worker4(queue); + + refCounter = 0; // reset counter + + std::cout << "Pushing tasks..." << std::endl; + + for (size_t i=1; i<=iterationCount; i++) { + queue.PushTask(i); + } + + queue.PushTask(-1); + + queue.Stop(); + + worker1.Wait(); + worker2.Wait(); + worker3.Wait(); + worker4.Wait(); + + std::cout << "Pushing tasks...done, waiting..." << std::endl; + + + stopClock.Stop(); + + size_t pc = worker1.processedCount+worker2.processedCount+worker3.processedCount+ + worker4.processedCount; + + std::cout << "#processed: (" << refCounter << ") " + << pc << " [" + << worker1.processedCount << "," + << worker2.processedCount << "," + << worker3.processedCount << "," + << worker4.processedCount << "]" + << std::endl; + std::cout << "<<< MultiReaderOneWriterWorker...done: " << stopClock.ResultString() << std::endl; + std::cout << std::endl; + + REQUIRE(pc == iterationCount); +} + +TEST_CASE("Multi Recursive Reader Worker") { + osmscout::StopClock stopClock; + osmscout::ProcessingQueue queue(1000); + ReaderReaderWorker worker1(queue); + ReaderReaderWorker worker2(queue); + ReaderReaderWorker worker3(queue); + ReaderReaderWorker worker4(queue); + + std::cout << "Pushing tasks..." << std::endl; + + for (size_t i=1; i<=iterationCount; i++) { + queue.PushTask(i); + } + + queue.PushTask(-1); + + queue.Stop(); + + worker1.Wait(); + worker2.Wait(); + worker3.Wait(); + worker4.Wait(); + + std::cout << "Pushing tasks...done, waiting..." << std::endl; + + + stopClock.Stop(); + + size_t pc = worker1.processedCount+worker2.processedCount+worker3.processedCount+ + worker4.processedCount; + + std::cout << "#processed: " + << pc << " [" + << worker1.processedCount << "," + << worker2.processedCount << "," + << worker3.processedCount << "," + << worker4.processedCount << "]" + << std::endl; + std::cout << "<<< MultiReentrantReaderWorker...done: " << stopClock.ResultString() << std::endl; + std::cout << std::endl; + + REQUIRE(pc == iterationCount); +} + +TEST_CASE("Multi Recursive Writer Worker") { + osmscout::StopClock stopClock; + osmscout::ProcessingQueue queue(1000); + WriterReaderWorker worker1(queue); + WriterReaderWorker worker2(queue); + WriterReaderWorker worker3(queue); + WriterReaderWorker worker4(queue); + + refCounter = 0; // reset counter + + std::cout << "Pushing tasks..." << std::endl; + + for (size_t i=1; i<=iterationCount; i++) { + queue.PushTask(i); + } + + queue.PushTask(-1); + + queue.Stop(); + + worker1.Wait(); + worker2.Wait(); + worker3.Wait(); + worker4.Wait(); + + std::cout << "Pushing tasks...done, waiting..." << std::endl; + + + stopClock.Stop(); + + size_t pc = worker1.processedCount+worker2.processedCount+worker3.processedCount+ + worker4.processedCount; + + std::cout << "#processed: (" << refCounter << ") " + << pc << " [" + << worker1.processedCount << "," + << worker2.processedCount << "," + << worker3.processedCount << "," + << worker4.processedCount << "]" + << std::endl; + std::cout << "<<< MultiReentrantWriterWorker...done: " << stopClock.ResultString() << std::endl; + std::cout << std::endl; + + REQUIRE(pc == iterationCount); + REQUIRE(refCounter == pc); +} + +TEST_CASE("Check write precedence") { + volatile int i=0; + osmscout::ReadLock rl(latch); + + std::thread t([&i](){ + osmscout::WriteLock wl(latch); + i++; + }); + + // wait until writer lock is requested + while (true) { + if (!latch.try_lock_shared()) { + // writer lock is requested already + break; + } + latch.unlock_shared(); + std::this_thread::yield(); + } + + REQUIRE(i == 0); + rl.unlock(); + + osmscout::ReadLock rl2(latch); // we should not get shared lock until writer is done + REQUIRE(i == 1); + t.join(); +} + +TEST_CASE("Second shared lock should be blocked when exclusive is requested") { + int nbreader=4; + volatile int i=0; + std::atomic j=0; + std::atomic blocked=0; + std::atomic notblocked=0; + std::vector pools; + { + osmscout::ReadLock rl(latch); + + std::thread t([&i](){ + osmscout::WriteLock wl(latch); + i++; + }); + + // wait until writer lock is requested + while (true) { + if (!latch.try_lock_shared()) { + // writer lock is requested already + break; + } + latch.unlock_shared(); + std::this_thread::yield(); + } + + for (int nr=0; nr < nbreader; ++nr) { + std::thread * tr = new std::thread([&j, &blocked, ¬blocked](){ + if (latch.try_lock_shared()) { + notblocked++; + latch.unlock_shared(); + } else { + blocked++; + osmscout::ReadLock rl(latch); + j++; + } + }); + pools.push_back(tr); + } + + // wait for everyone to get set up + int k=0; + while ((notblocked.load() + blocked.load()) < 1 && k++ < 1000) { + std::this_thread::yield(); + } + REQUIRE(i == 0); // write lock is still waiting + REQUIRE((blocked.load() + notblocked.load()) > 0); + rl.unlock(); + t.join(); + } + std::cout << "#blocked: " << blocked.load() << "/" << nbreader << std::endl; + // hoping that 1 read lock have been blocked because exclusive lock was requested + REQUIRE((blocked.load() > 0)); + // check BUG: thread was not awakened after broadcast signal + // wait for all readers, or fail when lost reader + int k=0; + while (j.load() != blocked.load() && k++ < 1000) { + std::this_thread::yield(); + } + // all blocked readers must be finalized + REQUIRE(j.load() == blocked.load()); + // cleanup + while (!pools.empty()) { + pools.back()->join(); + delete pools.back(); + pools.pop_back(); + } +} +