Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow raft apply committed logs before they are persisted #537

Merged
merged 14 commits into from
Mar 28, 2024
1 change: 0 additions & 1 deletion examples/single_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::{Duration, Instant};

use raft::eraftpb::ConfState;
use raft::prelude::*;
use raft::storage::MemStorage;

Expand Down
4 changes: 2 additions & 2 deletions harness/tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::panic::{self, AssertUnwindSafe};
use harness::*;
use protobuf::Message as PbMessage;
use raft::eraftpb::*;
use raft::storage::{GetEntriesContext, MemStorage};
use raft::storage::MemStorage;
use raft::*;
use raft_proto::*;
use slog::Logger;
Expand Down Expand Up @@ -5301,7 +5301,7 @@ fn test_election_with_priority_log() {
(false, false, true, 1, 1, 3, 1, StateRole::Leader),
];

for (_i, &(l1, l2, l3, p1, p2, p3, id, state)) in tests.iter().enumerate() {
for (l1, l2, l3, p1, p2, p3, id, state) in tests {
let l = default_logger();
let mut n1 = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage(), &l);
let mut n2 = new_test_raft(2, vec![1, 2, 3], 10, 1, new_storage(), &l);
Expand Down
2 changes: 1 addition & 1 deletion harness/tests/integration_cases/test_raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use harness::Network;
use protobuf::{Message as PbMessage, ProtobufEnum as _};
use raft::eraftpb::*;
use raft::storage::{GetEntriesContext, MemStorage};
use raft::storage::MemStorage;
use raft::*;
use raft_proto::*;
use slog::Logger;
Expand Down
5 changes: 5 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ pub struct Config {

/// Max size for committed entries in a `Ready`.
pub max_committed_size_per_ready: u64,

/// Maximum raft log number that can be applied after commit but before persist.
/// The default value is 0, which means apply after both commit and persist.
pub apply_unpersisted_log_limit: u64,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it support dynamic config update?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we provide a public function Raft::set_apply_unpersisted_log_limit to change this value in realtime.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about max_apply_unpersisted_log_limit?

}

impl Default for Config {
Expand All @@ -120,6 +124,7 @@ impl Default for Config {
priority: 0,
max_uncommitted_size: NO_LIMIT,
max_committed_size_per_ready: NO_LIMIT,
apply_unpersisted_log_limit: 0,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,8 @@ before taking old, removed peers offline.
#![deny(clippy::all)]
#![deny(missing_docs)]
#![recursion_limit = "128"]
// TODO: remove this when we update the mininum rust compatible version.
#![allow(unused_imports)]
// This is necessary to support prost and rust-protobuf at the same time.
#![allow(clippy::useless_conversion)]
// This lint recommends some bad choices sometimes.
Expand Down
8 changes: 7 additions & 1 deletion src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ impl<T: Storage> DerefMut for Raft<T> {
}
}

#[allow(dead_code)]
trait AssertSend: Send {}

impl<T: Storage + Send> AssertSend for Raft<T> {}
Expand Down Expand Up @@ -333,7 +334,7 @@ impl<T: Storage> Raft<T> {
r: RaftCore {
id: c.id,
read_states: Default::default(),
raft_log: RaftLog::new(store, logger.clone()),
raft_log: RaftLog::new(store, logger.clone(), c),
max_inflight: c.max_inflight_msgs,
max_msg_size: c.max_size_per_msg,
pending_request_snapshot: INVALID_INDEX,
Expand Down Expand Up @@ -599,6 +600,11 @@ impl<T: Storage> Raft<T> {
pub fn set_check_quorum(&mut self, check_quorum: bool) {
self.check_quorum = check_quorum;
}

/// Set the limit that applied index can be ahead of persisted index.
pub fn set_apply_unpersisted_log_limit(&mut self, limit: u64) {
self.raft_log.apply_unpersisted_log_limit = limit;
}
}

impl<T: Storage> RaftCore<T> {
Expand Down
Loading
Loading