From b6bccc510c3b98a42d21f3dc04048d8bea6486e0 Mon Sep 17 00:00:00 2001 From: markcty Date: Mon, 26 Dec 2022 17:14:18 +0800 Subject: [PATCH] cmd board gc, close #72 --- curp/Cargo.toml | 1 + curp/src/server/cmd_board.rs | 5 ++- curp/src/server/gc.rs | 74 ++++++++++++++++++++++++++++++++++-- curp/src/server/mod.rs | 4 +- 4 files changed, 77 insertions(+), 7 deletions(-) diff --git a/curp/Cargo.toml b/curp/Cargo.toml index 8f9a1b7ab..3b6639b7f 100644 --- a/curp/Cargo.toml +++ b/curp/Cargo.toml @@ -34,6 +34,7 @@ tonic = "0.7.2" tracing = { version = "0.1.34", features = ["std", "log", "attributes"] } tracing-opentelemetry = "0.18.0" flume = "0.10.14" +indexmap = "1.9.2" [dev-dependencies] itertools = "0.10.3" diff --git a/curp/src/server/cmd_board.rs b/curp/src/server/cmd_board.rs index 2888d0478..b433ed7ab 100644 --- a/curp/src/server/cmd_board.rs +++ b/curp/src/server/cmd_board.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use event_listener::Event; +use indexmap::IndexMap; use crate::{cmd::ProposeId, rpc::WaitSyncedResponse}; @@ -10,7 +11,7 @@ pub(super) struct CommandBoard { /// Stores all notifiers for wait_synced requests pub(super) notifiers: HashMap, /// Stores all command states - pub(super) cmd_states: HashMap, + pub(super) cmd_states: IndexMap, } impl CommandBoard { @@ -18,7 +19,7 @@ impl CommandBoard { pub(super) fn new() -> Self { Self { notifiers: HashMap::new(), - cmd_states: HashMap::new(), + cmd_states: IndexMap::new(), } } diff --git a/curp/src/server/gc.rs b/curp/src/server/gc.rs index 2def64929..be4d238d3 100644 --- a/curp/src/server/gc.rs +++ b/curp/src/server/gc.rs @@ -3,21 +3,28 @@ use std::{sync::Arc, time::Duration}; use parking_lot::Mutex; use tokio::time::Instant; -use crate::cmd::Command; - use super::spec_pool::SpeculativePool; +use crate::{cmd::Command, server::cmd_board::CommandBoard}; /// How often spec should GC const SPEC_GC_INTERVAL: Duration = Duration::from_secs(10); +/// How often cmd board should +const CMD_BOARD_GC_INTERVAL: Duration = Duration::from_secs(20); + /// Run background GC tasks for Curp server -pub(super) fn run_gc_tasks(spec: Arc>>) { +pub(super) fn run_gc_tasks( + spec: Arc>>, + cmd_board: Arc>, +) { let _spec_gc_handle = tokio::spawn(async move { loop { tokio::time::sleep(SPEC_GC_INTERVAL).await; spec.lock().gc(); } }); + + let _cmd_board_gc = tokio::spawn(gc_cmd_board(cmd_board, CMD_BOARD_GC_INTERVAL)); } impl SpeculativePool { @@ -27,3 +34,64 @@ impl SpeculativePool { self.ready.retain(|_, time| now - *time >= SPEC_GC_INTERVAL); } } + +/// Cleanup cmd board +async fn gc_cmd_board(cmd_board: Arc>, interval: Duration) { + let mut last_check_len = 0; + loop { + tokio::time::sleep(interval).await; + let mut board = cmd_board.lock(); + let new_board = board.cmd_states.split_off(last_check_len); + board.cmd_states = new_board; + last_check_len = board.cmd_states.len(); + } +} + +#[cfg(test)] +mod tests { + use std::{sync::Arc, time::Duration}; + + use parking_lot::Mutex; + + use crate::{ + cmd::ProposeId, + server::{ + cmd_board::{CmdState, CommandBoard}, + gc::gc_cmd_board, + }, + }; + + #[allow(unused_results, clippy::unwrap_used)] + #[tokio::test] + async fn cmd_board_gc_test() { + let board = Arc::new(Mutex::new(CommandBoard::new())); + tokio::spawn(gc_cmd_board(Arc::clone(&board), Duration::from_millis(500))); + + tokio::time::sleep(Duration::from_millis(100)).await; + board + .lock() + .cmd_states + .insert(ProposeId::new("1".to_owned()), CmdState::EarlyArrive); + tokio::time::sleep(Duration::from_millis(100)).await; + board + .lock() + .cmd_states + .insert(ProposeId::new("2".to_owned()), CmdState::EarlyArrive); + + // at 600ms + tokio::time::sleep(Duration::from_millis(400)).await; + board + .lock() + .cmd_states + .insert(ProposeId::new("3".to_owned()), CmdState::EarlyArrive); + + // at 1100ms, the first two kv should be removed + tokio::time::sleep(Duration::from_millis(500)).await; + let board = board.lock(); + assert_eq!(board.cmd_states.len(), 1); + assert_eq!( + board.cmd_states.get_index(0).unwrap().0, + &ProposeId::new("3".to_owned()) + ); + } +} diff --git a/curp/src/server/mod.rs b/curp/src/server/mod.rs index a63882166..417c1e27e 100644 --- a/curp/src/server/mod.rs +++ b/curp/src/server/mod.rs @@ -310,7 +310,7 @@ impl Protocol { Arc::new(AtomicBool::new(true)), )); - run_gc_tasks(Arc::clone(&spec)); + run_gc_tasks(Arc::clone(&spec), Arc::clone(&cmd_board)); Self { state, @@ -362,7 +362,7 @@ impl Protocol { reachable, )); - run_gc_tasks(Arc::clone(&spec)); + run_gc_tasks(Arc::clone(&spec), Arc::clone(&cmd_board)); Self { state,