From 45a9138f44f0cef0b71ea351dd82b5c1112e2b26 Mon Sep 17 00:00:00 2001
From: Tony Astolfi
Date: Mon, 18 Mar 2024 13:44:44 -0400
Subject: [PATCH] Fix for flaky LruClockTest. (#146)

* Add diagnostic printing for sporadic test failure (LruClockTest.SyncUpdate128).

* Increase tolerances for LruClockTest.

* Fixes for flaky LruClockTest.

* Stop slow thread inside retry loop (LruClockTest).
---
 src/llfs/lru_clock.cpp      |   8 +++
 src/llfs/lru_clock.hpp      |   5 +-
 src/llfs/lru_clock.test.cpp | 109 ++++++++++++++++++++++++++----------
 3 files changed, 90 insertions(+), 32 deletions(-)

diff --git a/src/llfs/lru_clock.cpp b/src/llfs/lru_clock.cpp
index a5e1d5d..930824f 100644
--- a/src/llfs/lru_clock.cpp
+++ b/src/llfs/lru_clock.cpp
@@ -146,6 +146,10 @@ void LRUClock::add_local_counter(LocalCounter& counter) noexcept
 {
   std::unique_lock lock{this->mutex_};
 
+  // Initialize the local counter to the max observed global value.
+  //
+  counter.value.store(this->observed_count_);
+
   this->counter_list_.push_back(counter);
 }
 
@@ -155,6 +159,10 @@ void LRUClock::remove_local_counter(LocalCounter& counter) noexcept
 {
   std::unique_lock lock{this->mutex_};
 
+  // Update the global max observed count (last reading from this local counter).
+  //
+  this->observed_count_ = std::max(this->observed_count_, counter.value.load());
+
   this->counter_list_.erase(this->counter_list_.iterator_to(counter));
 }
 
diff --git a/src/llfs/lru_clock.hpp b/src/llfs/lru_clock.hpp
index 748bfea..eff53b2 100644
--- a/src/llfs/lru_clock.hpp
+++ b/src/llfs/lru_clock.hpp
@@ -72,6 +72,8 @@ class LRUClock
 
     ~LocalCounter() noexcept;
 
+    /** \brief The next unused counter value for the current thread.
+     */
     std::atomic<i64> value{0};
   };
 
@@ -131,7 +133,8 @@ class LRUClock
    */
   void remove_local_counter(LocalCounter& counter) noexcept;
 
-  /** \brief Returns the maximum count value from the last time sync_local_counters() was called.
+  /** \brief Returns the maximum count value (least upper bound; i.e., the first unused value) from
+   * the last time sync_local_counters() was called.
    */
   i64 read_observed_count() noexcept;
 
diff --git a/src/llfs/lru_clock.test.cpp b/src/llfs/lru_clock.test.cpp
index f0f8e9e..bcfca73 100644
--- a/src/llfs/lru_clock.test.cpp
+++ b/src/llfs/lru_clock.test.cpp
@@ -42,14 +42,18 @@ using namespace llfs::int_types;
 TEST(LruClockTest, PerThreadUpdate)
 {
   const usize kNumThreads = std::thread::hardware_concurrency();
-  const usize kUpdatesPerThread = 1000;
+  const usize kUpdatesPerThread = 25 * 1000;
 
   std::vector<std::vector<i64>> per_thread_values(kNumThreads, std::vector<i64>(kUpdatesPerThread));
 
+  std::atomic<bool> start{false};
   std::vector<std::thread> threads;
 
   for (usize i = 0; i < kNumThreads; ++i) {
-    threads.emplace_back([i, &per_thread_values] {
+    threads.emplace_back([i, &start, &per_thread_values] {
+      while (!start.load()) {
+        continue;
+      }
       for (usize j = 0; j < kUpdatesPerThread; ++j) {
         const i64 value = llfs::LRUClock::advance_local();
         per_thread_values[i][j] = value;
@@ -57,6 +61,8 @@ TEST(LruClockTest, PerThreadUpdate)
     });
   }
 
+  start.store(true);
+
   for (std::thread& t : threads) {
     t.join();
   }
@@ -72,14 +78,16 @@ TEST(LruClockTest, PerThreadUpdate)
     }
   }
 
-  usize repeated_values = 0;
-  for (const auto& [value, count] : count_per_value) {
-    if (count > 1) {
-      ++repeated_values;
+  if (kNumThreads > 1) {
+    usize repeated_values = 0;
+    for (const auto& [value, count] : count_per_value) {
+      if (count > 1) {
+        ++repeated_values;
+      }
     }
-  }
 
-  EXPECT_GT(repeated_values, 0u);
+    EXPECT_GT(repeated_values, 0u);
+  }
 }
 
 //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
@@ -88,17 +96,40 @@ TEST(LruClockTest, PerThreadUpdate)
 void run_sync_update_test(const usize kNumFastThreads)
 {
   const usize kUpdatesPerThread = 50 * 1000 * 1000;
-  const usize kSlowThreadReads = 20;
-
-  std::vector<i64> slow_thread_values(kSlowThreadReads);
-  std::thread slow_thread{[&slow_thread_values] {
-    for (usize i = 0; i < kSlowThreadReads; ++i) {
-      std::this_thread::sleep_for(
-          std::chrono::microseconds(llfs::LRUClock::kMaxSyncDelayUsec * 40));
-      slow_thread_values[i] = llfs::LRUClock::read_local();
+
+  // The maximum number of consecutive attempts on the slow thread to read an increasing value.
+  //
+  const usize kSyncDelayToleranceFactor = 100;
+
+  // The "slow thread" sleeps for the maximum sync delay times the tolerance factor, then takes a
+  // reading from its local counter. All observed local counts are recorded for verification below.
+  //
+  std::atomic<bool> stop_slow_thread{false};
+  std::vector<i64> slow_thread_values;
+  std::thread slow_thread{[&slow_thread_values, &stop_slow_thread] {
+    while (!stop_slow_thread.load()) {
+      const i64 prev_value = slow_thread_values.empty() ? -1 : slow_thread_values.back();
+      slow_thread_values.emplace_back();
+
+      for (usize j = 0; j < kSyncDelayToleranceFactor; ++j) {
+        std::this_thread::sleep_for(std::chrono::microseconds(llfs::LRUClock::kMaxSyncDelayUsec));
+
+        // NOTE: we are only calling `read_local()` here, not `advance_local()`. That means unless
+        // the other (fast) threads are updating the counter via periodic global synchronization,
+        // this value will not change!
+        //
+        slow_thread_values.back() = llfs::LRUClock::read_local();
+        if (slow_thread_values.back() > prev_value || stop_slow_thread.load()) {
+          break;
+        }
+      }
     }
   }};
 
+  // The "fast threads" just advance their local counters as fast as possible. For each fast
+  // thread, we keep track of the maximum observed count value, which should eventually skip ahead
+  // due to global sync operations.
+  //
   std::vector<std::thread> fast_threads;
 
   std::vector<batt::CpuCacheLineIsolated<i64>> max_fast_thread_value(kNumFastThreads);
@@ -113,38 +144,54 @@ void run_sync_update_test(const usize kNumFastThreads)
     });
   }
 
+  // Wait for all threads to finish.
+  //
   for (std::thread& t : fast_threads) {
     t.join();
   }
 
+  stop_slow_thread.store(true);
   slow_thread.join();
 
+  // Calculate the maximum count value observed from any of the fast threads. Since all threads are
+  // known to have finished, this value will not change.
+  //
   i64 max_count = 0;
   for (batt::CpuCacheLineIsolated<i64>& count : max_fast_thread_value) {
     max_count = std::max(max_count, *count);
   }
 
   const i64 max_synced_count = llfs::LRUClock::read_global();
+  EXPECT_EQ(max_synced_count, max_count + 1);
+
+  // Verify the required properties of the slow thread's count observations...
+  //
+  {
+    i64 prev_value = -1;
+
+    for (usize i = 0; i < slow_thread_values.size(); ++i) {
+      // Once we observe the maximum count, all future observations should be the same (since the
+      // slow thread does not do any updating on its own).
+      //
+      if (slow_thread_values[i] >= max_synced_count) {
+        for (; i < slow_thread_values.size(); ++i) {
+          EXPECT_EQ(slow_thread_values[i], max_synced_count);
+        }
+        break;
+      }
 
-  EXPECT_LE(max_synced_count, max_count);
+      // All other values should be strictly increasing.
+      //
+      EXPECT_GT(slow_thread_values[i], prev_value)
+          << BATT_INSPECT(i) << BATT_INSPECT(slow_thread_values[i])
+          << BATT_INSPECT(slow_thread_values[i - 1]) << BATT_INSPECT(max_synced_count)
+          << BATT_INSPECT(max_count) << BATT_INSPECT_RANGE(slow_thread_values);
 
-  for (usize i = 1; i < kSlowThreadReads; ++i) {
-    if (slow_thread_values[i] >= max_synced_count) {
-      for (; i < kSlowThreadReads; ++i) {
-        EXPECT_EQ(slow_thread_values[i], max_synced_count);
-      }
-      break;
-    }
-    if (slow_thread_values[i] == 0 && slow_thread_values[i - 1] == 0) {
-      continue;
+      prev_value = slow_thread_values[i];
     }
-    EXPECT_GT(slow_thread_values[i] - slow_thread_values[i - 1], 50)
-        << BATT_INSPECT(i) << BATT_INSPECT(slow_thread_values[i])
-        << BATT_INSPECT(slow_thread_values[i - 1]) << BATT_INSPECT(max_synced_count)
-        << BATT_INSPECT(max_count) << BATT_INSPECT_RANGE(slow_thread_values);
   }
 
-  EXPECT_GT(slow_thread_values.back(), kUpdatesPerThread / 10);
+  EXPECT_GT(slow_thread_values.back(), (kUpdatesPerThread * 95) / 100);
 }
 
 TEST(LruClockTest, SyncUpdate1)