Skip to content

Commit

Permalink
raftstore: enable apply committed log before persistence by default (#16834)
Browse files Browse the repository at this point in the history

ref #16717

Change the default value of `raftstore.max_apply_unpersisted_log_limit` from 0 to 1024 to enable apply committed log before persistence by default.

Signed-off-by: glorv <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
glorv and ti-chi-bot[bot] authored Apr 22, 2024
1 parent b79069a commit cbda550
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 10 deletions.
2 changes: 1 addition & 1 deletion components/raftstore/src/store/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ impl Default for Config {
raft_log_gc_threshold: 50,
raft_log_gc_count_limit: None,
raft_log_gc_size_limit: None,
max_apply_unpersisted_log_limit: 0,
max_apply_unpersisted_log_limit: 1024,
follower_read_max_log_gap: 100,
raft_log_reserve_max_ticks: 6,
raft_engine_purge_interval: ReadableDuration::secs(10),
Expand Down
2 changes: 1 addition & 1 deletion components/raftstore/src/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1182,7 +1182,7 @@ where
#[inline]
pub fn maybe_update_apply_unpersisted_log_state(&mut self, applied_index: u64) {
if self.min_safe_index_for_unpersisted_apply > 0
&& self.min_safe_index_for_unpersisted_apply < applied_index
&& self.min_safe_index_for_unpersisted_apply <= applied_index
{
if self.max_apply_unpersisted_log_limit > 0
&& self
Expand Down
18 changes: 18 additions & 0 deletions components/test_raftstore/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use region_cache_memory_engine::RangeCacheMemoryEngine;
use server::common::{ConfiguredRaftEngine, KvEngineBuilder};
use tempfile::TempDir;
use test_pd_client::TestPdClient;
use test_util::eventually;
use tikv::{
config::*,
server::KvEngineFactoryBuilder,
Expand Down Expand Up @@ -107,6 +108,23 @@ pub fn must_get<EK: KvEngine>(
)
}

/// Polls `engine` until `key` maps to `value` in the "default" CF,
/// checking every 100ms for up to 2000ms via the `eventually` helper
/// (which is expected to fail the test if the condition never holds).
pub fn eventually_get_equal<EK: KvEngine>(engine: &impl RawEngine<EK>, key: &[u8], value: &[u8]) {
    // The data key is invariant across retries, so compute it once.
    let data_key = keys::data_key(key);
    let poll_interval = Duration::from_millis(100);
    let timeout = Duration::from_millis(2000);
    eventually(poll_interval, timeout, || {
        engine
            .get_value_cf("default", &data_key)
            .unwrap()
            .map_or(false, |res| &res[..] == value)
    });
}

/// Asserts via `must_get` that `key` maps to `value` in the "default" CF
/// of `engine`.
pub fn must_get_equal<EK: KvEngine>(engine: &impl RawEngine<EK>, key: &[u8], value: &[u8]) {
    must_get(engine, "default", key, Some(value));
}
Expand Down
1 change: 0 additions & 1 deletion src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6908,7 +6908,6 @@ mod tests {
cfg.raftdb.titan.max_background_gc = default_cfg.raftdb.titan.max_background_gc;
cfg.backup.num_threads = default_cfg.backup.num_threads;
cfg.log_backup.num_threads = default_cfg.log_backup.num_threads;
cfg.raft_store.cmd_batch_concurrent_ready_max_count = 1;

// There is another set of config values that we can't directly compare:
// When the default values are `None`, but are then resolved to `Some(_)` later
Expand Down
3 changes: 0 additions & 3 deletions tests/failpoints/cases/test_async_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,7 @@ fn test_node_async_fetch() {
fn test_persist_delay_block_log_compaction() {
let mut cluster = new_node_cluster(0, 3);

cluster.cfg.raft_store.cmd_batch_concurrent_ready_max_count = 0;
cluster.cfg.raft_store.store_io_pool_size = 1;
cluster.cfg.raft_store.max_apply_unpersisted_log_limit = 10000;

cluster.cfg.raft_store.raft_log_gc_count_limit = Some(100000);
cluster.cfg.raft_store.raft_log_gc_threshold = 50;
cluster.cfg.raft_store.raft_log_gc_size_limit = Some(ReadableSize::mb(20));
Expand Down
47 changes: 45 additions & 2 deletions tests/failpoints/cases/test_async_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ fn test_async_io_commit_without_leader_persist() {
let mut cluster = new_cluster(0, 3);
cluster.cfg.raft_store.cmd_batch_concurrent_ready_max_count = 0;
cluster.cfg.raft_store.store_io_pool_size = 2;
cluster.cfg.raft_store.max_apply_unpersisted_log_limit = 0;
let pd_client = Arc::clone(&cluster.pd_client);
pd_client.disable_default_operator();

Expand Down Expand Up @@ -52,9 +53,7 @@ fn test_async_io_commit_without_leader_persist() {
#[test]
fn test_async_io_apply_without_leader_persist() {
let mut cluster = new_node_cluster(0, 3);
cluster.cfg.raft_store.cmd_batch_concurrent_ready_max_count = 0;
cluster.cfg.raft_store.store_io_pool_size = 1;
cluster.cfg.raft_store.max_apply_unpersisted_log_limit = 10000;
let pd_client = Arc::clone(&cluster.pd_client);
pd_client.disable_default_operator();

Expand Down Expand Up @@ -94,6 +93,50 @@ fn test_async_io_apply_without_leader_persist() {
}
}

// Verifies that conf changes can be applied before the raft log is persisted
// on a store whose persistence is delayed, and that the store can restart and
// recover afterwards.
#[test]
fn test_async_io_apply_conf_change_without_leader_persist() {
    let mut cluster = new_node_cluster(0, 4);
    cluster.cfg.raft_store.store_io_pool_size = 1;
    let pd_client = Arc::clone(&cluster.pd_client);
    pd_client.disable_default_operator();

    let r1 = cluster.run_conf_change();

    pd_client.must_add_peer(r1, new_peer(2, 2));
    pd_client.must_add_peer(r1, new_peer(3, 3));

    cluster.must_put(b"k1", b"v1");

    let raft_before_save_on_store_1_fp = "raft_before_persist_on_store_1";
    // Skip persisting to simulate raft log persist lag but not block node restart.
    fail::cfg(raft_before_save_on_store_1_fp, "return").unwrap();

    // Writes should still be applied on store 1 even though its raft log
    // persistence is blocked by the failpoint.
    for i in 2..10 {
        let _ = cluster
            .async_put(format!("k{}", i).as_bytes(), b"v1")
            .unwrap();
    }
    must_get_equal(&cluster.get_engine(1), b"k9", b"v1");

    // Exercise conf changes (add peer 4, remove peer 3) while store 1 still
    // has unpersisted raft logs. Note `remove_peer` (not `must_remove_peer`)
    // only issues the removal; completion is observed via the checks below.
    pd_client.must_add_peer(r1, new_peer(4, 4));
    pd_client.remove_peer(r1, new_peer(3, 3));

    cluster.must_put(b"k1", b"v2");
    for i in [1, 2, 4] {
        eventually_get_equal(&cluster.get_engine(i), b"k1", b"v2");
    }
    // Store 3's peer was removed, so its data should be gone.
    must_get_none(&cluster.get_engine(3), b"k1");

    // Restart node 1 with the failpoint cleared: it must recover even though
    // it was stopped with applied state ahead of its persisted raft log.
    cluster.stop_node(1);
    fail::remove(raft_before_save_on_store_1_fp);

    // Node 1 can recover successfully.
    cluster.run_node(1).unwrap();

    cluster.must_put(b"k1", b"v3");
    eventually_get_equal(&cluster.get_engine(1), b"k1", b"v3");
}

/// Test if the leader delays its destroy after applying conf change to
/// remove itself.
#[test_case(test_raftstore::new_node_cluster)]
Expand Down
1 change: 1 addition & 0 deletions tests/failpoints/cases/test_early_apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ fn test_multi_early_apply() {
let mut cluster = new_cluster(0, 3);
cluster.pd_client.disable_default_operator();
cluster.cfg.raft_store.store_batch_system.pool_size = 1;
cluster.cfg.raft_store.max_apply_unpersisted_log_limit = 0;
// So compact log will not be triggered automatically.
configure_for_request_snapshot(&mut cluster.cfg);

Expand Down
2 changes: 0 additions & 2 deletions tests/failpoints/cases/test_unsafe_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,9 +559,7 @@ fn test_unsafe_recovery_rollback_merge() {
fn test_unsafe_recovery_apply_before_persist() {
let mut cluster = new_node_cluster(0, 5);
cluster.cfg.raft_store.raft_store_max_leader_lease = ReadableDuration::millis(40);
cluster.cfg.raft_store.cmd_batch_concurrent_ready_max_count = 0;
cluster.cfg.raft_store.store_io_pool_size = 1;
cluster.cfg.raft_store.max_apply_unpersisted_log_limit = 10000;
cluster.run();
assert_eq!(cluster.get_node_ids().len(), 5);

Expand Down

0 comments on commit cbda550

Please sign in to comment.