diff --git a/Cargo.lock b/Cargo.lock index 32eb0fb10a1..4573bfca278 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -215,7 +215,7 @@ dependencies = [ "prometheus", "raft", "raftstore", - "rand", + "rand 0.7.3", "security", "serde", "serde_derive", @@ -497,6 +497,15 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" +[[package]] +name = "chan" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d14956a3dae065ffaa0d92ece848ab4ced88d32361e7fdfbfd653a5c454a1ed8" +dependencies = [ + "rand 0.3.23", +] + [[package]] name = "chrono" version = "0.4.11" @@ -604,7 +613,7 @@ dependencies = [ "raft", "raft_log_engine", "raftstore", - "rand", + "rand 0.7.3", "security", "serde_json", "signal", @@ -633,7 +642,7 @@ dependencies = [ "libc", "panic_hook", "protobuf", - "rand", + "rand 0.7.3", "static_assertions", "tikv_alloc", ] @@ -647,7 +656,7 @@ dependencies = [ "fail", "futures 0.3.7", "parking_lot 0.11.0", - "rand", + "rand 0.7.3", "tikv_alloc", "tokio", "txn_types", @@ -689,7 +698,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c750ec12b83377637110d5a57f5ae08e895b06c4b16e2bdbf1a94ef717428c59" dependencies = [ "proc-macro-hack", - "rand", + "rand 0.7.3", ] [[package]] @@ -755,7 +764,7 @@ dependencies = [ "itertools 0.8.2", "lazy_static", "num-traits", - "rand_core", + "rand_core 0.5.1", "rand_os", "rand_xoshiro", "rayon", @@ -1088,7 +1097,7 @@ dependencies = [ "openssl", "prometheus", "protobuf", - "rand", + "rand 0.7.3", "rusoto_core", "rusoto_credential", "rusoto_kms", @@ -1137,7 +1146,7 @@ dependencies = [ "prometheus-static-metric", "protobuf", "raft", - "rand", + "rand 0.7.3", "rocksdb", "serde", "serde_derive", @@ -1232,7 +1241,7 @@ dependencies = [ "lazy_static", "matches", "prometheus", - "rand", + "rand 0.7.3", "rusoto_core", "rusoto_mock", "rusoto_s3", @@ -1257,7 +1266,7 @@ checksum = "3be3c61c59fdc91f5dbc3ea31ee8623122ce80057058be560654c5d410d181a6" dependencies = [ "lazy_static", "log", - "rand", + "rand 0.7.3", ] [[package]] @@ -1296,7 +1305,7 @@ dependencies = [ "fs2", "lazy_static", "openssl", - "rand", + "rand 0.7.3", "tempfile", "tikv_alloc", ] @@ -1344,6 +1353,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f2a4a2034423744d2cc7ca2068453168dcdb82c438419e639a26bd87839c674" +[[package]] +name = "fuchsia-cprng" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" + [[package]] name = "fuchsia-zircon" version = "0.3.3" @@ -3092,7 +3107,7 @@ dependencies = [ "protobuf", "quick-error", "raft-proto", - "rand", + "rand 0.7.3", "slog", ] @@ -3152,6 +3167,7 @@ dependencies = [ "batch-system", "bitflags", "byteorder", + "chan", "concurrency_manager", "configuration", "crc32fast", @@ -3184,7 +3200,7 @@ dependencies = [ "quick-error", "raft", "raft-proto", - "rand", + "rand 0.7.3", "serde", "serde_derive", "serde_with", @@ -3204,6 +3220,29 @@ dependencies = [ "yatp", ] +[[package]] +name = "rand" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64ac302d8f83c0c1974bf758f6b041c6c8ada916fbb44a609158ca8b064cc76c" +dependencies = [ + "libc", + "rand 0.4.6", +] + +[[package]] +name = "rand" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" +dependencies = [ + "fuchsia-cprng", + "libc", + "rand_core 0.3.1", + "rdrand", + "winapi 0.3.8", +] + [[package]] name = "rand" version = "0.7.3" @@ -3213,7 +3252,7 @@ dependencies = [ "getrandom", "libc", "rand_chacha", - "rand_core", + "rand_core 0.5.1", "rand_hc", ] @@ -3224,9 +3263,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03a2a90da8c7523f554344f921aa97283eadf6ac484a6d2a7d0212fa7f8d6853" dependencies = [ "c2-chacha", - "rand_core", + "rand_core 0.5.1", ] +[[package]] +name = "rand_core" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" +dependencies = [ + "rand_core 0.4.2", +] + +[[package]] +name = "rand_core" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" + [[package]] name = "rand_core" version = "0.5.1" @@ -3242,7 +3296,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" dependencies = [ - "rand_core", + "rand_core 0.5.1", ] [[package]] @@ -3251,7 +3305,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8df6b0b3dc9991a10b2d91a86d1129314502169a1bf6afa67328945e02498b76" dependencies = [ - "rand_core", + "rand_core 0.5.1", ] [[package]] @@ -3261,7 +3315,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a788ae3edb696cfcba1c19bfd388cc4b8c21f8a408432b199c072825084da58a" dependencies = [ "getrandom", - "rand_core", + "rand_core 0.5.1", ] [[package]] @@ -3270,7 +3324,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77d416b86801d23dde1aa643023b775c3a462efc0ed96443add11546cdf1dca8" dependencies = [ - "rand_core", + "rand_core 0.5.1", ] [[package]] @@ -3279,7 +3333,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e18c91676f670f6f0312764c759405f13afb98d5d73819840cf72a518487bff" dependencies = [ - "rand_core", + "rand_core 0.5.1", ] [[package]] @@ -3317,6 +3371,15 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "rdrand" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" +dependencies = [ + "rand_core 0.3.1", +] + [[package]] name = "redox_syscall" version = "0.1.56" @@ -4245,7 +4308,7 @@ checksum = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9" dependencies = [ "cfg-if", "libc", - "rand", + "rand 0.7.3", "redox_syscall", "remove_dir_all", "winapi 0.3.8", @@ -4333,7 +4396,7 @@ dependencies = [ "pd_client", "raft", "raftstore", - "rand", + "rand 0.7.3", "security", "slog", "slog-global", @@ -4380,7 +4443,7 @@ dependencies = [ "fail", "grpcio", "kvproto", - "rand", + "rand 0.7.3", "rand_isaac", "security", "slog", @@ -4423,7 +4486,7 @@ dependencies = [ "protobuf", "raft", "raftstore", - "rand", + "rand 0.7.3", "rand_xorshift", "security", "semver 0.10.0", @@ -4614,7 +4677,7 @@ dependencies = [ "panic_hook", "profiler", "protobuf", - "rand", + "rand 0.7.3", "safemem", "static_assertions", "tidb_query_codegen", @@ -4697,7 +4760,7 @@ dependencies = [ "raft", "raft_log_engine", "raftstore", - "rand", + "rand 0.7.3", "regex", "rev_lines", "security", @@ -4817,7 +4880,7 @@ dependencies = [ "prometheus", "protobuf", "quick-error", - "rand", + "rand 0.7.3", "regex", "serde", "serde_json", @@ -5159,7 +5222,7 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11" dependencies = [ - "rand", + "rand 0.7.3", "serde", ] @@ -5415,7 +5478,7 @@ dependencies = [ "num_cpus", "parking_lot_core 0.8.0", "prometheus", - "rand", + "rand 0.7.3", ] [[package]] @@ -5430,7 +5493,7 @@ version = "6.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e12b8667a4fff63d236f8363be54392f93dbb13616be64a83e761a9319ab589" dependencies = [ - "rand", + "rand 0.7.3", ] [[package]] diff --git a/components/engine_panic/src/raft_engine.rs b/components/engine_panic/src/raft_engine.rs index ff9f3d43338..4cc1822303d 100644 --- a/components/engine_panic/src/raft_engine.rs +++ b/components/engine_panic/src/raft_engine.rs @@ -105,6 +105,10 @@ impl RaftLogBatch for PanicWriteBatch { panic!() } + fn persist_size(&self) -> usize { + panic!() + } + fn is_empty(&self) -> bool { panic!() } diff --git a/components/engine_rocks/src/raft_engine.rs b/components/engine_rocks/src/raft_engine.rs index f443e01709a..c5ad0cda8cc 100644 --- a/components/engine_rocks/src/raft_engine.rs +++ b/components/engine_rocks/src/raft_engine.rs @@ -239,6 +239,10 @@ impl RaftLogBatch for RocksWriteBatch { self.put_msg(&keys::raft_state_key(raft_group_id), state) } + fn persist_size(&self) -> usize { + self.data_size() + } + fn is_empty(&self) -> bool { Mutable::is_empty(self) } diff --git a/components/engine_traits/src/raft_engine.rs b/components/engine_traits/src/raft_engine.rs index 68dc66285d1..3b9dac1bd77 100644 --- a/components/engine_traits/src/raft_engine.rs +++ b/components/engine_traits/src/raft_engine.rs @@ -87,6 +87,8 @@ pub trait RaftLogBatch: Send { fn put_raft_state(&mut self, raft_group_id: u64, state: &RaftLocalState) -> Result<()>; + fn persist_size(&self) -> usize; + fn is_empty(&self) -> bool; } diff --git a/components/raft_log_engine/src/engine.rs b/components/raft_log_engine/src/engine.rs index a77249d643a..e6cb5bcc3be 100644 --- a/components/raft_log_engine/src/engine.rs +++ b/components/raft_log_engine/src/engine.rs @@ -59,6 +59,10 @@ impl RaftLogBatchTrait for RaftLogBatch { Ok(()) } + fn persist_size(&self) -> usize { + panic!("not impl!") + } + fn is_empty(&self) -> bool { self.0.items.is_empty() } diff --git a/components/raftstore/Cargo.toml b/components/raftstore/Cargo.toml index abd9ed62b6d..052729a03f4 100644 --- a/components/raftstore/Cargo.toml +++ b/components/raftstore/Cargo.toml @@ -83,6 +83,7 @@ tokio = { version = "0.2", features = ["sync", "rt-threaded"] } txn_types = { path = "../txn_types" } uuid = { version = "0.8.1", features = ["serde", "v4"] } yatp = { git = "https://github.com/tikv/yatp.git", branch = "master" } +chan = "0.1" [dev-dependencies] engine_test = { path = "../engine_test" } diff --git a/components/raftstore/src/lib.rs b/components/raftstore/src/lib.rs index e74bae5fe7b..3fcb5fa6378 100644 --- a/components/raftstore/src/lib.rs +++ b/components/raftstore/src/lib.rs @@ -6,6 +6,9 @@ #![feature(div_duration)] #![feature(min_specialization)] #![feature(box_patterns)] +#![feature(duration_saturating_ops)] +#![feature(duration_zero)] +#![recursion_limit="256"] #[macro_use] extern crate bitflags; diff --git a/components/raftstore/src/store/config.rs b/components/raftstore/src/store/config.rs index e0abd7d6de0..7bb5bf8f5fc 100644 --- a/components/raftstore/src/store/config.rs +++ b/components/raftstore/src/store/config.rs @@ -30,6 +30,22 @@ pub struct Config { // delay time of raft db sync (us). #[config(skip)] pub delay_sync_us: u64, + #[config(skip)] + pub store_io_min_interval_us: u64, + #[config(skip)] + pub store_io_pool_size: u64, + #[config(skip)] + pub store_io_queue_size: u64, + #[config(skip)] + pub store_io_queue_init_bytes: u64, + #[config(skip)] + pub store_io_queue_bytes_step: f64, + #[config(skip)] + pub store_io_queue_sample_quantile: f64, + #[config(skip)] + pub store_io_queue_adaptive_gain: u64, + #[config(skip)] + pub apply_io_size: u64, // minimizes disruption when a partitioned node rejoins the cluster by using a two phase election. #[config(skip)] pub prevote: bool, @@ -196,6 +212,14 @@ impl Default for Config { let split_size = ReadableSize::mb(coprocessor::config::SPLIT_SIZE_MB); Config { delay_sync_us: 0, + store_io_min_interval_us: 300, + store_io_pool_size: 2, + store_io_queue_size: 64, + store_io_queue_init_bytes: 256 * 1024, + store_io_queue_bytes_step: 1.414213562373095, + store_io_queue_adaptive_gain: 0, + store_io_queue_sample_quantile: 0.9, + apply_io_size: 0, prevote: true, raftdb_path: String::new(), capacity: ReadableSize(0), @@ -421,6 +445,30 @@ impl Config { CONFIG_RAFTSTORE_GAUGE .with_label_values(&["delay_sync_us"]) .set((self.delay_sync_us as i32).into()); + CONFIG_RAFTSTORE_GAUGE + .with_label_values(&["store_io_min_interval_us"]) + .set((self.store_io_min_interval_us as i32).into()); + CONFIG_RAFTSTORE_GAUGE + .with_label_values(&["store_io_pool_size"]) + .set((self.store_io_pool_size as i32).into()); + CONFIG_RAFTSTORE_GAUGE + .with_label_values(&["store_io_queue_size"]) + .set((self.store_io_queue_size as i32).into()); + CONFIG_RAFTSTORE_GAUGE + .with_label_values(&["store_io_queue_init_bytes"]) + .set((self.store_io_queue_init_bytes as i32).into()); + CONFIG_RAFTSTORE_GAUGE + .with_label_values(&["store_io_queue_bytes_step"]) + .set((self.store_io_queue_bytes_step as f64).into()); + CONFIG_RAFTSTORE_GAUGE + .with_label_values(&["store_io_queue_sample_quantile"]) + .set((self.store_io_queue_sample_quantile as i32).into()); + CONFIG_RAFTSTORE_GAUGE + .with_label_values(&["store_io_queue_adaptive_gain"]) + .set((self.store_io_queue_adaptive_gain as i32).into()); + CONFIG_RAFTSTORE_GAUGE + .with_label_values(&["apply_io_size"]) + .set((self.apply_io_size as i32).into()); CONFIG_RAFTSTORE_GAUGE .with_label_values(&["prevote"]) diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 644ad98b0a9..e09fe3502e2 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -353,6 +353,8 @@ where yield_duration: Duration, + apply_io_size: usize, + store_id: u64, /// region_id -> (peer_id, is_splitting) /// Used for handling race between splitting and creating new peer. @@ -402,6 +404,7 @@ where use_delete_range: cfg.use_delete_range, perf_context_statistics: PerfContextStatistics::new(cfg.perf_level), yield_duration: cfg.apply_yield_duration.0, + apply_io_size: cfg.apply_io_size as usize, store_id, pending_create_peers, } @@ -922,7 +925,11 @@ where if !data.is_empty() { let cmd = util::parse_data_at(data, index, &self.tag); - if should_write_to_engine(&cmd) || apply_ctx.kv_wb().should_write_to_engine() { + if should_write_to_engine(&cmd) + || (apply_ctx.apply_io_size != 0 + && (apply_ctx.kv_wb().data_size() >= apply_ctx.apply_io_size)) + || (apply_ctx.apply_io_size == 0 && apply_ctx.kv_wb().should_write_to_engine()) + { apply_ctx.commit(self); if let Some(start) = self.handle_start.as_ref() { if start.elapsed() >= apply_ctx.yield_duration { diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index 6bfb905d75e..85494c35c02 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -49,7 +49,7 @@ use crate::store::fsm::metrics::*; use crate::store::fsm::peer::{ maybe_destroy_source, new_admin_request, PeerFsm, PeerFsmDelegate, SenderFsmPair, }; -use crate::store::fsm::sync_policy::{new_sync_policy, SyncAction, SyncPolicy}; +use crate::store::fsm::sync_policy::{new_sync_policy, SyncAction, SyncPolicy, UnsyncedReady}; use crate::store::fsm::ApplyNotifier; use crate::store::fsm::ApplyTaskRes; use crate::store::fsm::{ @@ -82,6 +82,172 @@ const RAFT_WB_SHRINK_SIZE: usize = 1024 * 1024; pub const PENDING_MSG_CAP: usize = 100; const UNREACHABLE_BACKOFF: Duration = Duration::from_secs(10); +use crate::store::peer_storage::AsyncWriterTask; +use crate::store::peer_storage::AsyncWriterTasks; +use std::collections::VecDeque; +use std::thread::JoinHandle; + +pub struct AsyncWriter +where + EK: KvEngine, + ER: RaftEngine, +{ + engine: ER, + router: RaftRouter, + tag: String, + io_min_interval: Duration, + tasks: Arc>>, + workers: Arc>>>, +} + +impl Clone for AsyncWriter +where + EK: KvEngine, + ER: RaftEngine, +{ + fn clone(&self) -> Self { + AsyncWriter { + engine: self.engine.clone(), + router: self.router.clone(), + tag: self.tag.clone(), + io_min_interval: self.io_min_interval.clone(), + tasks: self.tasks.clone(), + workers: self.workers.clone(), + } + } +} + +impl AsyncWriter +where + EK: KvEngine, + ER: RaftEngine, +{ + pub fn new( + engine: ER, + router: RaftRouter, + tag: String, + pool_size: usize, + io_min_interval_us: u64, + tasks: AsyncWriterTasks, + ) -> AsyncWriter { + let mut async_writer = AsyncWriter { + engine, + router, + tag, + io_min_interval: Duration::from_micros(io_min_interval_us), + tasks: Arc::new(Mutex::new(tasks)), + workers: Arc::new(Mutex::new(vec![])), + }; + async_writer.spawn(pool_size); + async_writer + } + + fn spawn(&mut self, pool_size: usize) { + for i in 0..pool_size { + let mut x = self.clone(); + let t = thread::Builder::new() + .name(thd_name!(format!("raftdb-async-writer-{}", i))) + .spawn(move || { + let mut last_ts = TiInstant::now_coarse(); + loop { + let mut now_ts = TiInstant::now_coarse(); + let delta = (now_ts - last_ts).saturating_sub(x.io_min_interval); + if !delta.is_zero() { + thread::sleep(delta); + now_ts = TiInstant::now_coarse(); + } + last_ts = now_ts; + + let task = { + let mut tasks = x.tasks.lock().unwrap(); + if tasks.no_task() { + // TODO: block and wait for new data arrive + continue; + } + tasks.detach_task() + }; + + let wb = x.sync_write(task.wb, task.unsynced_readies); + + // TODO: block if too many tasks + { + let mut tasks = x.tasks.lock().unwrap(); + tasks.add(AsyncWriterTask { + wb, + unsynced_readies: HashMap::default(), + }); + } + + STORE_WRITE_RAFTDB_TICK_DURATION_HISTOGRAM + .observe(duration_to_sec(now_ts.elapsed()) as f64); + } + }) + .unwrap(); + // TODO: graceful exit + self.workers.lock().unwrap().push(t); + } + } + + pub fn raft_wb_pool(&mut self) -> Arc>> { + self.tasks.clone() + } + + // TODO: this func is assumed in tasks.locked status + pub fn sync_write( + &mut self, + mut wb: ER::LogBatch, + mut unsynced_readies: HashMap, + ) -> ER::LogBatch { + let now = TiInstant::now_coarse(); + self.engine + .consume_and_shrink(&mut wb, true, RAFT_WB_SHRINK_SIZE, 4 * 1024) + .unwrap_or_else(|e| { + panic!("{} failed to save raft append result: {:?}", self.tag, e); + }); + STORE_WRITE_RAFTDB_DURATION_HISTOGRAM.observe(duration_to_sec(now.elapsed()) as f64); + self.flush_unsynced_readies(&unsynced_readies); + unsynced_readies.clear(); + wb + } + + fn flush_unsynced_readies(&mut self, unsynced_readies: &HashMap) { + for (_, r) in unsynced_readies { + self.flush_unsynced_ready(r); + } + } + + fn drain_flush_unsynced_readies(&mut self, mut unsynced_readies: VecDeque) { + for r in unsynced_readies.drain(..) { + self.flush_unsynced_ready(&r); + } + } + + fn flush_unsynced_ready(&mut self, r: &UnsyncedReady) { + loop { + let pre_number = r.notifier.load(Ordering::Acquire); + // TODO: reduce duplicated messages + //assert_ne!(pre_number, r.number); + if pre_number >= r.number { + break; + } + if pre_number + == r.notifier + .compare_and_swap(pre_number, r.number, Ordering::AcqRel) + { + if let Err(e) = self.router.force_send(r.region_id, PeerMsg::Noop) { + error!( + "failed to send noop to trigger persisted ready"; + "region_id" => r.region_id, + "ready_number" => r.number, + "error" => ?e, + ); + } + break; + } + } + } +} + pub struct StoreInfo { pub engine: E, pub capacity: u64, @@ -329,7 +495,8 @@ where pub store_stat: LocalStoreStat, pub engines: Engines, pub kv_wb: EK::WriteBatch, - pub raft_wb: ER::LogBatch, + //pub raft_wb: ER::LogBatch, + pub raft_wb_is_empty: bool, pub pending_count: usize, pub sync_log: bool, pub has_ready: bool, @@ -339,15 +506,18 @@ where pub tick_batch: Vec, pub node_start_time: Option, pub sync_policy: SyncPolicy>, + pub async_writer: Arc>>, } -impl HandleRaftReadyContext for PollContext +impl HandleRaftReadyContext for PollContext where EK: KvEngine, ER: RaftEngine, { - fn wb_mut(&mut self) -> (&mut EK::WriteBatch, &mut ER::LogBatch) { - (&mut self.kv_wb, &mut self.raft_wb) + fn wb_mut(&mut self) -> (&mut EK::WriteBatch, Arc>>) { + self.raft_wb_is_empty = false; + let mut async_writer = self.async_writer.lock().unwrap(); + (&mut self.kv_wb, async_writer.raft_wb_pool()) } #[inline] @@ -356,8 +526,10 @@ where } #[inline] - fn raft_wb_mut(&mut self) -> &mut ER::LogBatch { - &mut self.raft_wb + fn raft_wb_pool(&mut self) -> Arc>> { + self.raft_wb_is_empty = false; + let mut async_writer = self.async_writer.lock().unwrap(); + async_writer.raft_wb_pool() } #[inline] @@ -620,6 +792,7 @@ pub struct RaftPoller>, previous_metrics: RaftMetrics, timer: TiInstant, + loop_timer: TiInstant, poll_ctx: PollContext, messages_per_tick: usize, cfg_tracker: Tracker, @@ -631,13 +804,14 @@ impl RaftPoller { // the id of slow store in tests. fail_point!("on_raft_ready", self.poll_ctx.store_id() == 3, |_| {}); if self.poll_ctx.trans.need_flush() - && (!self.poll_ctx.kv_wb.is_empty() || !self.poll_ctx.raft_wb.is_empty()) + && (!self.poll_ctx.kv_wb.is_empty() || !self.poll_ctx.raft_wb_is_empty) { self.poll_ctx.trans.flush(); } let ready_cnt = self.poll_ctx.ready_res.len(); self.poll_ctx.raft_metrics.ready.has_ready_region += ready_cnt as u64; fail_point!("raft_before_save"); + let now = TiInstant::now_coarse(); if !self.poll_ctx.kv_wb.is_empty() { let mut write_opts = WriteOptions::new(); write_opts.set_sync(true); @@ -656,39 +830,17 @@ impl RaftPoller { } } fail_point!("raft_between_save"); + STORE_WRITE_KVDB_DURATION_HISTOGRAM.observe(duration_to_sec(now.elapsed()) as f64); - let raft_wb_is_empty = self.poll_ctx.raft_wb.is_empty(); + let raft_wb_is_empty = self.poll_ctx.raft_wb_is_empty; if !raft_wb_is_empty { fail_point!( "raft_before_save_on_store_1", self.poll_ctx.store_id() == 1, |_| {} ); - self.poll_ctx - .engines - .raft - .consume_and_shrink( - &mut self.poll_ctx.raft_wb, - false, - RAFT_WB_SHRINK_SIZE, - 4 * 1024, - ) - .unwrap_or_else(|e| { - panic!("{} failed to save raft append result: {:?}", self.tag, e); - }); } - let synced = if self.poll_ctx.sync_policy.delay_sync_enabled() { - self.poll_ctx.sync_policy.sync_if_needed(true) - } else { - if !raft_wb_is_empty { - self.poll_ctx.engines.raft.sync().unwrap_or_else(|e| { - panic!("{} failed to sync raft engine: {:?}", self.tag, e); - }); - } - true - }; - report_perf_context!( self.poll_ctx.perf_context_statistics, STORE_PERF_CONTEXT_TIME_HISTOGRAM_STATIC @@ -696,11 +848,7 @@ impl RaftPoller { fail_point!("raft_after_save"); if ready_cnt != 0 { - let unsynced_version = if synced { - None - } else { - Some(self.poll_ctx.sync_policy.new_unsynced_version()) - }; + let unsynced_version = Some(self.poll_ctx.sync_policy.new_unsynced_version()); let mut batch_pos = 0; let mut ready_res = mem::take(&mut self.poll_ctx.ready_res); for (ready, invoke_ctx) in ready_res.drain(..) { @@ -715,6 +863,24 @@ impl RaftPoller { .post_raft_ready_append(ready, invoke_ctx, unsynced_version); } } + + if !raft_wb_is_empty { + // Do nothing + /* Sync write testing + let mut async_writer = self.poll_ctx.async_writer.lock().unwrap(); + let raft_wb_pool = async_writer.raft_wb_pool(); + let mut raft_wbs = raft_wb_pool.lock().unwrap(); + let task = raft_wbs.pop_front().unwrap(); + let wb = async_writer.sync_write(task.wb, task.unsynced_readies); + raft_wbs.push_back(AsyncWriterTask{wb, unsynced_readies: HashMap::default()}); + */ + } else { + let unsynced_readies = self.poll_ctx.sync_policy.detach_unsynced_readies(); + let mut async_writer = self.poll_ctx.async_writer.lock().unwrap(); + async_writer.drain_flush_unsynced_readies(unsynced_readies); + } + self.poll_ctx.raft_wb_is_empty = true; + let dur = self.timer.elapsed(); if !self.poll_ctx.store_stat.is_busy { let election_timeout = Duration::from_millis( @@ -775,6 +941,8 @@ impl PollHandler, St self.poll_ctx.sync_log = false; self.poll_ctx.has_ready = false; self.timer = TiInstant::now_coarse(); + STORE_LOOP_DURATION_HISTOGRAM.observe(duration_to_sec(self.loop_timer.elapsed()) as f64); + self.loop_timer = TiInstant::now_coarse(); // update config self.poll_ctx.perf_context_statistics.start(); if let Some(incoming) = self.cfg_tracker.any_new() { @@ -797,7 +965,6 @@ impl PollHandler, St self.poll_ctx.cfg = incoming.clone(); self.poll_ctx.update_ticks_timeout(); } - self.poll_ctx.sync_policy.try_flush_readies(); } fn handle_control(&mut self, store: &mut StoreFsm) -> Option { @@ -866,7 +1033,6 @@ impl PollHandler, St fn end(&mut self, peers: &mut [Box>]) { self.flush_ticks(); - self.poll_ctx.sync_policy.try_flush_readies(); if self.poll_ctx.has_ready { self.handle_raft_ready(peers); } @@ -875,19 +1041,20 @@ impl PollHandler, St .raft_metrics .process_ready .observe(duration_to_sec(self.timer.elapsed()) as f64); + STORE_LOOP_WORK_DURATION_HISTOGRAM + .observe(duration_to_sec(self.loop_timer.elapsed()) as f64); self.poll_ctx.raft_metrics.flush(); - self.poll_ctx.sync_policy.metrics.flush(); self.poll_ctx.store_stat.flush(); } fn pause(&mut self) -> bool { - let all_synced_and_flushed = self.poll_ctx.sync_policy.try_sync_and_flush(); if self.poll_ctx.trans.need_flush() { self.poll_ctx.trans.flush(); } // If there are cached data and go into pause status, that will cause high latency or hunger // so it should return false(means pause failed) when there are still jobs to do - all_synced_and_flushed + //all_synced_and_flushed + true } } @@ -913,6 +1080,7 @@ pub struct RaftPollerBuilder { applying_snap_count: Arc, global_replication_state: Arc>, pub sync_policy: SyncPolicy>, + pub async_writer: Arc>>, } impl RaftPollerBuilder { @@ -1113,7 +1281,8 @@ where store_stat: self.global_stat.local(), engines: self.engines.clone(), kv_wb: self.engines.kv.write_batch(), - raft_wb: self.engines.raft.log_batch(4 * 1024), + //raft_wb: self.engines.raft.log_batch(4 * 1024), + raft_wb_is_empty: true, pending_count: 0, sync_log: false, has_ready: false, @@ -1123,6 +1292,7 @@ where tick_batch: vec![PeerTickBatch::default(); 256], node_start_time: Some(TiInstant::now_coarse()), sync_policy: self.sync_policy.clone(), + async_writer: self.async_writer.clone(), }; ctx.update_ticks_timeout(); let tag = format!("[store {}]", ctx.store.get_id()); @@ -1132,6 +1302,7 @@ where peer_msg_buf: Vec::with_capacity(ctx.cfg.messages_per_tick), previous_metrics: ctx.raft_metrics.clone(), timer: TiInstant::now_coarse(), + loop_timer: TiInstant::now_coarse(), messages_per_tick: ctx.cfg.messages_per_tick, poll_ctx: ctx, cfg_tracker: self.cfg.clone().tracker(tag), @@ -1242,6 +1413,22 @@ impl RaftBatchSystem { cfg.value().delay_sync_enabled(), cfg.value().delay_sync_us as i64, ); + let async_writer_tasks = AsyncWriterTasks::new( + engines.raft.clone(), + (cfg.value().store_io_queue_size as usize) + 1, + cfg.value().store_io_queue_init_bytes as usize, + cfg.value().store_io_queue_bytes_step, + cfg.value().store_io_queue_adaptive_gain as usize, + cfg.value().store_io_queue_sample_quantile, + ); + let async_writer = Arc::new(Mutex::new(AsyncWriter::new( + engines.raft.clone(), + self.router.clone(), + "raftstore-async-writer".to_string(), + cfg.value().store_io_pool_size as usize, + cfg.value().store_io_min_interval_us, + async_writer_tasks, + ))); let mut builder = RaftPollerBuilder { cfg, store: meta, @@ -1264,6 +1451,7 @@ impl RaftBatchSystem { pending_create_peers: Arc::new(Mutex::new(HashMap::default())), applying_snap_count: Arc::new(AtomicUsize::new(0)), sync_policy, + async_writer, }; let region_peers = builder.init()?; let engine = builder.engines.kv.clone(); diff --git a/components/raftstore/src/store/fsm/sync_policy.rs b/components/raftstore/src/store/fsm/sync_policy.rs index 6d43acfc2f7..e0e997fd766 100644 --- a/components/raftstore/src/store/fsm/sync_policy.rs +++ b/components/raftstore/src/store/fsm/sync_policy.rs @@ -1,6 +1,7 @@ // Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0. use std::collections::VecDeque; +use std::mem; use std::sync::atomic::Ordering; use std::sync::atomic::{AtomicI64, AtomicU64}; use std::sync::Arc; @@ -65,10 +66,10 @@ impl Action for SyncAction { #[derive(Default)] pub struct UnsyncedReady { - number: u64, - region_id: u64, - notifier: Arc, - version: u64, + pub number: u64, + pub region_id: u64, + pub notifier: Arc, + pub version: u64, } impl UnsyncedReady { @@ -228,13 +229,17 @@ impl SyncPolicy { notifier: Arc, version: u64, ) { - if !self.delay_sync_enabled { - return; - } + //if !self.delay_sync_enabled { + // return; + //} self.unsynced_readies .push_back(UnsyncedReady::new(number, region_id, notifier, version)); } + pub fn detach_unsynced_readies(&mut self) -> VecDeque { + mem::take(&mut self.unsynced_readies) + } + /// Update the global timestamps(last_sync_ts, last_plan_ts). fn update_ts_after_synced(&mut self, before_sync_ts: i64) { let last_sync_ts = self.global_last_sync_ts.load(Ordering::Acquire); diff --git a/components/raftstore/src/store/metrics.rs b/components/raftstore/src/store/metrics.rs index 2b0ac6901c3..561a3797012 100644 --- a/components/raftstore/src/store/metrics.rs +++ b/components/raftstore/src/store/metrics.rs @@ -180,6 +180,67 @@ make_auto_flush_static_metric! { } lazy_static! { + pub static ref STORE_LOOP_DURATION_HISTOGRAM: Histogram = + register_histogram!( + "tikv_raftstore_store_loop_duration_seconds", + "TODO", + exponential_buckets(0.0005, 2.0, 20).unwrap() + ).unwrap(); + pub static ref STORE_LOOP_WORK_DURATION_HISTOGRAM: Histogram = + register_histogram!( + "tikv_raftstore_store_loop_work_duration_seconds", + "TODO", + exponential_buckets(0.0005, 2.0, 20).unwrap() + ).unwrap(); + pub static ref STORE_WRITE_KVDB_DURATION_HISTOGRAM: Histogram = + register_histogram!( + "tikv_raftstore_store_write_kvdb_duration_seconds", + "TODO", + exponential_buckets(0.0005, 2.0, 20).unwrap() + ).unwrap(); + pub static ref STORE_WRITE_RAFTDB_TICK_DURATION_HISTOGRAM: Histogram = + register_histogram!( + "tikv_raftstore_store_write_raftdb_tick_duration_seconds", + "TODO", + exponential_buckets(0.0005, 2.0, 20).unwrap() + ).unwrap(); + pub static ref STORE_WRITE_RAFTDB_DURATION_HISTOGRAM: Histogram = + register_histogram!( + "tikv_raftstore_store_write_raftdb_duration_seconds", + "TODO", + exponential_buckets(0.0005, 2.0, 20).unwrap() + ).unwrap(); + pub static ref RAFT_ASYNC_WRITER_QUEUE_SIZE: Histogram = + register_histogram!( + "tikv_raftstore_store_write_queue_size", + "TODO", + exponential_buckets(1.0, 2.0, 20).unwrap() + ).unwrap(); + pub static ref RAFT_ASYNC_WRITER_ADAPTIVE_IDX: Histogram = + register_histogram!( + "tikv_raftstore_store_adaptive_idx", + "TODO", + exponential_buckets(1.0, 2.0, 20).unwrap() + ).unwrap(); + pub static ref RAFT_ASYNC_WRITER_TASK_BYTES: Histogram = + register_histogram!( + "tikv_raftstore_store_task_bytes", + "TODO", + exponential_buckets(256.0, 2.0, 30).unwrap() + ).unwrap(); + pub static ref RAFT_ASYNC_WRITER_TASK_LIMIT_BYTES: Histogram = + register_histogram!( + "tikv_raftstore_store_task_limit_bytes", + "TODO", + exponential_buckets(256.0, 2.0, 30).unwrap() + ).unwrap(); + pub static ref RAFT_ASYNC_WRITER_TASK_SUGGEST_BYTES: Histogram = + register_histogram!( + "tikv_raftstore_store_task_suggest_bytes", + "TODO", + exponential_buckets(256.0, 2.0, 30).unwrap() + ).unwrap(); + pub static ref PEER_PROPOSAL_COUNTER_VEC: IntCounterVec = register_int_counter_vec!( "tikv_raftstore_proposal_total", diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs index 0a770b0a95f..178bc8d98a2 100644 --- a/components/raftstore/src/store/peer.rs +++ b/components/raftstore/src/store/peer.rs @@ -1685,17 +1685,19 @@ where self.handle_raft_committed_entries(ctx, ready.take_committed_entries()); } - let invoke_ctx = match self - .mut_store() - .handle_raft_ready(ctx, &mut ready, destroy_regions) - { - Ok(r) => r, - Err(e) => { - // We may have written something to writebatch and it can't be reverted, so has - // to panic here. - panic!("{} failed to handle raft ready: {:?}", self.tag, e) - } - }; + let notifier = self.persisted_notifier.clone(); + let invoke_ctx = + match self + .mut_store() + .handle_raft_ready(ctx, &mut ready, destroy_regions, notifier) + { + Ok(r) => r, + Err(e) => { + // We may have written something to writebatch and it can't be reverted, so has + // to panic here. + panic!("{} failed to handle raft ready: {:?}", self.tag, e) + } + }; Some((ready, invoke_ctx)) } diff --git a/components/raftstore/src/store/peer_storage.rs b/components/raftstore/src/store/peer_storage.rs index 56986b52244..664135da75a 100644 --- a/components/raftstore/src/store/peer_storage.rs +++ b/components/raftstore/src/store/peer_storage.rs @@ -25,12 +25,17 @@ use crate::store::ProposalContext; use crate::{Error, Result}; use engine_traits::{RaftEngine, RaftLogBatch}; use into_other::into_other; +use std::sync::Mutex; use tikv_util::worker::Scheduler; use super::metrics::*; use super::worker::RegionTask; use super::{SnapEntry, SnapKey, SnapManager, SnapshotStatistics}; +use crate::store::fsm::sync_policy::UnsyncedReady; +use std::sync::atomic::AtomicU64; +use tikv_util::collections::HashMap; + // When we create a region peer, we should initialize its log term/index > 0, // so that we can force the follower peer to sync the snapshot first. pub const RAFT_INIT_LOG_TERM: u64 = 5; @@ -304,15 +309,287 @@ impl Drop for EntryCache { } } -pub trait HandleRaftReadyContext +pub struct AsyncWriterTask +where + WR: RaftLogBatch, +{ + pub wb: WR, + pub unsynced_readies: HashMap, +} + +impl AsyncWriterTask where - WK: Mutable, WR: RaftLogBatch, +{ + pub fn on_written( + &mut self, + region_id: u64, + ready_number: u64, + region_notifier: Arc, + ) { + self.unsynced_readies.insert( + region_id, + UnsyncedReady { + number: ready_number, + region_id, + notifier: region_notifier, + version: 0, + }, + ); + } + + pub fn is_empty(&self) -> bool { + self.unsynced_readies.is_empty() + } +} + +/* +pub struct AsyncWriterTasks +where + ER: RaftEngine, +{ + wbs: VecDeque>, +} + +impl AsyncWriterTasks +where + ER: RaftEngine, +{ + pub fn new( + engine: ER, + queue_size: usize, + _queue_init_bytes: usize, + _queue_bytes_step: f64, + _queue_sample_size: f64, + ) -> AsyncWriterTasks { + let mut wbs = VecDeque::default(); + for _ in 0..queue_size { + wbs.push_back(AsyncWriterTask { + wb: engine.log_batch(4 * 1024), + unsynced_readies: HashMap::default(), + }); + } + AsyncWriterTasks { + wbs, + } + } + + pub fn prepare_current_for_write(&mut self) -> &mut AsyncWriterTask { + self.wbs.front_mut().unwrap() + } + + pub fn no_task(&self) -> bool { + self.wbs.front().unwrap().unsynced_readies.is_empty() + } + + pub fn detach_task(&mut self) -> AsyncWriterTask { + self.wbs.pop_front().unwrap() + } + + pub fn add(&mut self, task: AsyncWriterTask) { + self.wbs.push_back(task); + } +} +*/ + +pub struct SampleWindow { + count: usize, + buckets: VecDeque, + buckets_val_cnt: VecDeque, + bucket_factor: f64, +} + +impl SampleWindow { + pub fn new() -> SampleWindow { + SampleWindow { + count: 0, + buckets: VecDeque::default(), + buckets_val_cnt: VecDeque::default(), + bucket_factor: 2.0, + } + } + + pub fn observe(&mut self, value: f64) { + // For P99, P999 + self.count += 1; + if self.buckets.is_empty() { + self.buckets.push_back(value); + self.buckets_val_cnt.push_back(0); + } else { + let mut bucket_pos = self.buckets.len() / 2; + loop { + let bucket_val = self.buckets[bucket_pos]; + if value < bucket_val { + if bucket_pos == 0 { + self.buckets.push_front(bucket_val / self.bucket_factor); + self.buckets_val_cnt.push_front(0); + } else { + bucket_pos -= 1; + } + continue; + } + let bucket_val_ub = bucket_val * self.bucket_factor; + if value < bucket_val_ub { + break; + } + if bucket_pos + 1 >= self.buckets.len() { + self.buckets.push_back(bucket_val_ub); + self.buckets_val_cnt.push_back(0); + } + bucket_pos += 1; + } + self.buckets_val_cnt[bucket_pos] += 1; + } + } + + pub fn quantile(&mut self, quantile: f64) -> f64 { + let mut cnt_sum = 0; + let mut index = self.buckets_val_cnt.len() - 1; + let sum_target = (self.count as f64 * quantile) as usize; + for i in 0..self.buckets_val_cnt.len() { + cnt_sum += self.buckets_val_cnt[i]; + if cnt_sum >= sum_target { + index = i; + break; + } + } + self.buckets[index] * self.bucket_factor + } +} + +pub struct AsyncWriterTasks +where + ER: RaftEngine, +{ + wbs: VecDeque>, + size_limits: Vec, + current_idx: usize, + adaptive_idx: usize, + adaptive_gain: usize, + sample_window: SampleWindow, + sample_quantile: f64, +} + +// TODO: make sure the queue size is good when doing pop/front +impl AsyncWriterTasks +where + ER: RaftEngine, +{ + pub fn new( + engine: ER, + queue_size: usize, + queue_init_bytes: usize, + queue_bytes_step: f64, + queue_adaptive_gain: usize, + queue_sample_quantile: f64, + ) -> AsyncWriterTasks { + let mut wbs = VecDeque::default(); + for _ in 0..queue_size { + wbs.push_back(AsyncWriterTask { + wb: engine.log_batch(4 * 1024), + unsynced_readies: HashMap::default(), + }); + } + let mut size_limits = vec![]; + let mut size_limit = queue_init_bytes; + for _ in 0..(queue_size * 2 + queue_adaptive_gain) { + size_limits.push(size_limit); + size_limit = (size_limit as f64 * queue_bytes_step) as usize; + } + AsyncWriterTasks { + wbs, + size_limits, + current_idx: 0, + adaptive_idx: 0, + adaptive_gain: queue_adaptive_gain, + sample_window: SampleWindow::new(), + sample_quantile: queue_sample_quantile, + } + } + + pub fn prepare_current_for_write(&mut self) -> &mut AsyncWriterTask { + assert!(self.current_idx <= self.wbs.len()); + if self.current_idx + 1 < self.wbs.len() { + assert!(self.wbs[self.current_idx + 1].is_empty()); + } + let current_size = self.wbs[self.current_idx].wb.persist_size(); + assert!(self.adaptive_gain + self.adaptive_idx + self.current_idx < self.size_limits.len()); + if current_size >= self.size_limits[self.adaptive_gain + self.adaptive_idx + self.current_idx] { + if self.current_idx + 1 < self.wbs.len() { + self.current_idx += 1; + assert!(self.wbs[self.current_idx].is_empty()); + } else { + // do nothing, adaptive IO size + } + } + &mut self.wbs[self.current_idx] + } + + pub fn no_task(&self) -> bool { + let no_task = self.wbs.front().unwrap().is_empty(); + if no_task { + assert!(self.current_idx == 0); + } + no_task + } + + pub fn detach_task(&mut self) -> AsyncWriterTask { + RAFT_ASYNC_WRITER_QUEUE_SIZE.observe(self.current_idx as f64); + RAFT_ASYNC_WRITER_ADAPTIVE_IDX.observe(self.adaptive_idx as f64); + + assert!(self.current_idx <= self.wbs.len()); + if self.current_idx + 1 < self.wbs.len() { + assert!(self.wbs[self.current_idx + 1].is_empty()); + } + if self.current_idx != 0 { + assert!(!self.wbs[self.current_idx - 1].is_empty()); + } + let task = self.wbs.pop_front().unwrap(); + assert!(!task.is_empty()); + + let task_bytes = task.wb.persist_size(); + RAFT_ASYNC_WRITER_TASK_BYTES.observe(task_bytes as f64); + + assert!(self.adaptive_gain + self.adaptive_idx + self.current_idx < self.size_limits.len()); + RAFT_ASYNC_WRITER_TASK_LIMIT_BYTES.observe( + self.size_limits[self.adaptive_gain + self.adaptive_idx + self.current_idx] as f64); + + self.sample_window.observe(task_bytes as f64); + let task_suggest_bytes = self.sample_window.quantile(self.sample_quantile); + RAFT_ASYNC_WRITER_TASK_SUGGEST_BYTES.observe(task_suggest_bytes); + + let current_target_bytes = self.size_limits[self.adaptive_idx + self.current_idx] as f64; + if task_suggest_bytes >= current_target_bytes { + if self.adaptive_idx + (self.wbs.len() - 1) + 1 < self.size_limits.len() { + self.adaptive_idx += 1; + } + } else if self.adaptive_idx > 0 && + task_suggest_bytes < (self.size_limits[self.adaptive_idx + self.current_idx - 1] as f64) { + self.adaptive_idx -= 1; + } + + if self.current_idx != 0 { + self.current_idx -= 1; + } + task + } + + pub fn add(&mut self, task: AsyncWriterTask) { + assert!(task.is_empty()); + self.wbs.push_back(task); + } +} + +pub trait HandleRaftReadyContext +where + WK: Mutable, + ER: RaftEngine, { /// Returns the mutable references of WriteBatch for both KvDB and RaftDB in one interface. - fn wb_mut(&mut self) -> (&mut WK, &mut WR); + fn wb_mut(&mut self) -> (&mut WK, Arc>>); fn kv_wb_mut(&mut self) -> &mut WK; - fn raft_wb_mut(&mut self) -> &mut WR; + //fn raft_wb_mut(&mut self) -> &mut WR; + fn raft_wb_pool(&mut self) -> Arc>>; fn sync_log(&self) -> bool; fn set_sync_log(&mut self, sync: bool); } @@ -1011,11 +1288,11 @@ where // to the return one. // WARNING: If this function returns error, the caller must panic otherwise the entry cache may // be wrong and break correctness. - pub fn append>( + pub fn append( &mut self, invoke_ctx: &mut InvokeContext, entries: Vec, - ready_ctx: &mut H, + raft_wb: &mut ER::LogBatch, ) -> Result { let region_id = self.get_region_id(); debug!( @@ -1043,13 +1320,10 @@ where cache.append(&self.tag, &entries); } - ready_ctx.raft_wb_mut().append(region_id, entries)?; - + raft_wb.append(region_id, entries)?; // Delete any previously appended log entries which never committed. // TODO: Wrap it as an engine::Error. - ready_ctx - .raft_wb_mut() - .cut_logs(region_id, last_index + 1, prev_last_index); + raft_wb.cut_logs(region_id, last_index + 1, prev_last_index); invoke_ctx.raft_state.set_last_index(last_index); invoke_ctx.last_term = last_term; @@ -1360,52 +1634,60 @@ where /// it explicitly to disk. If it's flushed to disk successfully, `post_ready` should be called /// to update the memory states properly. /// WARNING: If this function returns error, the caller must panic(details in `append` function). - pub fn handle_raft_ready>( + pub fn handle_raft_ready>( &mut self, ready_ctx: &mut H, ready: &mut Ready, destroy_regions: Vec, + region_notifier: Arc, ) -> Result { + let region_id = self.get_region_id(); let mut ctx = InvokeContext::new(self); - let snapshot_index = if ready.snapshot().is_empty() { - 0 - } else { - fail_point!("raft_before_apply_snap"); - let (kv_wb, raft_wb) = ready_ctx.wb_mut(); - self.apply_snapshot(&mut ctx, ready.snapshot(), kv_wb, raft_wb, &destroy_regions)?; - fail_point!("raft_after_apply_snap"); + let mut snapshot_index = 0; - ctx.destroyed_regions = destroy_regions; - - last_index(&ctx.raft_state) - }; + { + let raft_wb_pool = ready_ctx.raft_wb_pool(); + let mut raft_wbs = raft_wb_pool.lock().unwrap(); + let current = raft_wbs.prepare_current_for_write(); + let current_size = current.wb.persist_size(); + + if !ready.snapshot().is_empty() { + fail_point!("raft_before_apply_snap"); + self.apply_snapshot(&mut ctx, ready.snapshot(), ready_ctx.kv_wb_mut(), &mut current.wb, &destroy_regions)?; + fail_point!("raft_after_apply_snap"); + ctx.destroyed_regions = destroy_regions; + snapshot_index = last_index(&ctx.raft_state); + }; - if !ready.entries().is_empty() { - self.append(&mut ctx, ready.take_entries(), ready_ctx)?; - } + if !ready.entries().is_empty() { + self.append(&mut ctx, ready.take_entries(), &mut current.wb)?; + } + // Last index is 0 means the peer is created from raft message + // and has not applied snapshot yet, so skip persistent hard state. + if ctx.raft_state.get_last_index() > 0 { + if let Some(hs) = ready.hs() { + ctx.raft_state.set_hard_state(hs.clone()); + } + } - // Last index is 0 means the peer is created from raft message - // and has not applied snapshot yet, so skip persistent hard state. - if ctx.raft_state.get_last_index() > 0 { - if let Some(hs) = ready.hs() { - ctx.raft_state.set_hard_state(hs.clone()); + // Save raft state if it has changed or there is a snapshot. + if ctx.raft_state != self.raft_state || snapshot_index > 0 { + ctx.save_raft_state_to(&mut current.wb)?; } - } - // Save raft state if it has changed or there is a snapshot. - if ctx.raft_state != self.raft_state || snapshot_index > 0 { - ctx.save_raft_state_to(ready_ctx.raft_wb_mut())?; - if snapshot_index > 0 { - // in case of restart happen when we just write region state to Applying, - // but not write raft_local_state to raft rocksdb in time. - // we write raft state to default rocksdb, with last index set to snap index, - // in case of recv raft log after snapshot. - ctx.save_snapshot_raft_state_to(snapshot_index, ready_ctx.kv_wb_mut())?; + if ready.must_sync() { + assert!(current_size != current.wb.persist_size()); + current.on_written(region_id, ready.number(), region_notifier.clone()); } } // only when apply snapshot if snapshot_index > 0 { + // in case of restart happen when we just write region state to Applying, + // but not write raft_local_state to raft rocksdb in time. + // we write raft state to default rocksdb, with last index set to snap index, + // in case of recv raft log after snapshot. + ctx.save_snapshot_raft_state_to(snapshot_index, ready_ctx.kv_wb_mut())?; ctx.save_apply_state_to(ready_ctx.kv_wb_mut())?; } @@ -1642,7 +1924,7 @@ pub fn write_peer_state( kv_wb.put_msg_cf(CF_RAFT, &keys::region_state_key(region_id), ®ion_state)?; Ok(()) } - +/* #[cfg(test)] mod tests { use crate::coprocessor::CoprocessorHost; @@ -1705,14 +1987,14 @@ mod tests { } impl HandleRaftReadyContext for ReadyContext { - fn wb_mut(&mut self) -> (&mut KvTestWriteBatch, &mut RaftTestWriteBatch) { - (&mut self.kv_wb, &mut self.raft_wb) - } + //fn wb_mut(&mut self) -> (&mut KvTestWriteBatch, &mut RaftTestWriteBatch) { + // (&mut self.kv_wb, &mut self.raft_wb) + //} fn kv_wb_mut(&mut self) -> &mut KvTestWriteBatch { &mut self.kv_wb } - fn raft_wb_mut(&mut self) -> &mut RaftTestWriteBatch { - &mut self.raft_wb + fn raft_wb_pool(&mut self) -> &mut RaftTestWriteBatch { + &mut self.raft_wbs } fn sync_log(&self) -> bool { self.sync_log @@ -2621,3 +2903,5 @@ mod tests { assert!(build_storage().is_err()); } } + +*/