Fix for flaky LruClockTest. (#146)
* Add diagnostic printing for sporadic test failure (LruClockTest.SyncUpdate128).

* Increase tolerances for LruClockTest.

* Fixes for flaky LruClockTest.

* Stop slow thread inside retry loop (LruClockTest).
tonyastolfi authored Mar 18, 2024
1 parent 0ba86e1 commit 45a9138
Showing 3 changed files with 90 additions and 32 deletions.
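
The core of the fix is visible in the test diff below: a fixed sleep-then-assert pattern is replaced by a bounded retry loop that also honors a stop flag. A minimal standalone sketch of that pattern (poll_until_advanced, its parameters, and the read_clock callback are illustrative names, not llfs API):

#include <atomic>
#include <chrono>
#include <thread>

// Illustrative only: retry a bounded number of times until the clock reading advances past
// `prev_value`, bailing out early when `stop` is set. This replaces the assumption that one
// fixed-length sleep is "long enough", which is what made the original test flaky on slow
// or heavily loaded machines.
template <typename ReadClockFn>
long poll_until_advanced(ReadClockFn read_clock, long prev_value, std::atomic<bool>& stop,
                         int max_attempts, std::chrono::microseconds delay)
{
  long value = prev_value;
  for (int attempt = 0; attempt < max_attempts; ++attempt) {
    std::this_thread::sleep_for(delay);
    value = read_clock();
    if (value > prev_value || stop.load()) {
      break;  // Progress observed, or the test is shutting down.
    }
  }
  return value;
}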
8 changes: 8 additions & 0 deletions src/llfs/lru_clock.cpp
@@ -146,6 +146,10 @@ void LRUClock::add_local_counter(LocalCounter& counter) noexcept
 {
   std::unique_lock<std::mutex> lock{this->mutex_};
 
+  // Initialize the local counter to the max observed global value.
+  //
+  counter.value.store(this->observed_count_);
+
   this->counter_list_.push_back(counter);
 }

@@ -155,6 +159,10 @@ void LRUClock::remove_local_counter(LocalCounter& counter) noexcept
 {
   std::unique_lock<std::mutex> lock{this->mutex_};
 
+  // Update the global max observed count (last reading from this local counter).
+  //
+  this->observed_count_ = std::max(this->observed_count_, counter.value.load());
+
   this->counter_list_.erase(this->counter_list_.iterator_to(counter));
 }

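Taken together, the two additions above form a handoff: a counter registered later can never start below the last value reached by a counter that has been removed, so the clock never appears to run backwards as threads come and go. A simplified standalone model of the idea (CounterRegistry and its members are invented for illustration; the real class is llfs::LRUClock, which uses an intrusive list rather than std::list):

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <list>
#include <mutex>

// Simplified model, not the real llfs::LRUClock: a global registry hands each new
// thread-local counter the highest count ever observed, and folds a departing counter's
// final value back in, so observed counts stay monotonic as threads come and go.
class CounterRegistry {
 public:
  struct LocalCounter {
    std::atomic<std::int64_t> value{0};
  };

  void add(LocalCounter& counter) {
    std::unique_lock<std::mutex> lock{this->mutex_};
    counter.value.store(this->observed_count_);  // Start at the global high-water mark.
    this->counters_.push_back(&counter);
  }

  void remove(LocalCounter& counter) {
    std::unique_lock<std::mutex> lock{this->mutex_};
    // Preserve this thread's progress before the counter goes away.
    this->observed_count_ = std::max(this->observed_count_, counter.value.load());
    this->counters_.remove(&counter);
  }

 private:
  std::mutex mutex_;
  std::int64_t observed_count_ = 0;
  std::list<LocalCounter*> counters_;
};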
5 changes: 4 additions & 1 deletion src/llfs/lru_clock.hpp
@@ -72,6 +72,8 @@ class LRUClock
 
   ~LocalCounter() noexcept;
 
+  /** \brief The next unused counter value for the current thread.
+   */
  std::atomic<i64> value{0};
 };

@@ -131,7 +133,8 @@ class LRUClock
    */
  void remove_local_counter(LocalCounter& counter) noexcept;
 
-  /** \brief Returns the maximum count value from the last time sync_local_counters() was called.
+  /** \brief Returns the maximum count value (least upper bound; i.e., the first unused value) from
+   * the last time sync_local_counters() was called.
    */
  i64 read_observed_count() noexcept;

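A toy reading of the clarified doc comment, with made-up numbers: each LocalCounter::value holds the next unused value for its thread, so if three threads last synced at 5, 9, and 9, read_observed_count() reports 9 (the first value no thread has used yet), not 8 (the largest value already handed out). In code (observed_count is a hypothetical helper, not the llfs API):

#include <algorithm>
#include <cstdint>
#include <initializer_list>
#include <iostream>

// Hypothetical helper mirroring the doc comment's wording: the observed count is the least
// upper bound of the per-thread "next unused" values.
std::int64_t observed_count(std::initializer_list<std::int64_t> next_unused_per_thread) {
  std::int64_t lub = 0;
  for (std::int64_t v : next_unused_per_thread) {
    lub = std::max(lub, v);
  }
  return lub;
}

int main() {
  std::cout << observed_count({5, 9, 9}) << "\n";  // Prints 9: the first unused value.
  return 0;
}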
109 changes: 78 additions & 31 deletions src/llfs/lru_clock.test.cpp
@@ -42,21 +42,27 @@ using namespace llfs::int_types;
 TEST(LruClockTest, PerThreadUpdate)
 {
   const usize kNumThreads = std::thread::hardware_concurrency();
-  const usize kUpdatesPerThread = 1000;
+  const usize kUpdatesPerThread = 25 * 1000;
 
   std::vector<std::vector<i64>> per_thread_values(kNumThreads, std::vector<i64>(kUpdatesPerThread));
 
+  std::atomic<bool> start{false};
   std::vector<std::thread> threads;
 
   for (usize i = 0; i < kNumThreads; ++i) {
-    threads.emplace_back([i, &per_thread_values] {
+    threads.emplace_back([i, &start, &per_thread_values] {
+      while (!start.load()) {
+        continue;
+      }
       for (usize j = 0; j < kUpdatesPerThread; ++j) {
         const i64 value = llfs::LRUClock::advance_local();
         per_thread_values[i][j] = value;
       }
     });
   }
 
+  start.store(true);
+
   for (std::thread& t : threads) {
     t.join();
   }
@@ -72,14 +78,16 @@ TEST(LruClockTest, PerThreadUpdate)
     }
   }
 
-  usize repeated_values = 0;
-  for (const auto& [value, count] : count_per_value) {
-    if (count > 1) {
-      ++repeated_values;
-    }
-  }
-
-  EXPECT_GT(repeated_values, 0u);
+  if (kNumThreads > 1) {
+    usize repeated_values = 0;
+    for (const auto& [value, count] : count_per_value) {
+      if (count > 1) {
+        ++repeated_values;
+      }
+    }
+
+    EXPECT_GT(repeated_values, 0u);
+  }
 }

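The PerThreadUpdate changes above add a start gate so the worker threads actually overlap: each thread spins on an atomic flag and only begins advancing the clock once every thread has been created, which is what makes repeated values likely enough to assert on (and only when kNumThreads > 1). The gate in isolation (a standalone sketch; the worker body and counts are illustrative):

#include <atomic>
#include <thread>
#include <vector>

// Standalone sketch of the start-gate pattern: without the gate, the first thread created
// can finish all of its updates before the last thread is even scheduled, so the updates
// never actually race and a "some values must repeat" expectation becomes timing-dependent.
int main() {
  const int kNumWorkers = 4;
  std::atomic<bool> start{false};
  std::atomic<long> shared{0};
  std::vector<std::thread> workers;

  for (int i = 0; i < kNumWorkers; ++i) {
    workers.emplace_back([&start, &shared] {
      while (!start.load()) {
        // Busy-wait until every worker has been created.
      }
      for (int j = 0; j < 1000; ++j) {
        shared.fetch_add(1);
      }
    });
  }

  start.store(true);  // Release all workers at (roughly) the same instant.
  for (std::thread& t : workers) {
    t.join();
  }
  return shared.load() == kNumWorkers * 1000 ? 0 : 1;
}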
@@ -88,17 +96,40 @@ TEST(LruClockTest, PerThreadUpdate)
 //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
 void run_sync_update_test(const usize kNumFastThreads)
 {
   const usize kUpdatesPerThread = 50 * 1000 * 1000;
-  const usize kSlowThreadReads = 20;
 
-  std::vector<i64> slow_thread_values(kSlowThreadReads);
-  std::thread slow_thread{[&slow_thread_values] {
-    for (usize i = 0; i < kSlowThreadReads; ++i) {
-      std::this_thread::sleep_for(
-          std::chrono::microseconds(llfs::LRUClock::kMaxSyncDelayUsec * 40));
-      slow_thread_values[i] = llfs::LRUClock::read_local();
-    }
-  }};
+  // The maximum number of consecutive attempts on the slow thread to read an increasing value.
+  //
+  const usize kSyncDelayToleranceFactor = 100;
+
+  // The "slow thread" sleeps for the maximum sync delay times the tolerance factor, then takes a
+  // reading from its local counter. All observed local counts are recorded for verification below.
+  //
+  std::atomic<bool> stop_slow_thread{false};
+  std::vector<i64> slow_thread_values;
+  std::thread slow_thread{[&slow_thread_values, &stop_slow_thread] {
+    while (!stop_slow_thread.load()) {
+      const i64 prev_value = slow_thread_values.empty() ? -1 : slow_thread_values.back();
+      slow_thread_values.emplace_back();
+
+      for (usize j = 0; j < kSyncDelayToleranceFactor; ++j) {
+        std::this_thread::sleep_for(std::chrono::microseconds(llfs::LRUClock::kMaxSyncDelayUsec));
+
+        // NOTE: we are only calling `read_local()` here, not `advance_local()`. That means unless
+        // the other (fast) threads are updating the counter via periodic global synchronization,
+        // this value will not change!
+        //
+        slow_thread_values.back() = llfs::LRUClock::read_local();
+        if (slow_thread_values.back() > prev_value || stop_slow_thread.load()) {
+          break;
+        }
+      }
+    }
+  }};
+
+  // The "fast threads" just advance their local counters as fast as possible. For each fast
+  // thread, we keep track of the maximum observed count value, which should eventually skip ahead
+  // due to global sync operations.
+  //
   std::vector<std::thread> fast_threads;
   std::vector<batt::CpuCacheLineIsolated<i64>> max_fast_thread_value(kNumFastThreads);

@@ -113,38 +144,54 @@ void run_sync_update_test(const usize kNumFastThreads)
     });
   }
 
+  // Wait for all threads to finish.
+  //
   for (std::thread& t : fast_threads) {
     t.join();
   }
 
+  stop_slow_thread.store(true);
   slow_thread.join();
 
+  // Calculate the maximum count value observed from any of the fast threads. Since all threads are
+  // known to have finished, this value will not change.
+  //
   i64 max_count = 0;
   for (batt::CpuCacheLineIsolated<i64>& count : max_fast_thread_value) {
     max_count = std::max(max_count, *count);
   }
 
   const i64 max_synced_count = llfs::LRUClock::read_global();
-  EXPECT_LE(max_synced_count, max_count);
+  EXPECT_EQ(max_synced_count, max_count + 1);
 
-  for (usize i = 1; i < kSlowThreadReads; ++i) {
-    if (slow_thread_values[i] >= max_synced_count) {
-      for (; i < kSlowThreadReads; ++i) {
-        EXPECT_EQ(slow_thread_values[i], max_synced_count);
-      }
-      break;
-    }
-    if (slow_thread_values[i] == 0 && slow_thread_values[i - 1] == 0) {
-      continue;
-    }
-    EXPECT_GT(slow_thread_values[i] - slow_thread_values[i - 1], 50)
-        << BATT_INSPECT(i) << BATT_INSPECT(slow_thread_values[i])
-        << BATT_INSPECT(slow_thread_values[i - 1]) << BATT_INSPECT(max_synced_count)
-        << BATT_INSPECT(max_count) << BATT_INSPECT_RANGE(slow_thread_values);
-  }
+  // Verify the required properties of the slow thread's count observations...
+  //
+  {
+    i64 prev_value = -1;
+
+    for (usize i = 0; i < slow_thread_values.size(); ++i) {
+      // Once we observe the maximum count, all future observations should be the same (since the
+      // slow thread does not do any updating on its own).
+      //
+      if (slow_thread_values[i] >= max_synced_count) {
+        for (; i < slow_thread_values.size(); ++i) {
+          EXPECT_EQ(slow_thread_values[i], max_synced_count);
+        }
+        break;
+      }
+
+      // All other values should be strictly increasing.
+      //
+      EXPECT_GT(slow_thread_values[i], prev_value)
+          << BATT_INSPECT(i) << BATT_INSPECT(slow_thread_values[i])
+          << BATT_INSPECT(slow_thread_values[i - 1]) << BATT_INSPECT(max_synced_count)
+          << BATT_INSPECT(max_count) << BATT_INSPECT_RANGE(slow_thread_values);
+
+      prev_value = slow_thread_values[i];
+    }
 
-  EXPECT_GT(slow_thread_values.back(), kUpdatesPerThread / 10);
+    EXPECT_GT(slow_thread_values.back(), (kUpdatesPerThread * 95) / 100);
+  }
 }
 
 TEST(LruClockTest, SyncUpdate1)
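The rewritten verification loop enforces two properties of the slow thread's readings: they strictly increase until the final synced maximum is first observed, and they equal that maximum from then on (the slow thread only reads, so it cannot advance past values the fast threads published). The same invariant restated as a standalone checker (slow_readings_ok is a hypothetical helper, not part of the test):

#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical checker restating the invariant verified above: readings strictly increase
// until they first reach `max_synced`, then every remaining reading equals `max_synced`
// exactly (a reader-only thread can plateau, but never regress or overshoot).
bool slow_readings_ok(const std::vector<std::int64_t>& readings, std::int64_t max_synced) {
  std::int64_t prev = -1;
  for (std::size_t i = 0; i < readings.size(); ++i) {
    if (readings[i] >= max_synced) {
      for (; i < readings.size(); ++i) {
        if (readings[i] != max_synced) {
          return false;  // The plateau must sit exactly at the synced maximum.
        }
      }
      break;
    }
    if (readings[i] <= prev) {
      return false;  // Before the plateau, readings must strictly increase.
    }
    prev = readings[i];
  }
  return true;
}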
