Skip to content

Commit

Permalink
Implement configurable timeout before concluding KVSTORE_SYNCED event…
Browse files Browse the repository at this point in the history
… in case of 0 peers

Summary:
Today when there are no neigbors discovered, kvstore concludes event of KVSTORE_SYNCED with 0 peers learned.

In cases, eg. in EBB, neighbor discovery may take much longer because interfaces take longer time to come up. May want to delay setting KVSTORE_SYNCED with 0 peers until configurable timeout.

Reviewed By: xiangxu1121

Differential Revision: D64811583

fbshipit-source-id: f944eef546917ddb292e5f2e54521843a52ea2f8
  • Loading branch information
Shitanshu Shah authored and facebook-github-bot committed Oct 25, 2024
1 parent c778d40 commit aa6292a
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 16 deletions.
2 changes: 2 additions & 0 deletions openr/if/KvStore.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,8 @@ struct KvStoreConfig {
15: i32 sync_initial_backoff_ms = 4000;
16: i32 sync_max_backoff_ms = 256000;
17: optional i32 self_adjacency_timeout_ms;
/* configuration of timeout before kvstore considered sync even when no peers learned */
18: optional i32 kvstore_sync_timeout_ms;
}

/**
Expand Down
3 changes: 3 additions & 0 deletions openr/if/OpenrConfig.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ struct KvstoreConfig {
8: i32 sync_initial_backoff_ms = 4000;
9: i32 sync_max_backoff_ms = 256000;
10: optional i32 self_adjacency_timeout_ms;

/* configuration of timeout before kvstore considered sync even when no peers learned */
11: optional i32 kvstore_sync_timeout_ms;
}

/*
Expand Down
73 changes: 57 additions & 16 deletions openr/kvstore/KvStore-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,19 @@ KvStore<ClientType>::KvStore(
std::chrono::milliseconds(*kvStoreConfig.sync_max_backoff_ms());
}

if (kvStoreConfig.kvstore_sync_timeout_ms().has_value()) {
XLOG(INFO) << "Set kvStoreSyncTimer_ with timeout "
<< *kvStoreConfig.kvstore_sync_timeout_ms();
kvParams_.kvStoreSyncTimeout =
std::chrono::milliseconds(*kvStoreConfig.kvstore_sync_timeout_ms());

kvStoreSyncTimer_ = folly::AsyncTimeout::make(*getEvb(), [this]() noexcept {
XLOG(INFO) << "kvStoreSync Timer expired";
initZeroPeersKvStores();
});
kvStoreSyncTimer_->scheduleTimeout(kvParams_.kvStoreSyncTimeout.count());
}

if (kvStoreConfig.self_adjacency_timeout_ms().has_value()) {
kvParams_.selfAdjSyncTimeout =
std::chrono::milliseconds(*kvStoreConfig.self_adjacency_timeout_ms());
Expand Down Expand Up @@ -276,37 +289,65 @@ KvStore<ClientType>::getKvStoreUpdatesReader() {
return kvParams_.kvStoreUpdatesQueue.getReader();
}

/*
* In OpenR initialization process, first PeerEvent publishment from
* LinkMonitor includes peers in all areas. However, KvStore could receive
* empty peers in one configured area in following scenarios,
* - The device is running in standalone mode,
* - The configured area just spawns without any peer yet.
* In order to make KvStore converge in initialization process, KvStoreDb
* with no peers in the area is treated as syncing completed. Otherwise,
* 'initialKvStoreDbSynced()' will not publish kvStoreSynced signal, and
* downstream modules cannot proceed to complete initialization.
*/
template <class ClientType>
void
KvStore<ClientType>::initZeroPeersKvStores() {
for (auto& [area, kvStoreDb] : kvStoreDb_) {
if (kvStoreDb.getPeerCnt() != 0) {
continue;
}

XLOG(INFO)
<< fmt::format("[Initialization] Received 0 peers in area {}.", area);
kvStoreDb.processInitializationEvent();
}
}

template <class ClientType>
void
KvStore<ClientType>::processPeerUpdates(PeerEvent&& event) {
for (const auto& [area, areaPeerEvent] : event) {
// Event can contain peerAdd/peerDel simultaneously
if (not areaPeerEvent.peersToAdd.empty()) {
semifuture_addUpdateKvStorePeers(area, areaPeerEvent.peersToAdd).get();
if (isKvStoreSyncTimerScheduled()) {
kvStoreSyncTimer_->cancelTimeout();
}
}
if (not areaPeerEvent.peersToDel.empty()) {
semifuture_deleteKvStorePeers(area, areaPeerEvent.peersToDel).get();
if (isKvStoreSyncTimerScheduled()) {
kvStoreSyncTimer_->cancelTimeout();
}
}
}

if ((not initialSyncSignalSent_)) {
// In OpenR initialization process, first PeerEvent publishment from
// LinkMonitor includes peers in all areas. However, KvStore could receive
// empty peers in one configured area in following scenarios,
// - The device is running in standalone mode,
// - The configured area just spawns without any peer yet.
// In order to make KvStore converge in initialization process, KvStoreDb
// with no peers in the area is treated as syncing completed. Otherwise,
// 'initialKvStoreDbSynced()' will not publish kvStoreSynced signal, and
// downstream modules cannot proceed to complete initialization.
for (auto& [area, kvStoreDb] : kvStoreDb_) {
if (kvStoreDb.getPeerCnt() != 0) {
continue;
}
XLOG(INFO)
<< fmt::format("[Initialization] Received 0 peers in area {}.", area);
kvStoreDb.processInitializationEvent();
/**
* This whole block is applicable to only case when node has not
* learned any peers yet
*
* Do not declare KVSTORE_SYNCED pre-maturely. In use-cases, it may
* take longer to learn peers during initialization. Allow them to
* not declare KVSTORE_SYNCED at-least until configurable timeout
* if no peers learned.
*/
if (isKvStoreSyncTimerScheduled()) {
return;
}

initZeroPeersKvStores();
}
}

Expand Down
20 changes: 20 additions & 0 deletions openr/kvstore/KvStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,18 @@ class KvStore final : public OpenrEventBase {
return false;
}

/*
* Check if kvStoreSync timer currently scheduled on or off
*/
inline bool
isKvStoreSyncTimerScheduled() {
if (kvStoreSyncTimer_ && kvStoreSyncTimer_->isScheduled()) {
return true;
}

return false;
}

/*
* [Open/R Initialization]
*
Expand Down Expand Up @@ -785,6 +797,11 @@ class KvStore final : public OpenrEventBase {
std::map<std::string, int64_t> getGlobalCounters() const;
void initGlobalCounters();

/*
* Initialize all kvstore instances with zero peers
*/
void initZeroPeersKvStores();

/*
* This is a helper function which returns a reference to the relevant
* KvStoreDb or throws an instance of KvStoreError for backward compaytibilty.
Expand Down Expand Up @@ -814,6 +831,9 @@ class KvStore final : public OpenrEventBase {

std::unique_ptr<folly::AsyncTimeout> initialSelfOriginatedKeysTimer_{nullptr};

// Timer to await for learning at least one peer
std::unique_ptr<folly::AsyncTimeout> kvStoreSyncTimer_{nullptr};

// kvstore parameters common to all kvstoreDB
KvStoreParams kvParams_;

Expand Down
2 changes: 2 additions & 0 deletions openr/kvstore/KvStoreParams.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ struct KvStoreParams {
// Locally adjacency learning timeout
std::chrono::milliseconds selfAdjSyncTimeout;

std::chrono::milliseconds kvStoreSyncTimeout;

// TLS knob
bool enable_secure_thrift_client{false};
// TLS paths
Expand Down
104 changes: 104 additions & 0 deletions openr/kvstore/tests/KvStoreTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,110 @@ TEST_F(KvStoreTestFixture, PeerResyncWithDefaultBackoff) {
EXPECT_EQ(*cmpPeers["storeB"].state(), thrift::KvStorePeerState::INITIALIZED);
}

/**
* Let KVSTORE_SYNCED signal learned as soon when there are no peers
*
* Create KvStore just for one node without any peers
*
* Then wait for KVSTORE_SYNCED signal and verify that sync signal was
* immediately
*/
TEST_F(KvStoreTestFixture, KvStoreSyncWithoutTimeout) {
messaging::ReplicateQueue<PeerEvent> myPeerUpdatesQueue;

auto config = getTestKvConf("storeA");

auto const start = std::chrono::steady_clock::now();
auto* storeA =
createKvStore(config, {kTestingAreaName}, myPeerUpdatesQueue.getReader());
storeA->run();

// Publish empty peers.
myPeerUpdatesQueue.push(PeerEvent());

// Wait for KVSTORE_SYNCED signal
storeA->recvKvStoreSyncedSignal();

auto cmpPeers = storeA->getPeers(kTestingAreaName);
EXPECT_EQ(0, cmpPeers.size());
auto elapsedTime =
duration_cast<milliseconds>(steady_clock::now() - start).count();

// Should receive KVSTORE_SYNCED much before 1000 ms
EXPECT_LT(elapsedTime, 1000);
}

/**
* Explicitly set kvstoreConfig for timeout to declare KVSTORE_SYNCED
* when there are no peers that can be learned (with empty PeerEvent)
*
* Create KvStore just for one node without any peers
*
* Then wait for KVSTORE_SYNCED signal and verify that sync signal was
* received only after the timeout value
*/
TEST_F(KvStoreTestFixture, KvStoreSyncTimeoutWithEmptyPeerUpdate) {
messaging::ReplicateQueue<PeerEvent> myPeerUpdatesQueue;
const std::chrono::milliseconds kKvStoreSyncTimeout(2000);
const std::chrono::milliseconds kKvStoreSyncTimeoutUpperCheck(2200);

auto config = getTestKvConf("storeA");
config.kvstore_sync_timeout_ms() = kKvStoreSyncTimeout.count();

auto const start = std::chrono::steady_clock::now();
auto* storeA =
createKvStore(config, {kTestingAreaName}, myPeerUpdatesQueue.getReader());
storeA->run();

// Publish empty peers.
myPeerUpdatesQueue.push(PeerEvent());

// Wait for KVSTORE_SYNCED signal
storeA->recvKvStoreSyncedSignal();

auto cmpPeers = storeA->getPeers(kTestingAreaName);
EXPECT_EQ(0, cmpPeers.size());

auto elapsedTime =
duration_cast<milliseconds>(steady_clock::now() - start).count();
EXPECT_GT(elapsedTime, kKvStoreSyncTimeout.count());
EXPECT_LT(elapsedTime, kKvStoreSyncTimeoutUpperCheck.count());
}

/**
* Explicitly set kvstoreConfig for timeout to declare KVSTORE_SYNCED
* when there are no peers that can be learned
*
* Create KvStore just for one node without any peers
*
* Then wait for KVSTORE_SYNCED signal and verify that sync signal was
* received only after the timeout value
*/
TEST_F(KvStoreTestFixture, KvStoreSyncTimeoutWithoutPeerUpdate) {
messaging::ReplicateQueue<PeerEvent> myPeerUpdatesQueue;
const std::chrono::milliseconds kKvStoreSyncTimeout(2000);
const std::chrono::milliseconds kKvStoreSyncTimeoutUpperCheck(2200);

auto config = getTestKvConf("storeA");
config.kvstore_sync_timeout_ms() = kKvStoreSyncTimeout.count();

auto const start = std::chrono::steady_clock::now();
auto* storeA =
createKvStore(config, {kTestingAreaName}, myPeerUpdatesQueue.getReader());
storeA->run();

// Wait for KVSTORE_SYNCED signal
storeA->recvKvStoreSyncedSignal();

auto cmpPeers = storeA->getPeers(kTestingAreaName);
EXPECT_EQ(0, cmpPeers.size());

auto elapsedTime =
duration_cast<milliseconds>(steady_clock::now() - start).count();
EXPECT_GT(elapsedTime, kKvStoreSyncTimeout.count());
EXPECT_LT(elapsedTime, kKvStoreSyncTimeoutUpperCheck.count());
}

// When you receive a update from 'other' about a key you originates,
// with some inconsistency (higher ttl_version)
// 1. you should never delete it
Expand Down

0 comments on commit aa6292a

Please sign in to comment.