Skip to content

Commit

Permalink
cmd board gc, close #72
Browse files Browse the repository at this point in the history
  • Loading branch information
markcty authored and mergify[bot] committed Dec 27, 2022
1 parent c11c269 commit b6bccc5
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 7 deletions.
1 change: 1 addition & 0 deletions curp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ tonic = "0.7.2"
tracing = { version = "0.1.34", features = ["std", "log", "attributes"] }
tracing-opentelemetry = "0.18.0"
flume = "0.10.14"
indexmap = "1.9.2"

[dev-dependencies]
itertools = "0.10.3"
Expand Down
5 changes: 3 additions & 2 deletions curp/src/server/cmd_board.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;

use event_listener::Event;
use indexmap::IndexMap;

use crate::{cmd::ProposeId, rpc::WaitSyncedResponse};

Expand All @@ -10,15 +11,15 @@ pub(super) struct CommandBoard {
/// Stores all notifiers for wait_synced requests
pub(super) notifiers: HashMap<ProposeId, Event>,
/// Stores all command states
pub(super) cmd_states: HashMap<ProposeId, CmdState>,
pub(super) cmd_states: IndexMap<ProposeId, CmdState>,
}

impl CommandBoard {
/// Create an empty command board
pub(super) fn new() -> Self {
Self {
notifiers: HashMap::new(),
cmd_states: HashMap::new(),
cmd_states: IndexMap::new(),
}
}

Expand Down
74 changes: 71 additions & 3 deletions curp/src/server/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,28 @@ use std::{sync::Arc, time::Duration};
use parking_lot::Mutex;
use tokio::time::Instant;

use crate::cmd::Command;

use super::spec_pool::SpeculativePool;
use crate::{cmd::Command, server::cmd_board::CommandBoard};

/// How often spec should GC
const SPEC_GC_INTERVAL: Duration = Duration::from_secs(10);

/// How often cmd board should
const CMD_BOARD_GC_INTERVAL: Duration = Duration::from_secs(20);

/// Run background GC tasks for Curp server
pub(super) fn run_gc_tasks<C: Command + 'static>(spec: Arc<Mutex<SpeculativePool<C>>>) {
pub(super) fn run_gc_tasks<C: Command + 'static>(
spec: Arc<Mutex<SpeculativePool<C>>>,
cmd_board: Arc<Mutex<CommandBoard>>,
) {
let _spec_gc_handle = tokio::spawn(async move {
loop {
tokio::time::sleep(SPEC_GC_INTERVAL).await;
spec.lock().gc();
}
});

let _cmd_board_gc = tokio::spawn(gc_cmd_board(cmd_board, CMD_BOARD_GC_INTERVAL));
}

impl<C: Command + 'static> SpeculativePool<C> {
Expand All @@ -27,3 +34,64 @@ impl<C: Command + 'static> SpeculativePool<C> {
self.ready.retain(|_, time| now - *time >= SPEC_GC_INTERVAL);
}
}

/// Cleanup cmd board
async fn gc_cmd_board(cmd_board: Arc<Mutex<CommandBoard>>, interval: Duration) {
let mut last_check_len = 0;
loop {
tokio::time::sleep(interval).await;
let mut board = cmd_board.lock();
let new_board = board.cmd_states.split_off(last_check_len);
board.cmd_states = new_board;
last_check_len = board.cmd_states.len();
}
}

#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};

use parking_lot::Mutex;

use crate::{
cmd::ProposeId,
server::{
cmd_board::{CmdState, CommandBoard},
gc::gc_cmd_board,
},
};

#[allow(unused_results, clippy::unwrap_used)]
#[tokio::test]
async fn cmd_board_gc_test() {
let board = Arc::new(Mutex::new(CommandBoard::new()));
tokio::spawn(gc_cmd_board(Arc::clone(&board), Duration::from_millis(500)));

tokio::time::sleep(Duration::from_millis(100)).await;
board
.lock()
.cmd_states
.insert(ProposeId::new("1".to_owned()), CmdState::EarlyArrive);
tokio::time::sleep(Duration::from_millis(100)).await;
board
.lock()
.cmd_states
.insert(ProposeId::new("2".to_owned()), CmdState::EarlyArrive);

// at 600ms
tokio::time::sleep(Duration::from_millis(400)).await;
board
.lock()
.cmd_states
.insert(ProposeId::new("3".to_owned()), CmdState::EarlyArrive);

// at 1100ms, the first two kv should be removed
tokio::time::sleep(Duration::from_millis(500)).await;
let board = board.lock();
assert_eq!(board.cmd_states.len(), 1);
assert_eq!(
board.cmd_states.get_index(0).unwrap().0,
&ProposeId::new("3".to_owned())
);
}
}
4 changes: 2 additions & 2 deletions curp/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ impl<C: 'static + Command> Protocol<C> {
Arc::new(AtomicBool::new(true)),
));

run_gc_tasks(Arc::clone(&spec));
run_gc_tasks(Arc::clone(&spec), Arc::clone(&cmd_board));

Self {
state,
Expand Down Expand Up @@ -362,7 +362,7 @@ impl<C: 'static + Command> Protocol<C> {
reachable,
));

run_gc_tasks(Arc::clone(&spec));
run_gc_tasks(Arc::clone(&spec), Arc::clone(&cmd_board));

Self {
state,
Expand Down

0 comments on commit b6bccc5

Please sign in to comment.