diff --git a/openr/if/KvStore.thrift b/openr/if/KvStore.thrift index 3aa6d7eebff..2d8b0ff51e7 100644 --- a/openr/if/KvStore.thrift +++ b/openr/if/KvStore.thrift @@ -464,6 +464,8 @@ struct KvStoreConfig { 15: i32 sync_initial_backoff_ms = 4000; 16: i32 sync_max_backoff_ms = 256000; 17: optional i32 self_adjacency_timeout_ms; + /* configuration of timeout before kvstore considered sync even when no peers learned */ + 18: optional i32 kvstore_sync_timeout_ms; } /** diff --git a/openr/if/OpenrConfig.thrift b/openr/if/OpenrConfig.thrift index 5224f1d3295..5b514b378d0 100644 --- a/openr/if/OpenrConfig.thrift +++ b/openr/if/OpenrConfig.thrift @@ -74,6 +74,9 @@ struct KvstoreConfig { 8: i32 sync_initial_backoff_ms = 4000; 9: i32 sync_max_backoff_ms = 256000; 10: optional i32 self_adjacency_timeout_ms; + + /* configuration of timeout before kvstore considered sync even when no peers learned */ + 11: optional i32 kvstore_sync_timeout_ms; } /* diff --git a/openr/kvstore/KvStore-inl.h b/openr/kvstore/KvStore-inl.h index cf2f606e078..38dae1664b3 100644 --- a/openr/kvstore/KvStore-inl.h +++ b/openr/kvstore/KvStore-inl.h @@ -76,6 +76,19 @@ KvStore::KvStore( std::chrono::milliseconds(*kvStoreConfig.sync_max_backoff_ms()); } + if (kvStoreConfig.kvstore_sync_timeout_ms().has_value()) { + XLOG(INFO) << "Set kvStoreSyncTimer_ with timeout " + << *kvStoreConfig.kvstore_sync_timeout_ms(); + kvParams_.kvStoreSyncTimeout = + std::chrono::milliseconds(*kvStoreConfig.kvstore_sync_timeout_ms()); + + kvStoreSyncTimer_ = folly::AsyncTimeout::make(*getEvb(), [this]() noexcept { + XLOG(INFO) << "kvStoreSync Timer expired"; + initZeroPeersKvStores(); + }); + kvStoreSyncTimer_->scheduleTimeout(kvParams_.kvStoreSyncTimeout.count()); + } + if (kvStoreConfig.self_adjacency_timeout_ms().has_value()) { kvParams_.selfAdjSyncTimeout = std::chrono::milliseconds(*kvStoreConfig.self_adjacency_timeout_ms()); @@ -276,6 +289,31 @@ KvStore::getKvStoreUpdatesReader() { return kvParams_.kvStoreUpdatesQueue.getReader(); } +/* + * In OpenR initialization process, first PeerEvent publishment from + * LinkMonitor includes peers in all areas. However, KvStore could receive + * empty peers in one configured area in following scenarios, + * - The device is running in standalone mode, + * - The configured area just spawns without any peer yet. + * In order to make KvStore converge in initialization process, KvStoreDb + * with no peers in the area is treated as syncing completed. Otherwise, + * 'initialKvStoreDbSynced()' will not publish kvStoreSynced signal, and + * downstream modules cannot proceed to complete initialization. + */ +template +void +KvStore::initZeroPeersKvStores() { + for (auto& [area, kvStoreDb] : kvStoreDb_) { + if (kvStoreDb.getPeerCnt() != 0) { + continue; + } + + XLOG(INFO) + << fmt::format("[Initialization] Received 0 peers in area {}.", area); + kvStoreDb.processInitializationEvent(); + } +} + template void KvStore::processPeerUpdates(PeerEvent&& event) { @@ -283,30 +321,33 @@ KvStore::processPeerUpdates(PeerEvent&& event) { // Event can contain peerAdd/peerDel simultaneously if (not areaPeerEvent.peersToAdd.empty()) { semifuture_addUpdateKvStorePeers(area, areaPeerEvent.peersToAdd).get(); + if (isKvStoreSyncTimerScheduled()) { + kvStoreSyncTimer_->cancelTimeout(); + } } if (not areaPeerEvent.peersToDel.empty()) { semifuture_deleteKvStorePeers(area, areaPeerEvent.peersToDel).get(); + if (isKvStoreSyncTimerScheduled()) { + kvStoreSyncTimer_->cancelTimeout(); + } } } if ((not initialSyncSignalSent_)) { - // In OpenR initialization process, first PeerEvent publishment from - // LinkMonitor includes peers in all areas. However, KvStore could receive - // empty peers in one configured area in following scenarios, - // - The device is running in standalone mode, - // - The configured area just spawns without any peer yet. - // In order to make KvStore converge in initialization process, KvStoreDb - // with no peers in the area is treated as syncing completed. Otherwise, - // 'initialKvStoreDbSynced()' will not publish kvStoreSynced signal, and - // downstream modules cannot proceed to complete initialization. - for (auto& [area, kvStoreDb] : kvStoreDb_) { - if (kvStoreDb.getPeerCnt() != 0) { - continue; - } - XLOG(INFO) - << fmt::format("[Initialization] Received 0 peers in area {}.", area); - kvStoreDb.processInitializationEvent(); + /** + * This whole block is applicable to only case when node has not + * learned any peers yet + * + * Do not declare KVSTORE_SYNCED pre-maturely. In use-cases, it may + * take longer to learn peers during initialization. Allow them to + * not declare KVSTORE_SYNCED at-least until configurable timeout + * if no peers learned. + */ + if (isKvStoreSyncTimerScheduled()) { + return; } + + initZeroPeersKvStores(); } } diff --git a/openr/kvstore/KvStore.h b/openr/kvstore/KvStore.h index f15c051c877..36a4b5d790b 100644 --- a/openr/kvstore/KvStore.h +++ b/openr/kvstore/KvStore.h @@ -613,6 +613,18 @@ class KvStore final : public OpenrEventBase { return false; } + /* + * Check if kvStoreSync timer currently scheduled on or off + */ + inline bool + isKvStoreSyncTimerScheduled() { + if (kvStoreSyncTimer_ && kvStoreSyncTimer_->isScheduled()) { + return true; + } + + return false; + } + /* * [Open/R Initialization] * @@ -785,6 +797,11 @@ class KvStore final : public OpenrEventBase { std::map getGlobalCounters() const; void initGlobalCounters(); + /* + * Initialize all kvstore instances with zero peers + */ + void initZeroPeersKvStores(); + /* * This is a helper function which returns a reference to the relevant * KvStoreDb or throws an instance of KvStoreError for backward compaytibilty. @@ -814,6 +831,9 @@ class KvStore final : public OpenrEventBase { std::unique_ptr initialSelfOriginatedKeysTimer_{nullptr}; + // Timer to await for learning at least one peer + std::unique_ptr kvStoreSyncTimer_{nullptr}; + // kvstore parameters common to all kvstoreDB KvStoreParams kvParams_; diff --git a/openr/kvstore/KvStoreParams.h b/openr/kvstore/KvStoreParams.h index b02f56f48ab..6d260bd5ab0 100644 --- a/openr/kvstore/KvStoreParams.h +++ b/openr/kvstore/KvStoreParams.h @@ -44,6 +44,8 @@ struct KvStoreParams { // Locally adjacency learning timeout std::chrono::milliseconds selfAdjSyncTimeout; + std::chrono::milliseconds kvStoreSyncTimeout; + // TLS knob bool enable_secure_thrift_client{false}; // TLS paths diff --git a/openr/kvstore/tests/KvStoreTest.cpp b/openr/kvstore/tests/KvStoreTest.cpp index 3141b4ba49d..16f780d0587 100644 --- a/openr/kvstore/tests/KvStoreTest.cpp +++ b/openr/kvstore/tests/KvStoreTest.cpp @@ -881,6 +881,110 @@ TEST_F(KvStoreTestFixture, PeerResyncWithDefaultBackoff) { EXPECT_EQ(*cmpPeers["storeB"].state(), thrift::KvStorePeerState::INITIALIZED); } +/** + * Let KVSTORE_SYNCED signal learned as soon when there are no peers + * + * Create KvStore just for one node without any peers + * + * Then wait for KVSTORE_SYNCED signal and verify that sync signal was + * immediately + */ +TEST_F(KvStoreTestFixture, KvStoreSyncWithoutTimeout) { + messaging::ReplicateQueue myPeerUpdatesQueue; + + auto config = getTestKvConf("storeA"); + + auto const start = std::chrono::steady_clock::now(); + auto* storeA = + createKvStore(config, {kTestingAreaName}, myPeerUpdatesQueue.getReader()); + storeA->run(); + + // Publish empty peers. + myPeerUpdatesQueue.push(PeerEvent()); + + // Wait for KVSTORE_SYNCED signal + storeA->recvKvStoreSyncedSignal(); + + auto cmpPeers = storeA->getPeers(kTestingAreaName); + EXPECT_EQ(0, cmpPeers.size()); + auto elapsedTime = + duration_cast(steady_clock::now() - start).count(); + + // Should receive KVSTORE_SYNCED much before 1000 ms + EXPECT_LT(elapsedTime, 1000); +} + +/** + * Explicitly set kvstoreConfig for timeout to declare KVSTORE_SYNCED + * when there are no peers that can be learned (with empty PeerEvent) + * + * Create KvStore just for one node without any peers + * + * Then wait for KVSTORE_SYNCED signal and verify that sync signal was + * received only after the timeout value + */ +TEST_F(KvStoreTestFixture, KvStoreSyncTimeoutWithEmptyPeerUpdate) { + messaging::ReplicateQueue myPeerUpdatesQueue; + const std::chrono::milliseconds kKvStoreSyncTimeout(2000); + const std::chrono::milliseconds kKvStoreSyncTimeoutUpperCheck(2200); + + auto config = getTestKvConf("storeA"); + config.kvstore_sync_timeout_ms() = kKvStoreSyncTimeout.count(); + + auto const start = std::chrono::steady_clock::now(); + auto* storeA = + createKvStore(config, {kTestingAreaName}, myPeerUpdatesQueue.getReader()); + storeA->run(); + + // Publish empty peers. + myPeerUpdatesQueue.push(PeerEvent()); + + // Wait for KVSTORE_SYNCED signal + storeA->recvKvStoreSyncedSignal(); + + auto cmpPeers = storeA->getPeers(kTestingAreaName); + EXPECT_EQ(0, cmpPeers.size()); + + auto elapsedTime = + duration_cast(steady_clock::now() - start).count(); + EXPECT_GT(elapsedTime, kKvStoreSyncTimeout.count()); + EXPECT_LT(elapsedTime, kKvStoreSyncTimeoutUpperCheck.count()); +} + +/** + * Explicitly set kvstoreConfig for timeout to declare KVSTORE_SYNCED + * when there are no peers that can be learned + * + * Create KvStore just for one node without any peers + * + * Then wait for KVSTORE_SYNCED signal and verify that sync signal was + * received only after the timeout value + */ +TEST_F(KvStoreTestFixture, KvStoreSyncTimeoutWithoutPeerUpdate) { + messaging::ReplicateQueue myPeerUpdatesQueue; + const std::chrono::milliseconds kKvStoreSyncTimeout(2000); + const std::chrono::milliseconds kKvStoreSyncTimeoutUpperCheck(2200); + + auto config = getTestKvConf("storeA"); + config.kvstore_sync_timeout_ms() = kKvStoreSyncTimeout.count(); + + auto const start = std::chrono::steady_clock::now(); + auto* storeA = + createKvStore(config, {kTestingAreaName}, myPeerUpdatesQueue.getReader()); + storeA->run(); + + // Wait for KVSTORE_SYNCED signal + storeA->recvKvStoreSyncedSignal(); + + auto cmpPeers = storeA->getPeers(kTestingAreaName); + EXPECT_EQ(0, cmpPeers.size()); + + auto elapsedTime = + duration_cast(steady_clock::now() - start).count(); + EXPECT_GT(elapsedTime, kKvStoreSyncTimeout.count()); + EXPECT_LT(elapsedTime, kKvStoreSyncTimeoutUpperCheck.count()); +} + // When you receive a update from 'other' about a key you originates, // with some inconsistency (higher ttl_version) // 1. you should never delete it