From 51a0ee5cf60d9d64223f755b421ffaeb7b8bb288 Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 11 Mar 2024 11:14:46 +0800 Subject: [PATCH 01/13] allow raft apply committed logs before they are persisted Signed-off-by: glorv --- src/config.rs | 5 +++ src/raft.rs | 5 +++ src/raft_log.rs | 112 +++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 117 insertions(+), 5 deletions(-) diff --git a/src/config.rs b/src/config.rs index eed62efa..b03da44d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -98,6 +98,10 @@ pub struct Config { /// Max size for committed entries in a `Ready`. pub max_committed_size_per_ready: u64, + + /// Maximum raft log number that can be applied after commit but before persist. + /// The default value is 0, which means apply after both commit and persist. + pub applied_unpersisted_log_limit: u64, } impl Default for Config { @@ -120,6 +124,7 @@ impl Default for Config { priority: 0, max_uncommitted_size: NO_LIMIT, max_committed_size_per_ready: NO_LIMIT, + applied_unpersisted_log_limit: 0, } } } diff --git a/src/raft.rs b/src/raft.rs index 44bea42a..58650c44 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -366,6 +366,7 @@ impl Raft { max_committed_size_per_ready: c.max_committed_size_per_ready, }, }; + r.raft_log.apply_unpersisted_log_limit = c.apply_unpersisted_log_limit; confchange::restore(&mut r.prs, r.r.raft_log.last_index(), conf_state)?; let new_cs = r.post_conf_change(); if !raft_proto::conf_state_eq(&new_cs, conf_state) { @@ -599,6 +600,10 @@ impl Raft { pub fn set_check_quorum(&mut self, check_quorum: bool) { self.check_quorum = check_quorum; } + + pub fn set_applied_unpersisted_log_limit(&mut self, limit: u64) { + self.raft_log.apply_unpersisted_log_limit = limit; + } } impl RaftCore { diff --git a/src/raft_log.rs b/src/raft_log.rs index 2024ba60..5dcaa398 100644 --- a/src/raft_log.rs +++ b/src/raft_log.rs @@ -48,14 +48,18 @@ pub struct RaftLog { /// storage. 
It's used for limiting the upper bound of committed and /// persisted entries. /// - /// Invariant: persisted < unstable.offset && applied <= persisted + /// Invariant: persisted < unstable.offset pub persisted: u64, /// The highest log position that the application has been instructed /// to apply to its state machine. /// - /// Invariant: applied <= min(committed, persisted) + /// Invariant: applied <= min(committed, persisted + `apply_unpersisted_log_limit`) pub applied: u64, + + /// the maximum log gap between persisted_index and applied_index. + /// Caller should ensure the value won't lead to the upper bound overflow. + pub apply_unpersisted_log_limit: u64, } impl ToString for RaftLog @@ -87,6 +91,7 @@ impl RaftLog { persisted: last_index, applied: first_index - 1, unstable: Unstable::new(last_index + 1, logger), + apply_unpersisted_log_limit: 0, } } @@ -310,7 +315,11 @@ impl RaftLog { if idx == 0 { return; } - if idx > cmp::min(self.committed, self.persisted) || idx < self.applied { + // Do not check idx with committed or persisted index here becase when `apply_unpersisted_log_limit` > 0: + // 1. then it is possible idx > persisted. + // 2. when the application restart after applied but before committed entried(and committed index) is persisted + // then it is also possible idx > committed. + if idx < self.applied { fatal!( self.unstable.logger, "applied({}) is out of range [prev_applied({}), min(committed({}), persisted({}))]", @@ -422,7 +431,7 @@ impl RaftLog { /// Returns committed and persisted entries since max(`since_idx` + 1, first_index). 
pub fn next_entries_since(&self, since_idx: u64, max_size: Option) -> Option> { let offset = cmp::max(since_idx + 1, self.first_index()); - let high = cmp::min(self.committed, self.persisted) + 1; + let high = self.next_entries_upper_bound(); if high > offset { match self.slice( offset, @@ -437,6 +446,14 @@ impl RaftLog { None } + #[inline] + fn next_entries_upper_bound(&self) -> u64 { + std::cmp::min( + self.committed, + self.persisted + self.apply_unpersisted_log_limit, + ) + 1 + } + /// Returns all the available entries for execution. /// If applied is smaller than the index of snapshot, it returns all committed /// entries after the index of snapshot. @@ -448,7 +465,7 @@ impl RaftLog { /// max(`since_idx` + 1, first_index). pub fn has_next_entries_since(&self, since_idx: u64) -> bool { let offset = cmp::max(since_idx + 1, self.first_index()); - let high = cmp::min(self.committed, self.persisted) + 1; + let high = self.next_entries_upper_bound(); high > offset } @@ -1185,6 +1202,91 @@ mod test { ); } } + + let ents = [ + new_entry(4, 1), + new_entry(5, 1), + new_entry(6, 1), + new_entry(7, 1), + new_entry(8, 1), + new_entry(9, 1), + new_entry(10, 1), + ]; + const MAX: u64 = u32::MAX as u64; + let tests = vec![ + (0, 3, 3, 0, None), + (0, 3, 4, 0, None), + (0, 3, 4, MAX, Some(&ents[..1])), + (0, 4, 6, 0, Some(&ents[..1])), + (0, 4, 6, 2, Some(&ents[..3])), + (0, 4, 6, 6, Some(&ents[..3])), + (0, 4, 10, 0, Some(&ents[..1])), + (0, 4, 10, 2, Some(&ents[..3])), + (0, 4, 10, 6, Some(&ents)), + (0, 4, 10, 7, Some(&ents)), + (0, 6, 4, 0, Some(&ents[..1])), + (0, 6, 4, MAX, Some(&ents[..1])), + (0, 5, 5, 0, Some(&ents[..2])), + (3, 4, 3, MAX, None), + (3, 5, 5, MAX, Some(&ents[..2])), + (3, 6, 7, MAX, Some(&ents[..4])), + (3, 7, 6, MAX, Some(&ents[..3])), + (4, 5, 5, MAX, Some(&ents[1..2])), + (4, 5, 5, MAX, Some(&ents[1..2])), + (4, 5, 7, MAX, Some(&ents[1..4])), + (4, 5, 9, MAX, Some(&ents[1..6])), + (4, 5, 10, MAX, Some(&ents[1..])), + (4, 7, 5, MAX, 
Some(&ents[1..2])), + (4, 7, 7, 0, Some(&ents[1..4])), + (5, 5, 5, 0, None), + (5, 7, 7, MAX, Some(&ents[2..4])), + (7, 7, 7, MAX, None), + ]; + for (i, &(applied, persisted, committed, limit, ref expect_entries)) in + tests.iter().enumerate() + { + let store = MemStorage::new(); + store.wl().apply_snapshot(new_snapshot(3, 1)).expect(""); + let mut raft_log = RaftLog::new(store, l.clone()); + raft_log.apply_unpersisted_log_limit = limit; + raft_log.append(&ents); + let unstable = raft_log.unstable_entries().to_vec(); + if let Some(e) = unstable.last() { + raft_log.stable_entries(e.get_index(), e.get_term()); + raft_log.mut_store().wl().append(&unstable).expect(""); + } + raft_log.maybe_persist(persisted, 1); + assert_eq!( + persisted, raft_log.persisted, + "#{}: persisted = {}, want {}", + i, raft_log.persisted, persisted + ); + raft_log.maybe_commit(committed, 1); + assert_eq!( + committed, raft_log.committed, + "#{}: committed = {}, want {}", + i, raft_log.committed, committed + ); + #[allow(deprecated)] + raft_log.applied_to(applied); + + let expect_has_next = expect_entries.is_some(); + let actual_has_next = raft_log.has_next_entries(); + if actual_has_next != expect_has_next { + panic!( + "#{}: hasNext = {}, want {}", + i, actual_has_next, expect_has_next + ); + } + + let next_entries = raft_log.next_entries(None); + if next_entries != expect_entries.map(|n| n.to_vec()) { + panic!( + "#{}: next_entries = {:?}, want {:?}", + i, next_entries, expect_entries + ); + } + } } #[test] From 700838b287570837a958de681f2571da310cb570 Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 11 Mar 2024 13:03:35 +0800 Subject: [PATCH 02/13] fix Signed-off-by: glorv --- src/config.rs | 4 ++-- src/raft.rs | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/config.rs b/src/config.rs index b03da44d..3788ee69 100644 --- a/src/config.rs +++ b/src/config.rs @@ -101,7 +101,7 @@ pub struct Config { /// Maximum raft log number that can be applied after commit but before 
persist. /// The default value is 0, which means apply after both commit and persist. - pub applied_unpersisted_log_limit: u64, + pub apply_unpersisted_log_limit: u64, } impl Default for Config { @@ -124,7 +124,7 @@ impl Default for Config { priority: 0, max_uncommitted_size: NO_LIMIT, max_committed_size_per_ready: NO_LIMIT, - applied_unpersisted_log_limit: 0, + apply_unpersisted_log_limit: 0, } } } diff --git a/src/raft.rs b/src/raft.rs index 58650c44..9619db15 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -601,7 +601,8 @@ impl Raft { self.check_quorum = check_quorum; } - pub fn set_applied_unpersisted_log_limit(&mut self, limit: u64) { + /// Set the limit that applied index can be ahead of persisted index. + pub fn set_apply_unpersisted_log_limit(&mut self, limit: u64) { self.raft_log.apply_unpersisted_log_limit = limit; } } From 06ad2e75cd86506b50f1a57fd997dc0652de4d9d Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 11 Mar 2024 13:28:02 +0800 Subject: [PATCH 03/13] fix lint Signed-off-by: glorv --- src/lib.rs | 2 ++ src/raft.rs | 1 + 2 files changed, 3 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 131d716d..49444b56 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -479,6 +479,8 @@ before taking old, removed peers offline. #![deny(clippy::all)] #![deny(missing_docs)] #![recursion_limit = "128"] +// TODO: remove this when we update the minimum supported Rust version. +#![allow(unused_imports)] // This is necessary to support prost and rust-protobuf at the same time. #![allow(clippy::useless_conversion)] // This lint recommends some bad choices sometimes.
diff --git a/src/raft.rs b/src/raft.rs index 9619db15..05a2a978 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -285,6 +285,7 @@ impl DerefMut for Raft { } } +#[allow(dead_code)] trait AssertSend: Send {} impl AssertSend for Raft {} From 3cfd24e73193f98222f972165cd4df8c985443b8 Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 11 Mar 2024 13:33:58 +0800 Subject: [PATCH 04/13] fix clippy for test Signed-off-by: glorv --- harness/tests/integration_cases/test_raft.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/harness/tests/integration_cases/test_raft.rs b/harness/tests/integration_cases/test_raft.rs index 180cc5e8..4dbf4235 100644 --- a/harness/tests/integration_cases/test_raft.rs +++ b/harness/tests/integration_cases/test_raft.rs @@ -5301,7 +5301,7 @@ fn test_election_with_priority_log() { (false, false, true, 1, 1, 3, 1, StateRole::Leader), ]; - for (_i, &(l1, l2, l3, p1, p2, p3, id, state)) in tests.iter().enumerate() { + for (l1, l2, l3, p1, p2, p3, id, state) in tests { let l = default_logger(); let mut n1 = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage(), &l); let mut n2 = new_test_raft(2, vec![1, 2, 3], 10, 1, new_storage(), &l); From 62777d09746a712ce6611b1e1d6ef86579f947c8 Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 11 Mar 2024 13:59:56 +0800 Subject: [PATCH 05/13] fix lint Signed-off-by: glorv --- examples/single_mem_node/main.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/single_mem_node/main.rs b/examples/single_mem_node/main.rs index 8ef7c578..c60637f4 100644 --- a/examples/single_mem_node/main.rs +++ b/examples/single_mem_node/main.rs @@ -6,7 +6,6 @@ use std::sync::mpsc::{self, RecvTimeoutError}; use std::thread; use std::time::{Duration, Instant}; -use raft::eraftpb::ConfState; use raft::prelude::*; use raft::storage::MemStorage; From 1fd38fe61394e641c210283324b593f0e934f11e Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 11 Mar 2024 14:05:14 +0800 Subject: [PATCH 06/13] fix lint Signed-off-by: glorv --- 
harness/tests/integration_cases/test_raft.rs | 2 +- harness/tests/integration_cases/test_raw_node.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/harness/tests/integration_cases/test_raft.rs b/harness/tests/integration_cases/test_raft.rs index 4dbf4235..dbaed8bc 100644 --- a/harness/tests/integration_cases/test_raft.rs +++ b/harness/tests/integration_cases/test_raft.rs @@ -21,7 +21,7 @@ use std::panic::{self, AssertUnwindSafe}; use harness::*; use protobuf::Message as PbMessage; use raft::eraftpb::*; -use raft::storage::{GetEntriesContext, MemStorage}; +use raft::storage::MemStorage; use raft::*; use raft_proto::*; use slog::Logger; diff --git a/harness/tests/integration_cases/test_raw_node.rs b/harness/tests/integration_cases/test_raw_node.rs index a310bd2a..5cea3f55 100644 --- a/harness/tests/integration_cases/test_raw_node.rs +++ b/harness/tests/integration_cases/test_raw_node.rs @@ -17,7 +17,7 @@ use harness::Network; use protobuf::{Message as PbMessage, ProtobufEnum as _}; use raft::eraftpb::*; -use raft::storage::{GetEntriesContext, MemStorage}; +use raft::storage::MemStorage; use raft::*; use raft_proto::*; use slog::Logger; From 4a7ee4c664053d6823381f057bdd7e92740fbe3c Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 14 Mar 2024 14:07:47 +0800 Subject: [PATCH 07/13] resolve comment Signed-off-by: glorv --- src/raft.rs | 3 +-- src/raft_log.rs | 47 ++++++++++++++++++++++++++--------------------- 2 files changed, 27 insertions(+), 23 deletions(-) diff --git a/src/raft.rs b/src/raft.rs index 05a2a978..6fcd4b61 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -334,7 +334,7 @@ impl Raft { r: RaftCore { id: c.id, read_states: Default::default(), - raft_log: RaftLog::new(store, logger.clone()), + raft_log: RaftLog::new(store, logger.clone(), c), max_inflight: c.max_inflight_msgs, max_msg_size: c.max_size_per_msg, pending_request_snapshot: INVALID_INDEX, @@ -367,7 +367,6 @@ impl Raft { max_committed_size_per_ready: c.max_committed_size_per_ready, 
}, }; - r.raft_log.apply_unpersisted_log_limit = c.apply_unpersisted_log_limit; confchange::restore(&mut r.prs, r.r.raft_log.last_index(), conf_state)?; let new_cs = r.post_conf_change(); if !raft_proto::conf_state_eq(&new_cs, conf_state) { diff --git a/src/raft_log.rs b/src/raft_log.rs index 5dcaa398..41fffc1c 100644 --- a/src/raft_log.rs +++ b/src/raft_log.rs @@ -19,6 +19,7 @@ use std::cmp; use slog::warn; use slog::Logger; +use crate::config::Config; use crate::eraftpb::{Entry, Snapshot}; use crate::errors::{Error, Result, StorageError}; use crate::log_unstable::Unstable; @@ -80,7 +81,7 @@ where impl RaftLog { /// Creates a new raft log with a given storage and tag. - pub fn new(store: T, logger: Logger) -> RaftLog { + pub fn new(store: T, logger: Logger, cfg: &Config) -> RaftLog { let first_index = store.first_index().unwrap(); let last_index = store.last_index().unwrap(); @@ -91,7 +92,7 @@ impl RaftLog { persisted: last_index, applied: first_index - 1, unstable: Unstable::new(last_index + 1, logger), - apply_unpersisted_log_limit: 0, + apply_unpersisted_log_limit: cfg.apply_unpersisted_log_limit, } } @@ -719,6 +720,7 @@ mod test { panic::{self, AssertUnwindSafe}, }; + use crate::config::Config; use crate::default_logger; use crate::eraftpb; use crate::errors::{Error, StorageError}; @@ -792,7 +794,7 @@ mod test { ]; for (i, &(ref ents, wconflict)) in tests.iter().enumerate() { let store = MemStorage::new(); - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); raft_log.append(&previous_ents); let gconflict = raft_log.find_conflict(ents); if gconflict != wconflict { @@ -805,7 +807,7 @@ mod test { fn test_is_up_to_date() { let previous_ents = vec![new_entry(1, 1), new_entry(2, 2), new_entry(3, 3)]; let store = MemStorage::new(); - let mut raft_log = RaftLog::new(store, default_logger()); + let mut raft_log = RaftLog::new(store, default_logger(), &Config::default()); 
raft_log.append(&previous_ents); let tests = vec![ // greater term, ignore lastIndex @@ -854,7 +856,7 @@ mod test { for (i, &(ref ents, windex, ref wents, wunstable)) in tests.iter().enumerate() { let store = MemStorage::new(); store.wl().append(&previous_ents).expect("append failed"); - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); let index = raft_log.append(ents); if index != windex { panic!("#{}: last_index = {}, want {}", i, index, windex); @@ -884,7 +886,7 @@ mod test { .append(&[new_entry(i, i)]) .expect("append failed"); } - let mut raft_log = RaftLog::new(storage, default_logger()); + let mut raft_log = RaftLog::new(storage, default_logger(), &Config::default()); for i in unstable_index..last_index { raft_log.append(&[new_entry(i + 1, i + 1)]); } @@ -931,7 +933,7 @@ mod test { .wl() .apply_snapshot(new_snapshot(storagesnapi, 1)) .expect("apply failed."); - let mut raft_log = RaftLog::new(store, default_logger()); + let mut raft_log = RaftLog::new(store, default_logger(), &Config::default()); raft_log.restore(new_snapshot(unstablesnapi, 1)); assert_eq!(raft_log.committed, unstablesnapi); assert_eq!(raft_log.persisted, storagesnapi); @@ -964,7 +966,7 @@ mod test { .wl() .apply_snapshot(new_snapshot(offset, 1)) .expect("apply failed."); - let mut raft_log = RaftLog::new(store, default_logger()); + let mut raft_log = RaftLog::new(store, default_logger(), &Config::default()); for i in 1..num { raft_log.append(&[new_entry(offset + i, i)]); } @@ -995,7 +997,7 @@ mod test { .expect("apply failed."); let entries = vec![new_entry(index + 1, term), new_entry(index + 2, term + 1)]; store.wl().append(&entries).expect(""); - let raft_log = RaftLog::new(store, default_logger()); + let raft_log = RaftLog::new(store, default_logger(), &Config::default()); assert_eq!(raft_log.all_entries(), entries); assert_eq!(index + 1, raft_log.first_index()); @@ -1064,7 +1066,7 @@ mod test { .wl() 
.apply_snapshot(new_snapshot(snap_index, snap_term)) .expect(""); - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); assert_eq!(raft_log.persisted, snap_index); raft_log.append(new_ents); let unstable = raft_log.unstable_entries().to_vec(); @@ -1082,7 +1084,7 @@ mod test { } } - let mut raft_log = RaftLog::new(MemStorage::new(), default_logger()); + let mut raft_log = RaftLog::new(MemStorage::new(), default_logger(), &Config::default()); raft_log.restore(new_snapshot(100, 1)); assert_eq!(raft_log.unstable.offset, 101); raft_log.append(&[new_entry(101, 1)]); @@ -1112,7 +1114,7 @@ mod test { .expect(""); // append unstable entries to raftlog - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); raft_log.append(&previous_ents[(unstable - 1)..]); let ents = raft_log.unstable_entries().to_vec(); @@ -1163,7 +1165,7 @@ mod test { for (i, &(applied, persisted, committed, ref expect_entries)) in tests.iter().enumerate() { let store = MemStorage::new(); store.wl().apply_snapshot(new_snapshot(3, 1)).expect(""); - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); raft_log.append(&ents); let unstable = raft_log.unstable_entries().to_vec(); if let Some(e) = unstable.last() { @@ -1241,13 +1243,16 @@ mod test { (5, 5, 5, 0, None), (5, 7, 7, MAX, Some(&ents[2..4])), (7, 7, 7, MAX, None), + // applied can be higher than commited/persisted after restart. 
+ (15, 9, 10, 0, None), + (15, 10, 10, MAX, None), ]; for (i, &(applied, persisted, committed, limit, ref expect_entries)) in tests.iter().enumerate() { let store = MemStorage::new(); store.wl().apply_snapshot(new_snapshot(3, 1)).expect(""); - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); raft_log.apply_unpersisted_log_limit = limit; raft_log.append(&ents); let unstable = raft_log.unstable_entries().to_vec(); @@ -1308,7 +1313,7 @@ mod test { .append(&[new_entry(offset + i, offset + i)]) .expect(""); } - let mut raft_log = RaftLog::new(store, default_logger()); + let mut raft_log = RaftLog::new(store, default_logger(), &Config::default()); for i in (num / 2)..num { raft_log.append(&[new_entry(offset + i, offset + i)]); } @@ -1447,7 +1452,7 @@ mod test { let store = MemStorage::new(); store.wl().apply_snapshot(new_snapshot(offset, 0)).unwrap(); store.wl().append(&entries(offset + 1, half)).unwrap(); - let mut raft_log = RaftLog::new(store, default_logger()); + let mut raft_log = RaftLog::new(store, default_logger(), &Config::default()); raft_log.append(&entries(half, last)); // Test that scan() returns the same entries as slice(), on all inputs. 
@@ -1696,7 +1701,7 @@ mod test { tests.iter().enumerate() { let store = MemStorage::new(); - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); raft_log.append(&previous_ents); raft_log.committed = commit; raft_log.persisted = persist; @@ -1751,7 +1756,7 @@ mod test { ]; for (i, &(commit, wcommit, wpanic)) in tests.iter().enumerate() { let store = MemStorage::new(); - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); raft_log.append(&previous_ents); raft_log.committed = previous_commit; let has_panic = @@ -1788,7 +1793,7 @@ mod test { for i in 1u64..index { store.wl().append(&[new_entry(i, 0)]).expect(""); } - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); raft_log.maybe_commit(index - 1, 0); let committed = raft_log.committed; #[allow(deprecated)] @@ -1818,7 +1823,7 @@ mod test { .wl() .apply_snapshot(new_snapshot(offset, 0)) .expect(""); - let mut raft_log = RaftLog::new(store, default_logger()); + let mut raft_log = RaftLog::new(store, default_logger(), &Config::default()); for i in 1u64..=num { raft_log.append(&[new_entry(i + offset, 0)]); } @@ -1868,7 +1873,7 @@ mod test { fn test_restore_snap() { let store = MemStorage::new(); store.wl().apply_snapshot(new_snapshot(100, 1)).expect(""); - let mut raft_log = RaftLog::new(store, default_logger()); + let mut raft_log = RaftLog::new(store, default_logger(), &Config::default()); assert_eq!(raft_log.committed, 100); assert_eq!(raft_log.persisted, 100); raft_log.restore(new_snapshot(200, 1)); From ac7256e67d5747e95bd09d92a73611353f3ffd4a Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 28 Mar 2024 16:02:08 +0800 Subject: [PATCH 08/13] resolve comments Signed-off-by: glorv --- src/config.rs | 4 ++-- src/raft.rs | 32 +++++++++++++++++++++------ src/raft_log.rs | 57 
+++++++++++++++++++++++++------------------------ 3 files changed, 57 insertions(+), 36 deletions(-) diff --git a/src/config.rs b/src/config.rs index 3788ee69..1bc6d323 100644 --- a/src/config.rs +++ b/src/config.rs @@ -101,7 +101,7 @@ pub struct Config { /// Maximum raft log number that can be applied after commit but before persist. /// The default value is 0, which means apply after both commit and persist. - pub apply_unpersisted_log_limit: u64, + pub max_apply_unpersisted_log_limit: u64, } impl Default for Config { @@ -124,7 +124,7 @@ impl Default for Config { priority: 0, max_uncommitted_size: NO_LIMIT, max_committed_size_per_ready: NO_LIMIT, - apply_unpersisted_log_limit: 0, + max_apply_unpersisted_log_limit: 0, } } } diff --git a/src/raft.rs b/src/raft.rs index 6fcd4b61..90e54934 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -382,7 +382,9 @@ impl Raft { r.load_state(&raft_state.hard_state); } if c.applied > 0 { - r.commit_apply(c.applied); + // at initialize, it is possible that applied_index > committed_index, + // so we should skip this check at `commit_apply`. + r.commit_apply_internal(c.applied, true); } r.become_follower(r.term, INVALID_ID); @@ -601,9 +603,9 @@ impl Raft { self.check_quorum = check_quorum; } - /// Set the limit that applied index can be ahead of persisted index. - pub fn set_apply_unpersisted_log_limit(&mut self, limit: u64) { - self.raft_log.apply_unpersisted_log_limit = limit; + /// Set the maximum limit that applied index can be ahead of persisted index. + pub fn set_max_apply_unpersisted_log_limit(&mut self, limit: u64) { + self.raft_log.max_apply_unpersisted_log_limit = limit; } } @@ -956,9 +958,27 @@ impl Raft { /// /// * Post: Checks to see if it's time to finalize a Joint Consensus state. pub fn commit_apply(&mut self, applied: u64) { + self.commit_apply_internal(applied, true) + } + + /// Commit that the Raft peer has applied up to the given index. + /// + /// Registers the new applied index to the Raft log. 
+ /// if `is_initialize` is true, will skip the applied_index check. + /// + /// # Hooks + /// + /// * Post: Checks to see if it's time to finalize a Joint Consensus state. + fn commit_apply_internal(&mut self, applied: u64, is_initialize: bool) { let old_applied = self.raft_log.applied; - #[allow(deprecated)] - self.raft_log.applied_to(applied); + if !is_initialize { + #[allow(deprecated)] + self.raft_log.applied_to(applied); + } else { + // skip applied_index check at initialization. + assert!(applied > 0 && self.raft_log.applied == 0); + self.raft_log.applied_to_unchecked(applied); + } // TODO: it may never auto_leave if leader steps down before enter joint is applied. if self.prs.conf().auto_leave diff --git a/src/raft_log.rs b/src/raft_log.rs index 41fffc1c..ad7a7cd1 100644 --- a/src/raft_log.rs +++ b/src/raft_log.rs @@ -43,6 +43,8 @@ pub struct RaftLog { /// on a quorum of nodes. /// /// Invariant: applied <= committed + /// NOTE: this invariant can be break after restart if `max_apply_unpersisted_log_limit` is 0, + /// but once the committed catches up with applied, it should never fall behind again. pub committed: u64, /// The highest log position that is known to be persisted in stable @@ -55,12 +57,12 @@ pub struct RaftLog { /// The highest log position that the application has been instructed /// to apply to its state machine. /// - /// Invariant: applied <= min(committed, persisted + `apply_unpersisted_log_limit`) + /// Invariant: applied <= min(committed, persisted) iff `max_apply_unpersisted_log_limit` is 0. pub applied: u64, /// the maximum log gap between persisted_index and applied_index. /// Caller should ensure the value won't lead to the upper bound overflow. 
- pub apply_unpersisted_log_limit: u64, + pub max_apply_unpersisted_log_limit: u64, } impl ToString for RaftLog @@ -92,7 +94,7 @@ impl RaftLog { persisted: last_index, applied: first_index - 1, unstable: Unstable::new(last_index + 1, logger), - apply_unpersisted_log_limit: cfg.apply_unpersisted_log_limit, + max_apply_unpersisted_log_limit: cfg.max_apply_unpersisted_log_limit, } } @@ -316,11 +318,8 @@ impl RaftLog { if idx == 0 { return; } - // Do not check idx with committed or persisted index here becase when `apply_unpersisted_log_limit` > 0: - // 1. then it is possible idx > persisted. - // 2. when the application restart after applied but before committed entried(and committed index) is persisted - // then it is also possible idx > committed. - if idx < self.applied { + + if idx > self.next_entries_upper_bound() || idx < self.applied { fatal!( self.unstable.logger, "applied({}) is out of range [prev_applied({}), min(committed({}), persisted({}))]", @@ -330,6 +329,11 @@ impl RaftLog { self.persisted, ) } + self.applied_to_unchecked(idx); + } + + #[inline] + pub(crate) fn applied_to_unchecked(&mut self, idx: u64) { self.applied = idx; } @@ -451,7 +455,7 @@ impl RaftLog { fn next_entries_upper_bound(&self) -> u64 { std::cmp::min( self.committed, - self.persisted + self.apply_unpersisted_log_limit, + self.persisted + self.max_apply_unpersisted_log_limit, ) + 1 } @@ -1214,11 +1218,11 @@ mod test { new_entry(9, 1), new_entry(10, 1), ]; - const MAX: u64 = u32::MAX as u64; + const UNLIMITED: u64 = u32::MAX as u64; let tests = vec![ (0, 3, 3, 0, None), (0, 3, 4, 0, None), - (0, 3, 4, MAX, Some(&ents[..1])), + (0, 3, 4, UNLIMITED, Some(&ents[..1])), (0, 4, 6, 0, Some(&ents[..1])), (0, 4, 6, 2, Some(&ents[..3])), (0, 4, 6, 6, Some(&ents[..3])), @@ -1227,25 +1231,22 @@ mod test { (0, 4, 10, 6, Some(&ents)), (0, 4, 10, 7, Some(&ents)), (0, 6, 4, 0, Some(&ents[..1])), - (0, 6, 4, MAX, Some(&ents[..1])), + (0, 6, 4, UNLIMITED, Some(&ents[..1])), (0, 5, 5, 0, 
Some(&ents[..2])), - (3, 4, 3, MAX, None), - (3, 5, 5, MAX, Some(&ents[..2])), - (3, 6, 7, MAX, Some(&ents[..4])), - (3, 7, 6, MAX, Some(&ents[..3])), - (4, 5, 5, MAX, Some(&ents[1..2])), - (4, 5, 5, MAX, Some(&ents[1..2])), - (4, 5, 7, MAX, Some(&ents[1..4])), - (4, 5, 9, MAX, Some(&ents[1..6])), - (4, 5, 10, MAX, Some(&ents[1..])), - (4, 7, 5, MAX, Some(&ents[1..2])), + (3, 4, 3, UNLIMITED, None), + (3, 5, 5, UNLIMITED, Some(&ents[..2])), + (3, 6, 7, UNLIMITED, Some(&ents[..4])), + (3, 7, 6, UNLIMITED, Some(&ents[..3])), + (4, 5, 5, UNLIMITED, Some(&ents[1..2])), + (4, 5, 5, UNLIMITED, Some(&ents[1..2])), + (4, 5, 7, UNLIMITED, Some(&ents[1..4])), + (4, 5, 9, UNLIMITED, Some(&ents[1..6])), + (4, 5, 10, UNLIMITED, Some(&ents[1..])), + (4, 7, 5, UNLIMITED, Some(&ents[1..2])), (4, 7, 7, 0, Some(&ents[1..4])), (5, 5, 5, 0, None), - (5, 7, 7, MAX, Some(&ents[2..4])), - (7, 7, 7, MAX, None), - // applied can be higher than commited/persisted after restart. - (15, 9, 10, 0, None), - (15, 10, 10, MAX, None), + (5, 7, 7, UNLIMITED, Some(&ents[2..4])), + (7, 7, 7, UNLIMITED, None), ]; for (i, &(applied, persisted, committed, limit, ref expect_entries)) in tests.iter().enumerate() @@ -1253,7 +1254,7 @@ mod test { let store = MemStorage::new(); store.wl().apply_snapshot(new_snapshot(3, 1)).expect(""); let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); - raft_log.apply_unpersisted_log_limit = limit; + raft_log.max_apply_unpersisted_log_limit = limit; raft_log.append(&ents); let unstable = raft_log.unstable_entries().to_vec(); if let Some(e) = unstable.last() { From 0b99c9ed8c3e29706120acea8a43363aece56a3b Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 28 Mar 2024 16:10:19 +0800 Subject: [PATCH 09/13] do some rename Signed-off-by: glorv --- src/raft.rs | 1 + src/raft_log.rs | 11 +++++------ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/raft.rs b/src/raft.rs index 90e54934..4441d392 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -957,6 
+957,7 @@ impl Raft { /// # Hooks /// /// * Post: Checks to see if it's time to finalize a Joint Consensus state. + #[inline] pub fn commit_apply(&mut self, applied: u64) { self.commit_apply_internal(applied, true) } diff --git a/src/raft_log.rs b/src/raft_log.rs index ad7a7cd1..e6d7895c 100644 --- a/src/raft_log.rs +++ b/src/raft_log.rs @@ -318,8 +318,7 @@ impl RaftLog { if idx == 0 { return; } - - if idx > self.next_entries_upper_bound() || idx < self.applied { + if idx > self.applied_index_upper_bound() || idx < self.applied { fatal!( self.unstable.logger, "applied({}) is out of range [prev_applied({}), min(committed({}), persisted({}))]", @@ -436,7 +435,7 @@ impl RaftLog { /// Returns committed and persisted entries since max(`since_idx` + 1, first_index). pub fn next_entries_since(&self, since_idx: u64, max_size: Option) -> Option> { let offset = cmp::max(since_idx + 1, self.first_index()); - let high = self.next_entries_upper_bound(); + let high = self.applied_index_upper_bound() + 1; if high > offset { match self.slice( offset, @@ -452,11 +451,11 @@ impl RaftLog { } #[inline] - fn next_entries_upper_bound(&self) -> u64 { + fn applied_index_upper_bound(&self) -> u64 { std::cmp::min( self.committed, self.persisted + self.max_apply_unpersisted_log_limit, - ) + 1 + ) } /// Returns all the available entries for execution. @@ -470,7 +469,7 @@ impl RaftLog { /// max(`since_idx` + 1, first_index). 
pub fn has_next_entries_since(&self, since_idx: u64) -> bool { let offset = cmp::max(since_idx + 1, self.first_index()); - let high = self.next_entries_upper_bound(); + let high = self.applied_index_upper_bound() + 1; high > offset } From ad2181738556c5b73d04a5d209ca2c8f4750f5f0 Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 28 Mar 2024 16:22:50 +0800 Subject: [PATCH 10/13] fix test Signed-off-by: glorv --- src/raft.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/raft.rs b/src/raft.rs index 4b2b6db4..c26cddc7 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -955,7 +955,7 @@ impl Raft { /// * Post: Checks to see if it's time to finalize a Joint Consensus state. #[inline] pub fn commit_apply(&mut self, applied: u64) { - self.commit_apply_internal(applied, true) + self.commit_apply_internal(applied, false) } /// Commit that the Raft peer has applied up to the given index. @@ -973,7 +973,7 @@ impl Raft { self.raft_log.applied_to(applied); } else { // skip applied_index check at initialization. - assert!(applied > 0 && self.raft_log.applied == 0); + assert!(applied > 0); self.raft_log.applied_to_unchecked(applied); } From fe3c13c6f917f1ade1b100003b6906fb1168dc0f Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 28 Mar 2024 17:24:05 +0800 Subject: [PATCH 11/13] fix comment Signed-off-by: glorv --- src/raft_log.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/raft_log.rs b/src/raft_log.rs index 5446766f..b8f1ae72 100644 --- a/src/raft_log.rs +++ b/src/raft_log.rs @@ -43,7 +43,7 @@ pub struct RaftLog { /// on a quorum of nodes. /// /// Invariant: applied <= committed - /// NOTE: this invariant can be break after restart if `max_apply_unpersisted_log_limit` is 0, + /// NOTE: this invariant can be break after restart if `max_apply_unpersisted_log_limit` > 0, /// but once the committed catches up with applied, it should never fall behind again. 
pub committed: u64, From cc6e478900b784db18eaf0703dd1f1e9c3ba8839 Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 28 Mar 2024 17:51:51 +0800 Subject: [PATCH 12/13] rename Signed-off-by: glorv --- src/raft.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/raft.rs b/src/raft.rs index c26cddc7..b63e3fd1 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -379,7 +379,7 @@ impl Raft { } if c.applied > 0 { // at initialize, it is possible that applied_index > committed_index, - // so we should skip this check at `commit_apply`. + // so we should skip the check at `commit_apply`. r.commit_apply_internal(c.applied, true); } r.become_follower(r.term, INVALID_ID); @@ -961,14 +961,15 @@ impl Raft { /// Commit that the Raft peer has applied up to the given index. /// /// Registers the new applied index to the Raft log. - /// if `is_initialize` is true, will skip the applied_index check. + /// If `skip_check` is true, the applied_index check is skipped; this is only + /// used at initialization. /// /// # Hooks /// /// * Post: Checks to see if it's time to finalize a Joint Consensus state. - fn commit_apply_internal(&mut self, applied: u64, is_initialize: bool) { + fn commit_apply_internal(&mut self, applied: u64, skip_check: bool) { let old_applied = self.raft_log.applied; - if !is_initialize { + if !skip_check { #[allow(deprecated)] self.raft_log.applied_to(applied); } else { From 47c7f1debc2f065efa0dd004e52cbc1fb82a6f3b Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 28 Mar 2024 18:28:45 +0800 Subject: [PATCH 13/13] update doc comment Signed-off-by: glorv --- src/raft_log.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/raft_log.rs b/src/raft_log.rs index b8f1ae72..28aa3f37 100644 --- a/src/raft_log.rs +++ b/src/raft_log.rs @@ -43,7 +43,7 @@ pub struct RaftLog { /// on a quorum of nodes.
/// /// Invariant: applied <= committed - /// NOTE: this invariant can be break after restart if `max_apply_unpersisted_log_limit` > 0, + /// NOTE: this invariant can be broken after restart if max_apply_unpersisted_log_limit > 0, /// but once the committed catches up with applied, it should never fall behind again. pub committed: u64, @@ -57,11 +57,15 @@ pub struct RaftLog { /// The highest log position that the application has been instructed /// to apply to its state machine. /// - /// Invariant: applied <= min(committed, persisted) iff `max_apply_unpersisted_log_limit` is 0. + /// Invariant: applied <= committed. + /// NOTE: + /// - this invariant can be broken after restart if max_apply_unpersisted_log_limit > 0, + /// but once the committed catches up with applied, it should never fall behind again. + /// - if `max_apply_unpersisted_log_limit` is 0, applied <= persisted is also ensured + /// (if it is changed from >0 to 0, it is ensured after persisted catches up with applied). pub applied: u64, - /// the maximum log gap between persisted_index and applied_index. - /// Caller should ensure the value won't lead to the upper bound overflow. + /// The maximum log gap between persisted and applied. pub max_apply_unpersisted_log_limit: u64, }