From 7c310c281aaa528cd675625e12d6e00308e10809 Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Thu, 24 Dec 2020 15:41:25 +0800 Subject: [PATCH 01/15] demo (not work) Signed-off-by: Liu Cong --- components/raftstore/src/store/fsm/store.rs | 184 ++++++++++++++---- .../raftstore/src/store/fsm/sync_policy.rs | 19 +- 2 files changed, 162 insertions(+), 41 deletions(-) diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index 6bfb905d75e..1b86cc1894b 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -49,7 +49,7 @@ use crate::store::fsm::metrics::*; use crate::store::fsm::peer::{ maybe_destroy_source, new_admin_request, PeerFsm, PeerFsmDelegate, SenderFsmPair, }; -use crate::store::fsm::sync_policy::{new_sync_policy, SyncAction, SyncPolicy}; +use crate::store::fsm::sync_policy::{new_sync_policy, SyncAction, SyncPolicy, UnsyncedReady}; use crate::store::fsm::ApplyNotifier; use crate::store::fsm::ApplyTaskRes; use crate::store::fsm::{ @@ -82,6 +82,130 @@ const RAFT_WB_SHRINK_SIZE: usize = 1024 * 1024; pub const PENDING_MSG_CAP: usize = 100; const UNREACHABLE_BACKOFF: Duration = Duration::from_secs(10); +use std::collections::VecDeque; +use std::thread::JoinHandle; + +pub struct AsyncDBWriter +where + EK: KvEngine, + ER: RaftEngine, +{ + engine: ER, + router: RaftRouter, + tag: String, + wbs: Arc>>, + pub tx: LooseBoundedSender<(ER::LogBatch, VecDeque)>, + rx: Arc)>>, + workers: Arc>>>, +} + +impl Clone for AsyncDBWriter +where + EK: KvEngine, + ER: RaftEngine, +{ + fn clone(&self) -> Self { + AsyncDBWriter{ + engine: self.engine.clone(), + router: self.router.clone(), + tag: self.tag.clone(), + wbs: self.wbs.clone(), + tx: self.tx.clone(), + rx: self.rx.clone(), + workers: self.workers.clone(), + } + } +} + +impl AsyncDBWriter +where + EK: KvEngine, + ER: RaftEngine, +{ + pub fn new(engine: ER, router: RaftRouter, tag: String, pool_size: usize) -> AsyncDBWriter { + let (tx, rx) = mpsc::loose_bounded(pool_size * 2); + let mut async_writer = AsyncDBWriter{ + engine, + router, + tag, + wbs: Arc::new(Mutex::new(VecDeque::default())), + tx, + rx: Arc::new(rx), + workers: Arc::new(Mutex::new(vec![])), + }; + async_writer.spawn(pool_size); + async_writer + } + + fn spawn(&mut self, pool_size: usize) { + // TODO: support more than 1 write-thread + assert!(pool_size == 1); + for i in 0..pool_size { + let mut x = self.clone(); + let t = thread::Builder::new() + .name(thd_name!(format!("raftdb-async-writer-{}", i))) + .spawn(move || { + let (wb, unsynced_readies) = x.rx.recv().unwrap(); + x.sync_write(wb, unsynced_readies) + }) + .unwrap(); + // TODO: graceful exit + self.workers.lock().unwrap().push(t); + } + } + + pub fn new_wb(&mut self) -> ER::LogBatch { + let mut wbs = self.wbs.lock().unwrap(); + if wbs.is_empty() { + self.engine.log_batch(4 * 1024) + } else { + wbs.pop_front().unwrap() + } + } + + pub fn async_write(&mut self, wb: ER::LogBatch, unsynced_readies: VecDeque) { + // TODO: block if full + self.tx.force_send((wb, unsynced_readies)) + .unwrap_or_else(|e| { + panic!("{} failed to send task via channel: {:?}", self.tag, e); + }); + } + + pub fn sync_write(&mut self, mut wb: ER::LogBatch, unsynced_readies: VecDeque) { + self.engine + .consume_and_shrink(&mut wb, true, RAFT_WB_SHRINK_SIZE, 4 * 1024) + .unwrap_or_else(|e| { + panic!("{} failed to save raft append result: {:?}", self.tag, e); + }); + self.flush_unsynced_readies(unsynced_readies); + let mut wbs = self.wbs.lock().unwrap(); + wbs.push_back(wb); + } + + fn flush_unsynced_readies(&mut self, mut unsynced_readies: VecDeque) { + for r in unsynced_readies.drain(..) { + loop { + let pre_number = r.notifier.load(Ordering::Acquire); + assert_ne!(pre_number, r.number); + if pre_number > r.number { + break; + } + if pre_number == r.notifier.compare_and_swap(pre_number, r.number, Ordering::AcqRel) { + if let Err(e) = self.router.force_send(r.region_id, PeerMsg::Noop) { + error!( + "failed to send noop to trigger persisted ready"; + "region_id" => r.region_id, + "ready_number" => r.number, + "error" => ?e, + ); + } + break; + } + } + } + } +} + pub struct StoreInfo { pub engine: E, pub capacity: u64, @@ -339,6 +463,7 @@ where pub tick_batch: Vec, pub node_start_time: Option, pub sync_policy: SyncPolicy>, + pub async_writer: Arc>>, } impl HandleRaftReadyContext for PollContext @@ -376,6 +501,14 @@ where EK: KvEngine, ER: RaftEngine, { + #[inline] + pub fn detach_raft_wb(&mut self) -> ER::LogBatch { + let mut async_writer = self.async_writer.lock().unwrap(); + let mut raft_wb = async_writer.new_wb(); + mem::swap(&mut self.raft_wb, &mut raft_wb); + raft_wb + } + #[inline] pub fn store_id(&self) -> u64 { self.store.get_id() @@ -664,31 +797,8 @@ impl RaftPoller { self.poll_ctx.store_id() == 1, |_| {} ); - self.poll_ctx - .engines - .raft - .consume_and_shrink( - &mut self.poll_ctx.raft_wb, - false, - RAFT_WB_SHRINK_SIZE, - 4 * 1024, - ) - .unwrap_or_else(|e| { - panic!("{} failed to save raft append result: {:?}", self.tag, e); - }); } - let synced = if self.poll_ctx.sync_policy.delay_sync_enabled() { - self.poll_ctx.sync_policy.sync_if_needed(true) - } else { - if !raft_wb_is_empty { - self.poll_ctx.engines.raft.sync().unwrap_or_else(|e| { - panic!("{} failed to sync raft engine: {:?}", self.tag, e); - }); - } - true - }; - report_perf_context!( self.poll_ctx.perf_context_statistics, STORE_PERF_CONTEXT_TIME_HISTOGRAM_STATIC @@ -696,11 +806,7 @@ impl RaftPoller { fail_point!("raft_after_save"); if ready_cnt != 0 { - let unsynced_version = if synced { - None - } else { - Some(self.poll_ctx.sync_policy.new_unsynced_version()) - }; + let unsynced_version = Some(self.poll_ctx.sync_policy.new_unsynced_version()); let mut batch_pos = 0; let mut ready_res = mem::take(&mut self.poll_ctx.ready_res); for (ready, invoke_ctx) in ready_res.drain(..) { @@ -715,6 +821,14 @@ impl RaftPoller { .post_raft_ready_append(ready, invoke_ctx, unsynced_version); } } + + if !raft_wb_is_empty { + let raft_wb = self.poll_ctx.detach_raft_wb(); + let unsynced_readies = self.poll_ctx.sync_policy.detach_unsynced_readies(); + let mut async_writer = self.poll_ctx.async_writer.lock().unwrap(); + async_writer.async_write(raft_wb, unsynced_readies); + } + let dur = self.timer.elapsed(); if !self.poll_ctx.store_stat.is_busy { let election_timeout = Duration::from_millis( @@ -797,7 +911,6 @@ impl PollHandler, St self.poll_ctx.cfg = incoming.clone(); self.poll_ctx.update_ticks_timeout(); } - self.poll_ctx.sync_policy.try_flush_readies(); } fn handle_control(&mut self, store: &mut StoreFsm) -> Option { @@ -866,7 +979,6 @@ impl PollHandler, St fn end(&mut self, peers: &mut [Box>]) { self.flush_ticks(); - self.poll_ctx.sync_policy.try_flush_readies(); if self.poll_ctx.has_ready { self.handle_raft_ready(peers); } @@ -876,18 +988,17 @@ impl PollHandler, St .process_ready .observe(duration_to_sec(self.timer.elapsed()) as f64); self.poll_ctx.raft_metrics.flush(); - self.poll_ctx.sync_policy.metrics.flush(); self.poll_ctx.store_stat.flush(); } fn pause(&mut self) -> bool { - let all_synced_and_flushed = self.poll_ctx.sync_policy.try_sync_and_flush(); if self.poll_ctx.trans.need_flush() { self.poll_ctx.trans.flush(); } // If there are cached data and go into pause status, that will cause high latency or hunger // so it should return false(means pause failed) when there are still jobs to do - all_synced_and_flushed + //all_synced_and_flushed + true } } @@ -913,6 +1024,7 @@ pub struct RaftPollerBuilder { applying_snap_count: Arc, global_replication_state: Arc>, pub sync_policy: SyncPolicy>, + pub async_writer: Arc>>, } impl RaftPollerBuilder { @@ -1123,6 +1235,7 @@ where tick_batch: vec![PeerTickBatch::default(); 256], node_start_time: Some(TiInstant::now_coarse()), sync_policy: self.sync_policy.clone(), + async_writer: self.async_writer.clone(), }; ctx.update_ticks_timeout(); let tag = format!("[store {}]", ctx.store.get_id()); @@ -1242,6 +1355,8 @@ impl RaftBatchSystem { cfg.value().delay_sync_enabled(), cfg.value().delay_sync_us as i64, ); + let async_writer = Arc::new(Mutex::new(AsyncDBWriter::new(engines.raft.clone(), + self.router.clone(), "raftstore-async-writer".to_string(), 1))); let mut builder = RaftPollerBuilder { cfg, store: meta, @@ -1264,6 +1379,7 @@ impl RaftBatchSystem { pending_create_peers: Arc::new(Mutex::new(HashMap::default())), applying_snap_count: Arc::new(AtomicUsize::new(0)), sync_policy, + async_writer, }; let region_peers = builder.init()?; let engine = builder.engines.kv.clone(); diff --git a/components/raftstore/src/store/fsm/sync_policy.rs b/components/raftstore/src/store/fsm/sync_policy.rs index 6d43acfc2f7..40154df1fea 100644 --- a/components/raftstore/src/store/fsm/sync_policy.rs +++ b/components/raftstore/src/store/fsm/sync_policy.rs @@ -4,6 +4,7 @@ use std::collections::VecDeque; use std::sync::atomic::Ordering; use std::sync::atomic::{AtomicI64, AtomicU64}; use std::sync::Arc; +use std::mem; use crossbeam::utils::CachePadded; use engine_traits::KvEngine; @@ -65,10 +66,10 @@ impl Action for SyncAction { #[derive(Default)] pub struct UnsyncedReady { - number: u64, - region_id: u64, - notifier: Arc, - version: u64, + pub number: u64, + pub region_id: u64, + pub notifier: Arc, + pub version: u64, } impl UnsyncedReady { @@ -228,13 +229,17 @@ impl SyncPolicy { notifier: Arc, version: u64, ) { - if !self.delay_sync_enabled { - return; - } + //if !self.delay_sync_enabled { + // return; + //} self.unsynced_readies .push_back(UnsyncedReady::new(number, region_id, notifier, version)); } + pub fn detach_unsynced_readies(&mut self) -> VecDeque { + mem::take(&mut self.unsynced_readies) + } + /// Update the global timestamps(last_sync_ts, last_plan_ts). fn update_ts_after_synced(&mut self, before_sync_ts: i64) { let last_sync_ts = self.global_last_sync_ts.load(Ordering::Acquire); From 1f62d2555c431f4d8e1a0bc1a639bbdfaeb834ae Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Thu, 24 Dec 2020 16:30:43 +0800 Subject: [PATCH 02/15] Splitted store CPU-IO workload so it could use diff conc, but task size still related Signed-off-by: Liu Cong --- components/raftstore/src/store/config.rs | 6 ++++++ components/raftstore/src/store/fsm/store.rs | 14 +++++++------- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/components/raftstore/src/store/config.rs b/components/raftstore/src/store/config.rs index e0abd7d6de0..c7a19e5e652 100644 --- a/components/raftstore/src/store/config.rs +++ b/components/raftstore/src/store/config.rs @@ -30,6 +30,8 @@ pub struct Config { // delay time of raft db sync (us). #[config(skip)] pub delay_sync_us: u64, + #[config(skip)] + pub store_io_pool_size: u64, // minimizes disruption when a partitioned node rejoins the cluster by using a two phase election. #[config(skip)] pub prevote: bool, @@ -196,6 +198,7 @@ impl Default for Config { let split_size = ReadableSize::mb(coprocessor::config::SPLIT_SIZE_MB); Config { delay_sync_us: 0, + store_io_pool_size: 2, prevote: true, raftdb_path: String::new(), capacity: ReadableSize(0), @@ -421,6 +424,9 @@ impl Config { CONFIG_RAFTSTORE_GAUGE .with_label_values(&["delay_sync_us"]) .set((self.delay_sync_us as i32).into()); + CONFIG_RAFTSTORE_GAUGE + .with_label_values(&["store_io_pool_size"]) + .set((self.store_io_pool_size as i32).into()); CONFIG_RAFTSTORE_GAUGE .with_label_values(&["prevote"]) diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index 1b86cc1894b..02cbfaa4ee8 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -95,7 +95,7 @@ where tag: String, wbs: Arc>>, pub tx: LooseBoundedSender<(ER::LogBatch, VecDeque)>, - rx: Arc)>>, + rx: Arc)>>>, workers: Arc>>>, } @@ -130,7 +130,7 @@ where tag, wbs: Arc::new(Mutex::new(VecDeque::default())), tx, - rx: Arc::new(rx), + rx: Arc::new(Mutex::new(rx)), workers: Arc::new(Mutex::new(vec![])), }; async_writer.spawn(pool_size); @@ -138,15 +138,15 @@ where } fn spawn(&mut self, pool_size: usize) { - // TODO: support more than 1 write-thread - assert!(pool_size == 1); for i in 0..pool_size { let mut x = self.clone(); let t = thread::Builder::new() .name(thd_name!(format!("raftdb-async-writer-{}", i))) .spawn(move || { - let (wb, unsynced_readies) = x.rx.recv().unwrap(); - x.sync_write(wb, unsynced_readies) + loop { + let (wb, unsynced_readies) = x.rx.lock().unwrap().recv().unwrap(); + x.sync_write(wb, unsynced_readies); + } }) .unwrap(); // TODO: graceful exit @@ -1356,7 +1356,7 @@ impl RaftBatchSystem { cfg.value().delay_sync_us as i64, ); let async_writer = Arc::new(Mutex::new(AsyncDBWriter::new(engines.raft.clone(), - self.router.clone(), "raftstore-async-writer".to_string(), 1))); + self.router.clone(), "raftstore-async-writer".to_string(), cfg.value().store_io_pool_size as usize))); let mut builder = RaftPollerBuilder { cfg, store: meta, From f0a597ba43f94ad4215a0b1da7ed32e027f65306 Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Fri, 25 Dec 2020 15:57:54 +0800 Subject: [PATCH 03/15] Splitted store CPU-IO workload so it could use diff conc, but task size still related Signed-off-by: Liu Cong --- Cargo.lock | 123 +++++++++++++++----- components/raftstore/Cargo.toml | 1 + components/raftstore/src/store/fsm/store.rs | 24 ++-- 3 files changed, 105 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 32eb0fb10a1..4573bfca278 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -215,7 +215,7 @@ dependencies = [ "prometheus", "raft", "raftstore", - "rand", + "rand 0.7.3", "security", "serde", "serde_derive", @@ -497,6 +497,15 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" +[[package]] +name = "chan" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d14956a3dae065ffaa0d92ece848ab4ced88d32361e7fdfbfd653a5c454a1ed8" +dependencies = [ + "rand 0.3.23", +] + [[package]] name = "chrono" version = "0.4.11" @@ -604,7 +613,7 @@ dependencies = [ "raft", "raft_log_engine", "raftstore", - "rand", + "rand 0.7.3", "security", "serde_json", "signal", @@ -633,7 +642,7 @@ dependencies = [ "libc", "panic_hook", "protobuf", - "rand", + "rand 0.7.3", "static_assertions", "tikv_alloc", ] @@ -647,7 +656,7 @@ dependencies = [ "fail", "futures 0.3.7", "parking_lot 0.11.0", - "rand", + "rand 0.7.3", "tikv_alloc", "tokio", "txn_types", @@ -689,7 +698,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c750ec12b83377637110d5a57f5ae08e895b06c4b16e2bdbf1a94ef717428c59" dependencies = [ "proc-macro-hack", - "rand", + "rand 0.7.3", ] [[package]] @@ -755,7 +764,7 @@ dependencies = [ "itertools 0.8.2", "lazy_static", "num-traits", - "rand_core", + "rand_core 0.5.1", "rand_os", "rand_xoshiro", "rayon", @@ -1088,7 +1097,7 @@ dependencies = [ "openssl", "prometheus", "protobuf", - "rand", + "rand 0.7.3", "rusoto_core", "rusoto_credential", "rusoto_kms", @@ -1137,7 +1146,7 @@ dependencies = [ "prometheus-static-metric", "protobuf", "raft", - "rand", + "rand 0.7.3", "rocksdb", "serde", "serde_derive", @@ -1232,7 +1241,7 @@ dependencies = [ "lazy_static", "matches", "prometheus", - "rand", + "rand 0.7.3", "rusoto_core", "rusoto_mock", "rusoto_s3", @@ -1257,7 +1266,7 @@ checksum = "3be3c61c59fdc91f5dbc3ea31ee8623122ce80057058be560654c5d410d181a6" dependencies = [ "lazy_static", "log", - "rand", + "rand 0.7.3", ] [[package]] @@ -1296,7 +1305,7 @@ dependencies = [ "fs2", "lazy_static", "openssl", - "rand", + "rand 0.7.3", "tempfile", "tikv_alloc", ] @@ -1344,6 +1353,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f2a4a2034423744d2cc7ca2068453168dcdb82c438419e639a26bd87839c674" +[[package]] +name = "fuchsia-cprng" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" + [[package]] name = "fuchsia-zircon" version = "0.3.3" @@ -3092,7 +3107,7 @@ dependencies = [ "protobuf", "quick-error", "raft-proto", - "rand", + "rand 0.7.3", "slog", ] @@ -3152,6 +3167,7 @@ dependencies = [ "batch-system", "bitflags", "byteorder", + "chan", "concurrency_manager", "configuration", "crc32fast", @@ -3184,7 +3200,7 @@ dependencies = [ "quick-error", "raft", "raft-proto", - "rand", + "rand 0.7.3", "serde", "serde_derive", "serde_with", @@ -3204,6 +3220,29 @@ dependencies = [ "yatp", ] +[[package]] +name = "rand" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64ac302d8f83c0c1974bf758f6b041c6c8ada916fbb44a609158ca8b064cc76c" +dependencies = [ + "libc", + "rand 0.4.6", +] + +[[package]] +name = "rand" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" +dependencies = [ + "fuchsia-cprng", + "libc", + "rand_core 0.3.1", + "rdrand", + "winapi 0.3.8", +] + [[package]] name = "rand" version = "0.7.3" @@ -3213,7 +3252,7 @@ dependencies = [ "getrandom", "libc", "rand_chacha", - "rand_core", + "rand_core 0.5.1", "rand_hc", ] @@ -3224,9 +3263,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03a2a90da8c7523f554344f921aa97283eadf6ac484a6d2a7d0212fa7f8d6853" dependencies = [ "c2-chacha", - "rand_core", + "rand_core 0.5.1", ] +[[package]] +name = "rand_core" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" +dependencies = [ + "rand_core 0.4.2", +] + +[[package]] +name = "rand_core" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" + [[package]] name = "rand_core" version = "0.5.1" @@ -3242,7 +3296,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" dependencies = [ - "rand_core", + "rand_core 0.5.1", ] [[package]] @@ -3251,7 +3305,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8df6b0b3dc9991a10b2d91a86d1129314502169a1bf6afa67328945e02498b76" dependencies = [ - "rand_core", + "rand_core 0.5.1", ] [[package]] @@ -3261,7 +3315,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a788ae3edb696cfcba1c19bfd388cc4b8c21f8a408432b199c072825084da58a" dependencies = [ "getrandom", - "rand_core", + "rand_core 0.5.1", ] [[package]] @@ -3270,7 +3324,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77d416b86801d23dde1aa643023b775c3a462efc0ed96443add11546cdf1dca8" dependencies = [ - "rand_core", + "rand_core 0.5.1", ] [[package]] @@ -3279,7 +3333,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e18c91676f670f6f0312764c759405f13afb98d5d73819840cf72a518487bff" dependencies = [ - "rand_core", + "rand_core 0.5.1", ] [[package]] @@ -3317,6 +3371,15 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "rdrand" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" +dependencies = [ + "rand_core 0.3.1", +] + [[package]] name = "redox_syscall" version = "0.1.56" @@ -4245,7 +4308,7 @@ checksum = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9" dependencies = [ "cfg-if", "libc", - "rand", + "rand 0.7.3", "redox_syscall", "remove_dir_all", "winapi 0.3.8", @@ -4333,7 +4396,7 @@ dependencies = [ "pd_client", "raft", "raftstore", - "rand", + "rand 0.7.3", "security", "slog", "slog-global", @@ -4380,7 +4443,7 @@ dependencies = [ "fail", "grpcio", "kvproto", - "rand", + "rand 0.7.3", "rand_isaac", "security", "slog", @@ -4423,7 +4486,7 @@ dependencies = [ "protobuf", "raft", "raftstore", - "rand", + "rand 0.7.3", "rand_xorshift", "security", "semver 0.10.0", @@ -4614,7 +4677,7 @@ dependencies = [ "panic_hook", "profiler", "protobuf", - "rand", + "rand 0.7.3", "safemem", "static_assertions", "tidb_query_codegen", @@ -4697,7 +4760,7 @@ dependencies = [ "raft", "raft_log_engine", "raftstore", - "rand", + "rand 0.7.3", "regex", "rev_lines", "security", @@ -4817,7 +4880,7 @@ dependencies = [ "prometheus", "protobuf", "quick-error", - "rand", + "rand 0.7.3", "regex", "serde", "serde_json", @@ -5159,7 +5222,7 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11" dependencies = [ - "rand", + "rand 0.7.3", "serde", ] @@ -5415,7 +5478,7 @@ dependencies = [ "num_cpus", "parking_lot_core 0.8.0", "prometheus", - "rand", + "rand 0.7.3", ] [[package]] @@ -5430,7 +5493,7 @@ version = "6.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e12b8667a4fff63d236f8363be54392f93dbb13616be64a83e761a9319ab589" dependencies = [ - "rand", + "rand 0.7.3", ] [[package]] diff --git a/components/raftstore/Cargo.toml b/components/raftstore/Cargo.toml index abd9ed62b6d..052729a03f4 100644 --- a/components/raftstore/Cargo.toml +++ b/components/raftstore/Cargo.toml @@ -83,6 +83,7 @@ tokio = { version = "0.2", features = ["sync", "rt-threaded"] } txn_types = { path = "../txn_types" } uuid = { version = "0.8.1", features = ["serde", "v4"] } yatp = { git = "https://github.com/tikv/yatp.git", branch = "master" } +chan = "0.1" [dev-dependencies] engine_test = { path = "../engine_test" } diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index 02cbfaa4ee8..f91e14acd0c 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -84,6 +84,7 @@ const UNREACHABLE_BACKOFF: Duration = Duration::from_secs(10); use std::collections::VecDeque; use std::thread::JoinHandle; +use chan; pub struct AsyncDBWriter where @@ -94,8 +95,8 @@ where router: RaftRouter, tag: String, wbs: Arc>>, - pub tx: LooseBoundedSender<(ER::LogBatch, VecDeque)>, - rx: Arc)>>>, + pub tx: chan::Sender<(ER::LogBatch, VecDeque)>, + rx: Arc)>>, workers: Arc>>>, } @@ -122,15 +123,15 @@ where EK: KvEngine, ER: RaftEngine, { - pub fn new(engine: ER, router: RaftRouter, tag: String, pool_size: usize) -> AsyncDBWriter { - let (tx, rx) = mpsc::loose_bounded(pool_size * 2); + pub fn new(engine: ER, router: RaftRouter, tag: String, pool_size: usize, buff_size: usize) -> AsyncDBWriter { + let (tx, rx) = chan::sync(buff_size); let mut async_writer = AsyncDBWriter{ engine, router, tag, wbs: Arc::new(Mutex::new(VecDeque::default())), tx, - rx: Arc::new(Mutex::new(rx)), + rx: Arc::new(rx), workers: Arc::new(Mutex::new(vec![])), }; async_writer.spawn(pool_size); @@ -144,7 +145,7 @@ where .name(thd_name!(format!("raftdb-async-writer-{}", i))) .spawn(move || { loop { - let (wb, unsynced_readies) = x.rx.lock().unwrap().recv().unwrap(); + let (wb, unsynced_readies) = x.rx.recv().unwrap(); x.sync_write(wb, unsynced_readies); } }) @@ -164,11 +165,7 @@ where } pub fn async_write(&mut self, wb: ER::LogBatch, unsynced_readies: VecDeque) { - // TODO: block if full - self.tx.force_send((wb, unsynced_readies)) - .unwrap_or_else(|e| { - panic!("{} failed to send task via channel: {:?}", self.tag, e); - }); + self.tx.send((wb, unsynced_readies)); } pub fn sync_write(&mut self, mut wb: ER::LogBatch, unsynced_readies: VecDeque) { @@ -826,7 +823,7 @@ impl RaftPoller { let raft_wb = self.poll_ctx.detach_raft_wb(); let unsynced_readies = self.poll_ctx.sync_policy.detach_unsynced_readies(); let mut async_writer = self.poll_ctx.async_writer.lock().unwrap(); - async_writer.async_write(raft_wb, unsynced_readies); + async_writer.sync_write(raft_wb, unsynced_readies); } let dur = self.timer.elapsed(); @@ -1356,7 +1353,8 @@ impl RaftBatchSystem { cfg.value().delay_sync_us as i64, ); let async_writer = Arc::new(Mutex::new(AsyncDBWriter::new(engines.raft.clone(), - self.router.clone(), "raftstore-async-writer".to_string(), cfg.value().store_io_pool_size as usize))); + self.router.clone(), "raftstore-async-writer".to_string(), + cfg.value().store_io_pool_size as usize, 16))); let mut builder = RaftPollerBuilder { cfg, store: meta, From c72af06e7466a899ae03aeb2ef357dcf927035c1 Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Sun, 27 Dec 2020 19:03:13 +0800 Subject: [PATCH 04/15] Fixed issue: can not transfer leader Signed-off-by: Liu Cong --- components/raftstore/src/store/fsm/store.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index f91e14acd0c..4f447ec454f 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -803,7 +803,11 @@ impl RaftPoller { fail_point!("raft_after_save"); if ready_cnt != 0 { - let unsynced_version = Some(self.poll_ctx.sync_policy.new_unsynced_version()); + let unsynced_version = if !raft_wb_is_empty { + Some(self.poll_ctx.sync_policy.new_unsynced_version()) + } else { + None + }; let mut batch_pos = 0; let mut ready_res = mem::take(&mut self.poll_ctx.ready_res); for (ready, invoke_ctx) in ready_res.drain(..) { @@ -823,7 +827,7 @@ impl RaftPoller { let raft_wb = self.poll_ctx.detach_raft_wb(); let unsynced_readies = self.poll_ctx.sync_policy.detach_unsynced_readies(); let mut async_writer = self.poll_ctx.async_writer.lock().unwrap(); - async_writer.sync_write(raft_wb, unsynced_readies); + async_writer.async_write(raft_wb, unsynced_readies); } let dur = self.timer.elapsed(); From d6ff6e4e16616956afaa99d325445153814352d6 Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Sun, 27 Dec 2020 20:54:43 +0800 Subject: [PATCH 05/15] Fixed issue: can not transfer leader Signed-off-by: Liu Cong --- components/raftstore/src/store/fsm/store.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index 4f447ec454f..c8fc211a0df 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -803,11 +803,7 @@ impl RaftPoller { fail_point!("raft_after_save"); if ready_cnt != 0 { - let unsynced_version = if !raft_wb_is_empty { - Some(self.poll_ctx.sync_policy.new_unsynced_version()) - } else { - None - }; + let unsynced_version = Some(self.poll_ctx.sync_policy.new_unsynced_version()); let mut batch_pos = 0; let mut ready_res = mem::take(&mut self.poll_ctx.ready_res); for (ready, invoke_ctx) in ready_res.drain(..) { @@ -823,11 +819,14 @@ impl RaftPoller { } } + let unsynced_readies = self.poll_ctx.sync_policy.detach_unsynced_readies(); if !raft_wb_is_empty { let raft_wb = self.poll_ctx.detach_raft_wb(); - let unsynced_readies = self.poll_ctx.sync_policy.detach_unsynced_readies(); let mut async_writer = self.poll_ctx.async_writer.lock().unwrap(); async_writer.async_write(raft_wb, unsynced_readies); + } else { + let mut async_writer = self.poll_ctx.async_writer.lock().unwrap(); + async_writer.flush_unsynced_readies(unsynced_readies); } let dur = self.timer.elapsed(); From 373913b8a44025493f39f31f68a284b41ced1df4 Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Mon, 28 Dec 2020 17:43:49 +0800 Subject: [PATCH 06/15] Add metrics Signed-off-by: Liu Cong --- components/raftstore/src/store/fsm/store.rs | 22 ++++++++++++ components/raftstore/src/store/metrics.rs | 37 +++++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index c8fc211a0df..54d0a5d5050 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -84,6 +84,7 @@ const UNREACHABLE_BACKOFF: Duration = Duration::from_secs(10); use std::collections::VecDeque; use std::thread::JoinHandle; +use prometheus::IntGauge; use chan; pub struct AsyncDBWriter @@ -98,6 +99,7 @@ where pub tx: chan::Sender<(ER::LogBatch, VecDeque)>, rx: Arc)>>, workers: Arc>>>, + metrics_queue_size: IntGauge, } impl Clone for AsyncDBWriter @@ -114,6 +116,7 @@ where tx: self.tx.clone(), rx: self.rx.clone(), workers: self.workers.clone(), + metrics_queue_size: self.metrics_queue_size.clone(), } } } @@ -133,6 +136,8 @@ where tx, rx: Arc::new(rx), workers: Arc::new(Mutex::new(vec![])), + metrics_queue_size: ASYNC_WRITER_IO_QUEUE_VEC + .with_label_values(&["raft-log"]), }; async_writer.spawn(pool_size); async_writer @@ -145,8 +150,11 @@ where .name(thd_name!(format!("raftdb-async-writer-{}", i))) .spawn(move || { loop { + let now = TiInstant::now_coarse(); let (wb, unsynced_readies) = x.rx.recv().unwrap(); x.sync_write(wb, unsynced_readies); + STORE_WRITE_RAFTDB_TICK_DURATION_HISTOGRAM + .observe(duration_to_sec(now.elapsed()) as f64); } }) .unwrap(); @@ -165,15 +173,19 @@ where } pub fn async_write(&mut self, wb: ER::LogBatch, unsynced_readies: VecDeque) { + self.metrics_queue_size.inc(); self.tx.send((wb, unsynced_readies)); } pub fn sync_write(&mut self, mut wb: ER::LogBatch, unsynced_readies: VecDeque) { + let now = TiInstant::now_coarse(); self.engine .consume_and_shrink(&mut wb, true, RAFT_WB_SHRINK_SIZE, 4 * 1024) .unwrap_or_else(|e| { panic!("{} failed to save raft append result: {:?}", self.tag, e); }); + STORE_WRITE_RAFTDB_DURATION_HISTOGRAM.observe(duration_to_sec(now.elapsed()) as f64); + self.metrics_queue_size.dec(); self.flush_unsynced_readies(unsynced_readies); let mut wbs = self.wbs.lock().unwrap(); wbs.push_back(wb); @@ -750,6 +762,7 @@ pub struct RaftPoller>, previous_metrics: RaftMetrics, timer: TiInstant, + loop_timer: TiInstant, poll_ctx: PollContext, messages_per_tick: usize, cfg_tracker: Tracker, @@ -768,6 +781,7 @@ impl RaftPoller { let ready_cnt = self.poll_ctx.ready_res.len(); self.poll_ctx.raft_metrics.ready.has_ready_region += ready_cnt as u64; fail_point!("raft_before_save"); + let now = TiInstant::now_coarse(); if !self.poll_ctx.kv_wb.is_empty() { let mut write_opts = WriteOptions::new(); write_opts.set_sync(true); @@ -786,6 +800,8 @@ impl RaftPoller { } } fail_point!("raft_between_save"); + STORE_WRITE_KVDB_DURATION_HISTOGRAM + .observe(duration_to_sec(now.elapsed()) as f64); let raft_wb_is_empty = self.poll_ctx.raft_wb.is_empty(); if !raft_wb_is_empty { @@ -889,6 +905,9 @@ impl PollHandler, St self.poll_ctx.sync_log = false; self.poll_ctx.has_ready = false; self.timer = TiInstant::now_coarse(); + STORE_LOOP_DURATION_HISTOGRAM + .observe(duration_to_sec(self.loop_timer.elapsed()) as f64); + self.loop_timer = TiInstant::now_coarse(); // update config self.poll_ctx.perf_context_statistics.start(); if let Some(incoming) = self.cfg_tracker.any_new() { @@ -987,6 +1006,8 @@ impl PollHandler, St .raft_metrics .process_ready .observe(duration_to_sec(self.timer.elapsed()) as f64); + STORE_LOOP_WORK_DURATION_HISTOGRAM + .observe(duration_to_sec(self.loop_timer.elapsed()) as f64); self.poll_ctx.raft_metrics.flush(); self.poll_ctx.store_stat.flush(); } @@ -1245,6 +1266,7 @@ where peer_msg_buf: Vec::with_capacity(ctx.cfg.messages_per_tick), previous_metrics: ctx.raft_metrics.clone(), timer: TiInstant::now_coarse(), + loop_timer: TiInstant::now_coarse(), messages_per_tick: ctx.cfg.messages_per_tick, poll_ctx: ctx, cfg_tracker: self.cfg.clone().tracker(tag), diff --git a/components/raftstore/src/store/metrics.rs b/components/raftstore/src/store/metrics.rs index 2b0ac6901c3..e1ab55cf9e4 100644 --- a/components/raftstore/src/store/metrics.rs +++ b/components/raftstore/src/store/metrics.rs @@ -180,6 +180,43 @@ make_auto_flush_static_metric! { } lazy_static! { + pub static ref ASYNC_WRITER_IO_QUEUE_VEC: IntGaugeVec = + register_int_gauge_vec!( + "tikv_raftstore_async_writer_io_queue_total", + "Current pending + running io tasks.", + &["name"] + ).unwrap(); + pub static ref STORE_LOOP_DURATION_HISTOGRAM: Histogram = + register_histogram!( + "tikv_raftstore_store_loop_duration_seconds", + "TODO", + exponential_buckets(0.0005, 2.0, 20).unwrap() + ).unwrap(); + pub static ref STORE_LOOP_WORK_DURATION_HISTOGRAM: Histogram = + register_histogram!( + "tikv_raftstore_store_loop_work_duration_seconds", + "TODO", + exponential_buckets(0.0005, 2.0, 20).unwrap() + ).unwrap(); + pub static ref STORE_WRITE_KVDB_DURATION_HISTOGRAM: Histogram = + register_histogram!( + "tikv_raftstore_store_write_kvdb_duration_seconds", + "TODO", + exponential_buckets(0.0005, 2.0, 20).unwrap() + ).unwrap(); + pub static ref STORE_WRITE_RAFTDB_TICK_DURATION_HISTOGRAM: Histogram = + register_histogram!( + "tikv_raftstore_store_write_raftdb_tick_duration_seconds", + "TODO", + exponential_buckets(0.0005, 2.0, 20).unwrap() + ).unwrap(); + pub static ref STORE_WRITE_RAFTDB_DURATION_HISTOGRAM: Histogram = + register_histogram!( + "tikv_raftstore_store_write_raftdb_duration_seconds", + "TODO", + exponential_buckets(0.0005, 2.0, 20).unwrap() + ).unwrap(); + pub static ref PEER_PROPOSAL_COUNTER_VEC: IntCounterVec = register_int_counter_vec!( "tikv_raftstore_proposal_total", From 8571bff48bddb21a45401fab5df3cd5a2a7fe636 Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Tue, 29 Dec 2020 11:52:38 +0800 Subject: [PATCH 07/15] Add io config entries Signed-off-by: Liu Cong --- components/raftstore/src/store/config.rs | 12 ++++++++++++ components/raftstore/src/store/fsm/apply.rs | 5 ++++- components/raftstore/src/store/fsm/store.rs | 2 +- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/components/raftstore/src/store/config.rs b/components/raftstore/src/store/config.rs index c7a19e5e652..a8e1c200899 100644 --- a/components/raftstore/src/store/config.rs +++ b/components/raftstore/src/store/config.rs @@ -32,6 +32,10 @@ pub struct Config { pub delay_sync_us: u64, #[config(skip)] pub store_io_pool_size: u64, + #[config(skip)] + pub store_io_queue: u64, + #[config(skip)] + pub apply_io_size: u64, // minimizes disruption when a partitioned node rejoins the cluster by using a two phase election. #[config(skip)] pub prevote: bool, @@ -199,6 +203,8 @@ impl Default for Config { Config { delay_sync_us: 0, store_io_pool_size: 2, + store_io_queue: 1, + apply_io_size: 1024 * 32, prevote: true, raftdb_path: String::new(), capacity: ReadableSize(0), @@ -427,6 +433,12 @@ impl Config { CONFIG_RAFTSTORE_GAUGE .with_label_values(&["store_io_pool_size"]) .set((self.store_io_pool_size as i32).into()); + CONFIG_RAFTSTORE_GAUGE + .with_label_values(&["store_io_queue"]) + .set((self.store_io_queue as i32).into()); + CONFIG_RAFTSTORE_GAUGE + .with_label_values(&["apply_io_size"]) + .set((self.apply_io_size as i32).into()); CONFIG_RAFTSTORE_GAUGE .with_label_values(&["prevote"]) diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 644ad98b0a9..e9d926a7630 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -353,6 +353,8 @@ where yield_duration: Duration, + apply_io_size: usize, + store_id: u64, /// region_id -> (peer_id, is_splitting) /// Used for handling race between splitting and creating new peer. @@ -402,6 +404,7 @@ where use_delete_range: cfg.use_delete_range, perf_context_statistics: PerfContextStatistics::new(cfg.perf_level), yield_duration: cfg.apply_yield_duration.0, + apply_io_size: cfg.apply_io_size as usize, store_id, pending_create_peers, } @@ -922,7 +925,7 @@ where if !data.is_empty() { let cmd = util::parse_data_at(data, index, &self.tag); - if should_write_to_engine(&cmd) || apply_ctx.kv_wb().should_write_to_engine() { + if should_write_to_engine(&cmd) || apply_ctx.kv_wb().data_size() >= apply_ctx.apply_io_size { apply_ctx.commit(self); if let Some(start) = self.handle_start.as_ref() { if start.elapsed() >= apply_ctx.yield_duration { diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index 54d0a5d5050..41feef40795 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -1379,7 +1379,7 @@ impl RaftBatchSystem { ); let async_writer = Arc::new(Mutex::new(AsyncDBWriter::new(engines.raft.clone(), self.router.clone(), "raftstore-async-writer".to_string(), - cfg.value().store_io_pool_size as usize, 16))); + cfg.value().store_io_pool_size as usize, cfg.value().store_io_queue as usize))); let mut builder = RaftPollerBuilder { cfg, store: meta, From 1e3bc7065bb87b549b18071e9a9d3f8c80d0c4f4 Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Mon, 4 Jan 2021 17:37:15 +0800 Subject: [PATCH 08/15] Merge region IO Signed-off-by: Liu Cong --- components/raftstore/src/store/config.rs | 2 +- components/raftstore/src/store/fsm/store.rs | 190 ++++++++++-------- components/raftstore/src/store/metrics.rs | 6 + components/raftstore/src/store/peer.rs | 3 +- .../raftstore/src/store/peer_storage.rs | 71 +++++-- 5 files changed, 170 insertions(+), 102 deletions(-) diff --git a/components/raftstore/src/store/config.rs b/components/raftstore/src/store/config.rs index a8e1c200899..ed91d223406 100644 --- a/components/raftstore/src/store/config.rs +++ b/components/raftstore/src/store/config.rs @@ -204,7 +204,7 @@ impl Default for Config { delay_sync_us: 0, store_io_pool_size: 2, store_io_queue: 1, - apply_io_size: 1024 * 32, + apply_io_size: 0, prevote: true, raftdb_path: String::new(), capacity: ReadableSize(0), diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index 41feef40795..bf0c0d1af7e 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -82,12 +82,11 @@ const RAFT_WB_SHRINK_SIZE: usize = 1024 * 1024; pub const PENDING_MSG_CAP: usize = 100; const UNREACHABLE_BACKOFF: Duration = Duration::from_secs(10); +use crate::store::peer_storage::AsyncWriterTask; use std::collections::VecDeque; use std::thread::JoinHandle; -use prometheus::IntGauge; -use chan; -pub struct AsyncDBWriter +pub struct AsyncWriter where EK: KvEngine, ER: RaftEngine, @@ -95,49 +94,50 @@ where engine: ER, router: RaftRouter, tag: String, - wbs: Arc>>, - pub tx: chan::Sender<(ER::LogBatch, VecDeque)>, - rx: Arc)>>, + buff_size: usize, + tasks: Arc>>>, workers: Arc>>>, - metrics_queue_size: IntGauge, } -impl Clone for AsyncDBWriter +impl Clone for AsyncWriter where EK: KvEngine, ER: RaftEngine, { fn clone(&self) -> Self { - AsyncDBWriter{ + AsyncWriter{ engine: self.engine.clone(), router: self.router.clone(), tag: self.tag.clone(), - wbs: self.wbs.clone(), - tx: self.tx.clone(), - rx: self.rx.clone(), + buff_size: self.buff_size, + tasks: self.tasks.clone(), workers: self.workers.clone(), - metrics_queue_size: self.metrics_queue_size.clone(), } } } -impl AsyncDBWriter +impl AsyncWriter where EK: KvEngine, ER: RaftEngine, { - pub fn new(engine: ER, router: RaftRouter, tag: String, pool_size: usize, buff_size: usize) -> AsyncDBWriter { - let (tx, rx) = chan::sync(buff_size); - let mut async_writer = AsyncDBWriter{ + pub fn new(engine: ER, router: RaftRouter, tag: String, pool_size: usize, buff_size: usize) -> AsyncWriter { + let mut tasks = VecDeque::default(); + for _ in 0..(pool_size + 1) { + tasks.push_back( + AsyncWriterTask { + wb: engine.log_batch(4 * 1024), + unsynced_readies: HashMap::default(), + } + ); + } + let mut async_writer = AsyncWriter{ engine, router, tag, - wbs: Arc::new(Mutex::new(VecDeque::default())), - tx, - rx: Arc::new(rx), + buff_size, + tasks: Arc::new(Mutex::new(tasks)), workers: Arc::new(Mutex::new(vec![])), - metrics_queue_size: ASYNC_WRITER_IO_QUEUE_VEC - .with_label_values(&["raft-log"]), }; async_writer.spawn(pool_size); async_writer @@ -149,10 +149,32 @@ where let t = thread::Builder::new() .name(thd_name!(format!("raftdb-async-writer-{}", i))) .spawn(move || { + let mut sleep = false; loop { + // TODO: block if no data in current raft_wb + if sleep { + let d = Duration::from_millis(1); + thread::sleep(d); + } + sleep = false; let now = TiInstant::now_coarse(); - let (wb, unsynced_readies) = x.rx.recv().unwrap(); - x.sync_write(wb, unsynced_readies); + // TODO: block if too many data in current raft_wb + let task = { + let mut tasks = x.tasks.lock().unwrap(); + if tasks.front().unwrap().unsynced_readies.is_empty() { + sleep = true; + continue; + } + tasks.pop_front().unwrap() + }; + + let wb = x.sync_write(task.wb, task.unsynced_readies); + + { + let mut tasks = x.tasks.lock().unwrap(); + tasks.push_back(AsyncWriterTask{wb, unsynced_readies: HashMap::default()}); + } + STORE_WRITE_RAFTDB_TICK_DURATION_HISTOGRAM .observe(duration_to_sec(now.elapsed()) as f64); } @@ -163,21 +185,12 @@ where } } - pub fn new_wb(&mut self) -> ER::LogBatch { - let mut wbs = self.wbs.lock().unwrap(); - if wbs.is_empty() { - self.engine.log_batch(4 * 1024) - } else { - wbs.pop_front().unwrap() - } - } - - pub fn async_write(&mut self, wb: ER::LogBatch, unsynced_readies: VecDeque) { - self.metrics_queue_size.inc(); - self.tx.send((wb, unsynced_readies)); + pub fn raft_wb_pool(&mut self) -> Arc>>> { + self.tasks.clone() } - pub fn sync_write(&mut self, mut wb: ER::LogBatch, unsynced_readies: VecDeque) { + // TODO: this func is assumed in tasks.locked status + pub fn sync_write(&mut self, mut wb: ER::LogBatch, mut unsynced_readies: HashMap) -> ER::LogBatch { let now = TiInstant::now_coarse(); self.engine .consume_and_shrink(&mut wb, true, RAFT_WB_SHRINK_SIZE, 4 * 1024) @@ -185,31 +198,41 @@ where panic!("{} failed to save raft append result: {:?}", self.tag, e); }); STORE_WRITE_RAFTDB_DURATION_HISTOGRAM.observe(duration_to_sec(now.elapsed()) as f64); - self.metrics_queue_size.dec(); - self.flush_unsynced_readies(unsynced_readies); - let mut wbs = self.wbs.lock().unwrap(); - wbs.push_back(wb); + self.flush_unsynced_readies(&unsynced_readies); + unsynced_readies.clear(); + wb } - fn flush_unsynced_readies(&mut self, mut unsynced_readies: VecDeque) { + fn flush_unsynced_readies(&mut self, unsynced_readies: &HashMap) { + for (_, r) in unsynced_readies { + self.flush_unsynced_ready(r); + } + } + + fn drain_flush_unsynced_readies(&mut self, mut unsynced_readies: VecDeque) { for r in unsynced_readies.drain(..) { - loop { - let pre_number = r.notifier.load(Ordering::Acquire); - assert_ne!(pre_number, r.number); - if pre_number > r.number { - break; - } - if pre_number == r.notifier.compare_and_swap(pre_number, r.number, Ordering::AcqRel) { - if let Err(e) = self.router.force_send(r.region_id, PeerMsg::Noop) { - error!( - "failed to send noop to trigger persisted ready"; - "region_id" => r.region_id, - "ready_number" => r.number, - "error" => ?e, - ); - } - break; + self.flush_unsynced_ready(&r); + } + } + + fn flush_unsynced_ready(&mut self, r: &UnsyncedReady) { + loop { + let pre_number = r.notifier.load(Ordering::Acquire); + // TODO: reduce duplicated messages + //assert_ne!(pre_number, r.number); + if pre_number >= r.number { + break; + } + if pre_number == r.notifier.compare_and_swap(pre_number, r.number, Ordering::AcqRel) { + if let Err(e) = self.router.force_send(r.region_id, PeerMsg::Noop) { + error!( + "failed to send noop to trigger persisted ready"; + "region_id" => r.region_id, + "ready_number" => r.number, + "error" => ?e, + ); } + break; } } } @@ -462,7 +485,8 @@ where pub store_stat: LocalStoreStat, pub engines: Engines, pub kv_wb: EK::WriteBatch, - pub raft_wb: ER::LogBatch, + //pub raft_wb: ER::LogBatch, + pub raft_wb_is_empty: bool, pub pending_count: usize, pub sync_log: bool, pub has_ready: bool, @@ -472,7 +496,7 @@ where pub tick_batch: Vec, pub node_start_time: Option, pub sync_policy: SyncPolicy>, - pub async_writer: Arc>>, + pub async_writer: Arc>>, } impl HandleRaftReadyContext for PollContext @@ -480,8 +504,10 @@ where EK: KvEngine, ER: RaftEngine, { - fn wb_mut(&mut self) -> (&mut EK::WriteBatch, &mut ER::LogBatch) { - (&mut self.kv_wb, &mut self.raft_wb) + fn wb_mut(&mut self) -> (&mut EK::WriteBatch, Arc::>>>) { + self.raft_wb_is_empty = false; + let mut async_writer = self.async_writer.lock().unwrap(); + (&mut self.kv_wb, async_writer.raft_wb_pool()) } #[inline] @@ -490,8 +516,10 @@ where } #[inline] - fn raft_wb_mut(&mut self) -> &mut ER::LogBatch { - &mut self.raft_wb + fn raft_wb_pool(&mut self) -> Arc>>> { + self.raft_wb_is_empty = false; + let mut async_writer = self.async_writer.lock().unwrap(); + async_writer.raft_wb_pool() } #[inline] @@ -510,14 +538,6 @@ where EK: KvEngine, ER: RaftEngine, { - #[inline] - pub fn detach_raft_wb(&mut self) -> ER::LogBatch { - let mut async_writer = self.async_writer.lock().unwrap(); - let mut raft_wb = async_writer.new_wb(); - mem::swap(&mut self.raft_wb, &mut raft_wb); - raft_wb - } - #[inline] pub fn store_id(&self) -> u64 { self.store.get_id() @@ -774,7 +794,7 @@ impl RaftPoller { // the id of slow store in tests. fail_point!("on_raft_ready", self.poll_ctx.store_id() == 3, |_| {}); if self.poll_ctx.trans.need_flush() - && (!self.poll_ctx.kv_wb.is_empty() || !self.poll_ctx.raft_wb.is_empty()) + && (!self.poll_ctx.kv_wb.is_empty() || !self.poll_ctx.raft_wb_is_empty) { self.poll_ctx.trans.flush(); } @@ -803,7 +823,7 @@ impl RaftPoller { STORE_WRITE_KVDB_DURATION_HISTOGRAM .observe(duration_to_sec(now.elapsed()) as f64); - let raft_wb_is_empty = self.poll_ctx.raft_wb.is_empty(); + let raft_wb_is_empty = self.poll_ctx.raft_wb_is_empty; if !raft_wb_is_empty { fail_point!( "raft_before_save_on_store_1", @@ -835,15 +855,22 @@ impl RaftPoller { } } - let unsynced_readies = self.poll_ctx.sync_policy.detach_unsynced_readies(); if !raft_wb_is_empty { - let raft_wb = self.poll_ctx.detach_raft_wb(); + // Do nothing + /* Sync write testing let mut async_writer = self.poll_ctx.async_writer.lock().unwrap(); - async_writer.async_write(raft_wb, unsynced_readies); + let raft_wb_pool = async_writer.raft_wb_pool(); + let mut raft_wbs = raft_wb_pool.lock().unwrap(); + let task = raft_wbs.pop_front().unwrap(); + let wb = async_writer.sync_write(task.wb, task.unsynced_readies); + raft_wbs.push_back(AsyncWriterTask{wb, unsynced_readies: HashMap::default()}); + */ } else { + let unsynced_readies = self.poll_ctx.sync_policy.detach_unsynced_readies(); let mut async_writer = self.poll_ctx.async_writer.lock().unwrap(); - async_writer.flush_unsynced_readies(unsynced_readies); + async_writer.drain_flush_unsynced_readies(unsynced_readies); } + self.poll_ctx.raft_wb_is_empty = true; let dur = self.timer.elapsed(); if !self.poll_ctx.store_stat.is_busy { @@ -1045,7 +1072,7 @@ pub struct RaftPollerBuilder { applying_snap_count: Arc, global_replication_state: Arc>, pub sync_policy: SyncPolicy>, - pub async_writer: Arc>>, + pub async_writer: Arc>>, } impl RaftPollerBuilder { @@ -1246,7 +1273,8 @@ where store_stat: self.global_stat.local(), engines: self.engines.clone(), kv_wb: self.engines.kv.write_batch(), - raft_wb: self.engines.raft.log_batch(4 * 1024), + //raft_wb: self.engines.raft.log_batch(4 * 1024), + raft_wb_is_empty: true, pending_count: 0, sync_log: false, has_ready: false, @@ -1377,7 +1405,7 @@ impl RaftBatchSystem { cfg.value().delay_sync_enabled(), cfg.value().delay_sync_us as i64, ); - let async_writer = Arc::new(Mutex::new(AsyncDBWriter::new(engines.raft.clone(), + let async_writer = Arc::new(Mutex::new(AsyncWriter::new(engines.raft.clone(), self.router.clone(), "raftstore-async-writer".to_string(), cfg.value().store_io_pool_size as usize, cfg.value().store_io_queue as usize))); let mut builder = RaftPollerBuilder { diff --git a/components/raftstore/src/store/metrics.rs b/components/raftstore/src/store/metrics.rs index e1ab55cf9e4..5f4db195caf 100644 --- a/components/raftstore/src/store/metrics.rs +++ b/components/raftstore/src/store/metrics.rs @@ -216,6 +216,12 @@ lazy_static! { "TODO", exponential_buckets(0.0005, 2.0, 20).unwrap() ).unwrap(); + pub static ref RAFT_ASYNC_WRITER_QUEUE_SIZE: Histogram = + register_histogram!( + "tikv_raftstore_store_write_queue_size", + "TODO", + exponential_buckets(1.0, 2.0, 20).unwrap() + ).unwrap(); pub static ref PEER_PROPOSAL_COUNTER_VEC: IntCounterVec = register_int_counter_vec!( diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs index 0a770b0a95f..cb495e8c76c 100644 --- a/components/raftstore/src/store/peer.rs +++ b/components/raftstore/src/store/peer.rs @@ -1685,9 +1685,10 @@ where self.handle_raft_committed_entries(ctx, ready.take_committed_entries()); } + let notifier = self.persisted_notifier.clone(); let invoke_ctx = match self .mut_store() - .handle_raft_ready(ctx, &mut ready, destroy_regions) + .handle_raft_ready(ctx, &mut ready, destroy_regions, notifier) { Ok(r) => r, Err(e) => { diff --git a/components/raftstore/src/store/peer_storage.rs b/components/raftstore/src/store/peer_storage.rs index 56986b52244..2a02fcc17b8 100644 --- a/components/raftstore/src/store/peer_storage.rs +++ b/components/raftstore/src/store/peer_storage.rs @@ -26,11 +26,16 @@ use crate::{Error, Result}; use engine_traits::{RaftEngine, RaftLogBatch}; use into_other::into_other; use tikv_util::worker::Scheduler; +use std::sync::Mutex; use super::metrics::*; use super::worker::RegionTask; use super::{SnapEntry, SnapKey, SnapManager, SnapshotStatistics}; +use tikv_util::collections::HashMap; +use crate::store::fsm::sync_policy::UnsyncedReady; +use std::sync::atomic::AtomicU64; + // When we create a region peer, we should initialize its log term/index > 0, // so that we can force the follower peer to sync the snapshot first. pub const RAFT_INIT_LOG_TERM: u64 = 5; @@ -304,15 +309,24 @@ impl Drop for EntryCache { } } +pub struct AsyncWriterTask +where + WR: RaftLogBatch, +{ + pub wb: WR, + pub unsynced_readies: HashMap, +} + pub trait HandleRaftReadyContext where WK: Mutable, WR: RaftLogBatch, { /// Returns the mutable references of WriteBatch for both KvDB and RaftDB in one interface. - fn wb_mut(&mut self) -> (&mut WK, &mut WR); + fn wb_mut(&mut self) -> (&mut WK, Arc>>>); fn kv_wb_mut(&mut self) -> &mut WK; - fn raft_wb_mut(&mut self) -> &mut WR; + //fn raft_wb_mut(&mut self) -> &mut WR; + fn raft_wb_pool(&mut self) -> Arc>>>; fn sync_log(&self) -> bool; fn set_sync_log(&mut self, sync: bool); } @@ -1016,6 +1030,8 @@ where invoke_ctx: &mut InvokeContext, entries: Vec, ready_ctx: &mut H, + ready_number: u64, + region_notifier: Arc, ) -> Result { let region_id = self.get_region_id(); debug!( @@ -1043,13 +1059,17 @@ where cache.append(&self.tag, &entries); } - ready_ctx.raft_wb_mut().append(region_id, entries)?; - - // Delete any previously appended log entries which never committed. - // TODO: Wrap it as an engine::Error. - ready_ctx - .raft_wb_mut() - .cut_logs(region_id, last_index + 1, prev_last_index); + { + let raft_wb_pool = ready_ctx.raft_wb_pool(); + let mut raft_wbs = raft_wb_pool.lock().unwrap(); + let current = raft_wbs.front_mut().unwrap(); + current.wb.append(region_id, entries)?; + current.unsynced_readies.insert(region_id, + UnsyncedReady{number: ready_number, region_id, notifier: region_notifier, version: 0}); + // Delete any previously appended log entries which never committed. + // TODO: Wrap it as an engine::Error. + current.wb.cut_logs(region_id, last_index + 1, prev_last_index); + } invoke_ctx.raft_state.set_last_index(last_index); invoke_ctx.last_term = last_term; @@ -1365,14 +1385,20 @@ where ready_ctx: &mut H, ready: &mut Ready, destroy_regions: Vec, + region_notifier: Arc, ) -> Result { + let region_id = self.get_region_id(); let mut ctx = InvokeContext::new(self); let snapshot_index = if ready.snapshot().is_empty() { 0 } else { fail_point!("raft_before_apply_snap"); - let (kv_wb, raft_wb) = ready_ctx.wb_mut(); - self.apply_snapshot(&mut ctx, ready.snapshot(), kv_wb, raft_wb, &destroy_regions)?; + let (kv_wb, raft_wb_pool) = ready_ctx.wb_mut(); + let mut raft_wbs = raft_wb_pool.lock().unwrap(); + let current = raft_wbs.front_mut().unwrap(); + self.apply_snapshot(&mut ctx, ready.snapshot(), kv_wb, &mut current.wb, &destroy_regions)?; + current.unsynced_readies.insert(region_id, + UnsyncedReady{number: ready.number(), region_id, notifier: region_notifier.clone(), version: 0}); fail_point!("raft_after_apply_snap"); ctx.destroyed_regions = destroy_regions; @@ -1381,7 +1407,7 @@ where }; if !ready.entries().is_empty() { - self.append(&mut ctx, ready.take_entries(), ready_ctx)?; + self.append(&mut ctx, ready.take_entries(), ready_ctx, ready.number(), region_notifier.clone())?; } // Last index is 0 means the peer is created from raft message @@ -1394,7 +1420,12 @@ where // Save raft state if it has changed or there is a snapshot. if ctx.raft_state != self.raft_state || snapshot_index > 0 { - ctx.save_raft_state_to(ready_ctx.raft_wb_mut())?; + { + let raft_wb_pool = ready_ctx.raft_wb_pool(); + let mut raft_wbs = raft_wb_pool.lock().unwrap(); + let current = raft_wbs.front_mut().unwrap(); + ctx.save_raft_state_to(&mut current.wb)?; + } if snapshot_index > 0 { // in case of restart happen when we just write region state to Applying, // but not write raft_local_state to raft rocksdb in time. @@ -1642,7 +1673,7 @@ pub fn write_peer_state( kv_wb.put_msg_cf(CF_RAFT, &keys::region_state_key(region_id), ®ion_state)?; Ok(()) } - +/* #[cfg(test)] mod tests { use crate::coprocessor::CoprocessorHost; @@ -1705,14 +1736,14 @@ mod tests { } impl HandleRaftReadyContext for ReadyContext { - fn wb_mut(&mut self) -> (&mut KvTestWriteBatch, &mut RaftTestWriteBatch) { - (&mut self.kv_wb, &mut self.raft_wb) - } + //fn wb_mut(&mut self) -> (&mut KvTestWriteBatch, &mut RaftTestWriteBatch) { + // (&mut self.kv_wb, &mut self.raft_wb) + //} fn kv_wb_mut(&mut self) -> &mut KvTestWriteBatch { &mut self.kv_wb } - fn raft_wb_mut(&mut self) -> &mut RaftTestWriteBatch { - &mut self.raft_wb + fn raft_wb_pool(&mut self) -> &mut RaftTestWriteBatch { + &mut self.raft_wbs } fn sync_log(&self) -> bool { self.sync_log @@ -2621,3 +2652,5 @@ mod tests { assert!(build_storage().is_err()); } } + +*/ From 2526754ef505440ae7750385b19ea41c01fae501 Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Mon, 4 Jan 2021 17:46:09 +0800 Subject: [PATCH 09/15] Support both style of apply IO size Signed-off-by: Liu Cong --- components/raftstore/src/store/fsm/apply.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index e9d926a7630..ee8f89ef14a 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -925,7 +925,9 @@ where if !data.is_empty() { let cmd = util::parse_data_at(data, index, &self.tag); - if should_write_to_engine(&cmd) || apply_ctx.kv_wb().data_size() >= apply_ctx.apply_io_size { + if should_write_to_engine(&cmd) || + (apply_ctx.apply_io_size != 0 && (apply_ctx.kv_wb().data_size() >= apply_ctx.apply_io_size)) || + (apply_ctx.apply_io_size == 0 && apply_ctx.kv_wb().should_write_to_engine()) { apply_ctx.commit(self); if let Some(start) = self.handle_start.as_ref() { if start.elapsed() >= apply_ctx.yield_duration { From 9d037753036c55ad5d9f5865f308927340271d5d Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Mon, 4 Jan 2021 23:33:33 +0800 Subject: [PATCH 10/15] Add store_io_min_interval_us Signed-off-by: Liu Cong --- components/raftstore/src/lib.rs | 2 ++ components/raftstore/src/store/config.rs | 6 +++++ components/raftstore/src/store/fsm/store.rs | 29 +++++++++++---------- 3 files changed, 23 insertions(+), 14 deletions(-) diff --git a/components/raftstore/src/lib.rs b/components/raftstore/src/lib.rs index e74bae5fe7b..846262bfd2d 100644 --- a/components/raftstore/src/lib.rs +++ b/components/raftstore/src/lib.rs @@ -6,6 +6,8 @@ #![feature(div_duration)] #![feature(min_specialization)] #![feature(box_patterns)] +#![feature(duration_saturating_ops)] +#![feature(duration_zero)] #[macro_use] extern crate bitflags; diff --git a/components/raftstore/src/store/config.rs b/components/raftstore/src/store/config.rs index ed91d223406..b27ec2941c4 100644 --- a/components/raftstore/src/store/config.rs +++ b/components/raftstore/src/store/config.rs @@ -31,6 +31,8 @@ pub struct Config { #[config(skip)] pub delay_sync_us: u64, #[config(skip)] + pub store_io_min_interval_us: u64, + #[config(skip)] pub store_io_pool_size: u64, #[config(skip)] pub store_io_queue: u64, @@ -202,6 +204,7 @@ impl Default for Config { let split_size = ReadableSize::mb(coprocessor::config::SPLIT_SIZE_MB); Config { delay_sync_us: 0, + store_io_min_interval_us: 500, store_io_pool_size: 2, store_io_queue: 1, apply_io_size: 0, @@ -430,6 +433,9 @@ impl Config { CONFIG_RAFTSTORE_GAUGE .with_label_values(&["delay_sync_us"]) .set((self.delay_sync_us as i32).into()); + CONFIG_RAFTSTORE_GAUGE + .with_label_values(&["store_io_min_interval_us"]) + .set((self.store_io_min_interval_us as i32).into()); CONFIG_RAFTSTORE_GAUGE .with_label_values(&["store_io_pool_size"]) .set((self.store_io_pool_size as i32).into()); diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index bf0c0d1af7e..409b8dcf6d0 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -94,7 +94,7 @@ where engine: ER, router: RaftRouter, tag: String, - buff_size: usize, + io_min_interval: Duration, tasks: Arc>>>, workers: Arc>>>, } @@ -109,7 +109,7 @@ where engine: self.engine.clone(), router: self.router.clone(), tag: self.tag.clone(), - buff_size: self.buff_size, + io_min_interval: self.io_min_interval.clone(), tasks: self.tasks.clone(), workers: self.workers.clone(), } @@ -121,7 +121,8 @@ where EK: KvEngine, ER: RaftEngine, { - pub fn new(engine: ER, router: RaftRouter, tag: String, pool_size: usize, buff_size: usize) -> AsyncWriter { + pub fn new(engine: ER, router: RaftRouter, tag: String, + pool_size: usize, io_min_interval_us: u64) -> AsyncWriter { let mut tasks = VecDeque::default(); for _ in 0..(pool_size + 1) { tasks.push_back( @@ -135,7 +136,7 @@ where engine, router, tag, - buff_size, + io_min_interval: Duration::from_micros(io_min_interval_us), tasks: Arc::new(Mutex::new(tasks)), workers: Arc::new(Mutex::new(vec![])), }; @@ -149,20 +150,20 @@ where let t = thread::Builder::new() .name(thd_name!(format!("raftdb-async-writer-{}", i))) .spawn(move || { - let mut sleep = false; + let mut last_ts = TiInstant::now_coarse(); loop { - // TODO: block if no data in current raft_wb - if sleep { - let d = Duration::from_millis(1); - thread::sleep(d); + let mut now_ts = TiInstant::now_coarse(); + let delta = (now_ts - last_ts).saturating_sub(x.io_min_interval); + if !delta.is_zero() { + thread::sleep(delta); + now_ts = TiInstant::now_coarse(); } - sleep = false; - let now = TiInstant::now_coarse(); + last_ts = now_ts; + // TODO: block if too many data in current raft_wb let task = { let mut tasks = x.tasks.lock().unwrap(); if tasks.front().unwrap().unsynced_readies.is_empty() { - sleep = true; continue; } tasks.pop_front().unwrap() @@ -176,7 +177,7 @@ where } STORE_WRITE_RAFTDB_TICK_DURATION_HISTOGRAM - .observe(duration_to_sec(now.elapsed()) as f64); + .observe(duration_to_sec(now_ts.elapsed()) as f64); } }) .unwrap(); @@ -1407,7 +1408,7 @@ impl RaftBatchSystem { ); let async_writer = Arc::new(Mutex::new(AsyncWriter::new(engines.raft.clone(), self.router.clone(), "raftstore-async-writer".to_string(), - cfg.value().store_io_pool_size as usize, cfg.value().store_io_queue as usize))); + cfg.value().store_io_pool_size as usize, cfg.value().store_io_min_interval_us))); let mut builder = RaftPollerBuilder { cfg, store: meta, From 115cb2b4024cfba1f8ae5bb00f9fa2e6600413e7 Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Tue, 5 Jan 2021 23:59:13 +0800 Subject: [PATCH 11/15] Fixed corrution Signed-off-by: Liu Cong --- components/engine_panic/src/raft_engine.rs | 4 + components/engine_rocks/src/raft_engine.rs | 4 + components/engine_traits/src/raft_engine.rs | 2 + components/raft_log_engine/src/engine.rs | 4 + components/raftstore/src/store/config.rs | 6 + components/raftstore/src/store/fsm/apply.rs | 8 +- components/raftstore/src/store/fsm/store.rs | 75 +++--- .../raftstore/src/store/fsm/sync_policy.rs | 2 +- components/raftstore/src/store/metrics.rs | 13 +- components/raftstore/src/store/peer.rs | 23 +- .../raftstore/src/store/peer_storage.rs | 236 +++++++++++++----- 11 files changed, 267 insertions(+), 110 deletions(-) diff --git a/components/engine_panic/src/raft_engine.rs b/components/engine_panic/src/raft_engine.rs index ff9f3d43338..4cc1822303d 100644 --- a/components/engine_panic/src/raft_engine.rs +++ b/components/engine_panic/src/raft_engine.rs @@ -105,6 +105,10 @@ impl RaftLogBatch for PanicWriteBatch { panic!() } + fn persist_size(&self) -> usize { + panic!() + } + fn is_empty(&self) -> bool { panic!() } diff --git a/components/engine_rocks/src/raft_engine.rs b/components/engine_rocks/src/raft_engine.rs index f443e01709a..c5ad0cda8cc 100644 --- a/components/engine_rocks/src/raft_engine.rs +++ b/components/engine_rocks/src/raft_engine.rs @@ -239,6 +239,10 @@ impl RaftLogBatch for RocksWriteBatch { self.put_msg(&keys::raft_state_key(raft_group_id), state) } + fn persist_size(&self) -> usize { + self.data_size() + } + fn is_empty(&self) -> bool { Mutable::is_empty(self) } diff --git a/components/engine_traits/src/raft_engine.rs b/components/engine_traits/src/raft_engine.rs index 68dc66285d1..3b9dac1bd77 100644 --- a/components/engine_traits/src/raft_engine.rs +++ b/components/engine_traits/src/raft_engine.rs @@ -87,6 +87,8 @@ pub trait RaftLogBatch: Send { fn put_raft_state(&mut self, raft_group_id: u64, state: &RaftLocalState) -> Result<()>; + fn persist_size(&self) -> usize; + fn is_empty(&self) -> bool; } diff --git a/components/raft_log_engine/src/engine.rs b/components/raft_log_engine/src/engine.rs index a77249d643a..e6cb5bcc3be 100644 --- a/components/raft_log_engine/src/engine.rs +++ b/components/raft_log_engine/src/engine.rs @@ -59,6 +59,10 @@ impl RaftLogBatchTrait for RaftLogBatch { Ok(()) } + fn persist_size(&self) -> usize { + panic!("not impl!") + } + fn is_empty(&self) -> bool { self.0.items.is_empty() } diff --git a/components/raftstore/src/store/config.rs b/components/raftstore/src/store/config.rs index b27ec2941c4..be213c1e11e 100644 --- a/components/raftstore/src/store/config.rs +++ b/components/raftstore/src/store/config.rs @@ -33,6 +33,8 @@ pub struct Config { #[config(skip)] pub store_io_min_interval_us: u64, #[config(skip)] + pub store_io_max_bytes: u64, + #[config(skip)] pub store_io_pool_size: u64, #[config(skip)] pub store_io_queue: u64, @@ -205,6 +207,7 @@ impl Default for Config { Config { delay_sync_us: 0, store_io_min_interval_us: 500, + store_io_max_bytes: 128 * 1024, store_io_pool_size: 2, store_io_queue: 1, apply_io_size: 0, @@ -436,6 +439,9 @@ impl Config { CONFIG_RAFTSTORE_GAUGE .with_label_values(&["store_io_min_interval_us"]) .set((self.store_io_min_interval_us as i32).into()); + CONFIG_RAFTSTORE_GAUGE + .with_label_values(&["store_io_max_bytes"]) + .set((self.store_io_max_bytes as i32).into()); CONFIG_RAFTSTORE_GAUGE .with_label_values(&["store_io_pool_size"]) .set((self.store_io_pool_size as i32).into()); diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index ee8f89ef14a..e09fe3502e2 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -925,9 +925,11 @@ where if !data.is_empty() { let cmd = util::parse_data_at(data, index, &self.tag); - if should_write_to_engine(&cmd) || - (apply_ctx.apply_io_size != 0 && (apply_ctx.kv_wb().data_size() >= apply_ctx.apply_io_size)) || - (apply_ctx.apply_io_size == 0 && apply_ctx.kv_wb().should_write_to_engine()) { + if should_write_to_engine(&cmd) + || (apply_ctx.apply_io_size != 0 + && (apply_ctx.kv_wb().data_size() >= apply_ctx.apply_io_size)) + || (apply_ctx.apply_io_size == 0 && apply_ctx.kv_wb().should_write_to_engine()) + { apply_ctx.commit(self); if let Some(start) = self.handle_start.as_ref() { if start.elapsed() >= apply_ctx.yield_duration { diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index 409b8dcf6d0..8367c03f3c1 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -83,6 +83,7 @@ pub const PENDING_MSG_CAP: usize = 100; const UNREACHABLE_BACKOFF: Duration = Duration::from_secs(10); use crate::store::peer_storage::AsyncWriterTask; +use crate::store::peer_storage::AsyncWriterTasks; use std::collections::VecDeque; use std::thread::JoinHandle; @@ -95,7 +96,7 @@ where router: RaftRouter, tag: String, io_min_interval: Duration, - tasks: Arc>>>, + tasks: Arc>>, workers: Arc>>>, } @@ -105,7 +106,7 @@ where ER: RaftEngine, { fn clone(&self) -> Self { - AsyncWriter{ + AsyncWriter { engine: self.engine.clone(), router: self.router.clone(), tag: self.tag.clone(), @@ -121,18 +122,17 @@ where EK: KvEngine, ER: RaftEngine, { - pub fn new(engine: ER, router: RaftRouter, tag: String, - pool_size: usize, io_min_interval_us: u64) -> AsyncWriter { - let mut tasks = VecDeque::default(); - for _ in 0..(pool_size + 1) { - tasks.push_back( - AsyncWriterTask { - wb: engine.log_batch(4 * 1024), - unsynced_readies: HashMap::default(), - } - ); - } - let mut async_writer = AsyncWriter{ + pub fn new( + engine: ER, + router: RaftRouter, + tag: String, + pool_size: usize, + io_min_interval_us: u64, + io_max_bytes: u64, + ) -> AsyncWriter { + //let tasks = AsyncWriterTasks::new(engine.clone(), pool_size * 3, io_max_bytes as usize); + let tasks = AsyncWriterTasks::new(engine.clone(), pool_size + 1, io_max_bytes as usize); + let mut async_writer = AsyncWriter { engine, router, tag, @@ -160,20 +160,23 @@ where } last_ts = now_ts; - // TODO: block if too many data in current raft_wb let task = { let mut tasks = x.tasks.lock().unwrap(); - if tasks.front().unwrap().unsynced_readies.is_empty() { + if tasks.no_task() { continue; } - tasks.pop_front().unwrap() + tasks.detach_task() }; let wb = x.sync_write(task.wb, task.unsynced_readies); + // TODO: block if too many tasks { let mut tasks = x.tasks.lock().unwrap(); - tasks.push_back(AsyncWriterTask{wb, unsynced_readies: HashMap::default()}); + tasks.add(AsyncWriterTask { + wb, + unsynced_readies: HashMap::default(), + }); } STORE_WRITE_RAFTDB_TICK_DURATION_HISTOGRAM @@ -186,12 +189,16 @@ where } } - pub fn raft_wb_pool(&mut self) -> Arc>>> { + pub fn raft_wb_pool(&mut self) -> Arc>> { self.tasks.clone() } // TODO: this func is assumed in tasks.locked status - pub fn sync_write(&mut self, mut wb: ER::LogBatch, mut unsynced_readies: HashMap) -> ER::LogBatch { + pub fn sync_write( + &mut self, + mut wb: ER::LogBatch, + mut unsynced_readies: HashMap, + ) -> ER::LogBatch { let now = TiInstant::now_coarse(); self.engine .consume_and_shrink(&mut wb, true, RAFT_WB_SHRINK_SIZE, 4 * 1024) @@ -224,7 +231,10 @@ where if pre_number >= r.number { break; } - if pre_number == r.notifier.compare_and_swap(pre_number, r.number, Ordering::AcqRel) { + if pre_number + == r.notifier + .compare_and_swap(pre_number, r.number, Ordering::AcqRel) + { if let Err(e) = self.router.force_send(r.region_id, PeerMsg::Noop) { error!( "failed to send noop to trigger persisted ready"; @@ -500,12 +510,12 @@ where pub async_writer: Arc>>, } -impl HandleRaftReadyContext for PollContext +impl HandleRaftReadyContext for PollContext where EK: KvEngine, ER: RaftEngine, { - fn wb_mut(&mut self) -> (&mut EK::WriteBatch, Arc::>>>) { + fn wb_mut(&mut self) -> (&mut EK::WriteBatch, Arc>>) { self.raft_wb_is_empty = false; let mut async_writer = self.async_writer.lock().unwrap(); (&mut self.kv_wb, async_writer.raft_wb_pool()) @@ -517,7 +527,7 @@ where } #[inline] - fn raft_wb_pool(&mut self) -> Arc>>> { + fn raft_wb_pool(&mut self) -> Arc>> { self.raft_wb_is_empty = false; let mut async_writer = self.async_writer.lock().unwrap(); async_writer.raft_wb_pool() @@ -821,8 +831,7 @@ impl RaftPoller { } } fail_point!("raft_between_save"); - STORE_WRITE_KVDB_DURATION_HISTOGRAM - .observe(duration_to_sec(now.elapsed()) as f64); + STORE_WRITE_KVDB_DURATION_HISTOGRAM.observe(duration_to_sec(now.elapsed()) as f64); let raft_wb_is_empty = self.poll_ctx.raft_wb_is_empty; if !raft_wb_is_empty { @@ -933,8 +942,7 @@ impl PollHandler, St self.poll_ctx.sync_log = false; self.poll_ctx.has_ready = false; self.timer = TiInstant::now_coarse(); - STORE_LOOP_DURATION_HISTOGRAM - .observe(duration_to_sec(self.loop_timer.elapsed()) as f64); + STORE_LOOP_DURATION_HISTOGRAM.observe(duration_to_sec(self.loop_timer.elapsed()) as f64); self.loop_timer = TiInstant::now_coarse(); // update config self.poll_ctx.perf_context_statistics.start(); @@ -1406,9 +1414,14 @@ impl RaftBatchSystem { cfg.value().delay_sync_enabled(), cfg.value().delay_sync_us as i64, ); - let async_writer = Arc::new(Mutex::new(AsyncWriter::new(engines.raft.clone(), - self.router.clone(), "raftstore-async-writer".to_string(), - cfg.value().store_io_pool_size as usize, cfg.value().store_io_min_interval_us))); + let async_writer = Arc::new(Mutex::new(AsyncWriter::new( + engines.raft.clone(), + self.router.clone(), + "raftstore-async-writer".to_string(), + cfg.value().store_io_pool_size as usize, + cfg.value().store_io_min_interval_us, + cfg.value().store_io_max_bytes, + ))); let mut builder = RaftPollerBuilder { cfg, store: meta, diff --git a/components/raftstore/src/store/fsm/sync_policy.rs b/components/raftstore/src/store/fsm/sync_policy.rs index 40154df1fea..e0e997fd766 100644 --- a/components/raftstore/src/store/fsm/sync_policy.rs +++ b/components/raftstore/src/store/fsm/sync_policy.rs @@ -1,10 +1,10 @@ // Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0. use std::collections::VecDeque; +use std::mem; use std::sync::atomic::Ordering; use std::sync::atomic::{AtomicI64, AtomicU64}; use std::sync::Arc; -use std::mem; use crossbeam::utils::CachePadded; use engine_traits::KvEngine; diff --git a/components/raftstore/src/store/metrics.rs b/components/raftstore/src/store/metrics.rs index 5f4db195caf..805c38869bd 100644 --- a/components/raftstore/src/store/metrics.rs +++ b/components/raftstore/src/store/metrics.rs @@ -180,12 +180,6 @@ make_auto_flush_static_metric! { } lazy_static! { - pub static ref ASYNC_WRITER_IO_QUEUE_VEC: IntGaugeVec = - register_int_gauge_vec!( - "tikv_raftstore_async_writer_io_queue_total", - "Current pending + running io tasks.", - &["name"] - ).unwrap(); pub static ref STORE_LOOP_DURATION_HISTOGRAM: Histogram = register_histogram!( "tikv_raftstore_store_loop_duration_seconds", @@ -222,6 +216,13 @@ lazy_static! { "TODO", exponential_buckets(1.0, 2.0, 20).unwrap() ).unwrap(); + pub static ref RAFT_ASYNC_WRITER_ADAPTIVE_IDX: Histogram = + register_histogram!( + "tikv_raftstore_store_adaptive_idx", + "TODO", + exponential_buckets(1.0, 2.0, 20).unwrap() + ).unwrap(); + pub static ref PEER_PROPOSAL_COUNTER_VEC: IntCounterVec = register_int_counter_vec!( diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs index cb495e8c76c..178bc8d98a2 100644 --- a/components/raftstore/src/store/peer.rs +++ b/components/raftstore/src/store/peer.rs @@ -1686,17 +1686,18 @@ where } let notifier = self.persisted_notifier.clone(); - let invoke_ctx = match self - .mut_store() - .handle_raft_ready(ctx, &mut ready, destroy_regions, notifier) - { - Ok(r) => r, - Err(e) => { - // We may have written something to writebatch and it can't be reverted, so has - // to panic here. - panic!("{} failed to handle raft ready: {:?}", self.tag, e) - } - }; + let invoke_ctx = + match self + .mut_store() + .handle_raft_ready(ctx, &mut ready, destroy_regions, notifier) + { + Ok(r) => r, + Err(e) => { + // We may have written something to writebatch and it can't be reverted, so has + // to panic here. + panic!("{} failed to handle raft ready: {:?}", self.tag, e) + } + }; Some((ready, invoke_ctx)) } diff --git a/components/raftstore/src/store/peer_storage.rs b/components/raftstore/src/store/peer_storage.rs index 2a02fcc17b8..c3b1546606f 100644 --- a/components/raftstore/src/store/peer_storage.rs +++ b/components/raftstore/src/store/peer_storage.rs @@ -25,16 +25,16 @@ use crate::store::ProposalContext; use crate::{Error, Result}; use engine_traits::{RaftEngine, RaftLogBatch}; use into_other::into_other; -use tikv_util::worker::Scheduler; use std::sync::Mutex; +use tikv_util::worker::Scheduler; use super::metrics::*; use super::worker::RegionTask; use super::{SnapEntry, SnapKey, SnapManager, SnapshotStatistics}; -use tikv_util::collections::HashMap; use crate::store::fsm::sync_policy::UnsyncedReady; use std::sync::atomic::AtomicU64; +use tikv_util::collections::HashMap; // When we create a region peer, we should initialize its log term/index > 0, // so that we can force the follower peer to sync the snapshot first. @@ -317,16 +317,141 @@ where pub unsynced_readies: HashMap, } -pub trait HandleRaftReadyContext +pub struct AsyncWriterTasks +where + ER: RaftEngine, +{ + wbs: VecDeque>, + size_limits: Vec, + current_idx: usize, + sample_window: VecDeque, + sample_window_size: usize, + adaptive_idx: usize, +} + +// TODO: make sure the queue size is good when doing pop/front +impl AsyncWriterTasks +where + ER: RaftEngine, +{ + pub fn new(engine: ER, queue_size: usize, task_soft_max_bytes: usize) -> AsyncWriterTasks { + let mut wbs = VecDeque::default(); + for _ in 0..queue_size { + wbs.push_back(AsyncWriterTask { + wb: engine.log_batch(4 * 1024), + unsynced_readies: HashMap::default(), + }); + } + let mut size_limits = vec![]; + let mut size_limit = task_soft_max_bytes; + for _ in 0..(queue_size * 10) { + size_limits.push(size_limit); + size_limit = (size_limit as f64 * 1.2) as usize; + } + AsyncWriterTasks { + wbs, + size_limits, + current_idx: 0, + sample_window: VecDeque::default(), + sample_window_size: 4, + adaptive_idx: 0, + } + } + + pub fn prepare_current_for_write( + &mut self, + region_id: u64, + ready_number: u64, + region_notifier: Arc, + ) -> &mut AsyncWriterTask { + assert!(self.current_idx <= self.wbs.len()); + let current_size = { + self.wbs[self.current_idx].wb.persist_size() + }; + if current_size >= self.size_limits[self.adaptive_idx + self.current_idx] { + if self.current_idx + 1 < self.wbs.len() { + self.current_idx += 1; + assert!(self.wbs[self.current_idx].unsynced_readies.len() == 0); + } else { + // do nothing, adaptive IO size + } + } + let current = &mut self.wbs[self.current_idx]; + + current.unsynced_readies.insert( + region_id, + UnsyncedReady { + number: ready_number, + region_id, + notifier: region_notifier, + version: 0, + }, + ); + current + } + + pub fn current(&self) -> &AsyncWriterTask { + assert!(self.current_idx <= self.wbs.len()); + &self.wbs[self.current_idx] + } + + pub fn current_mut(&mut self) -> &mut AsyncWriterTask { + assert!(self.current_idx <= self.wbs.len()); + &mut self.wbs[self.current_idx] + } + + pub fn no_task(&self) -> bool { + let no_task = self.wbs.front().unwrap().unsynced_readies.is_empty(); + if no_task { + assert!(self.current_idx == 0); + } + no_task + } + + pub fn detach_task(&mut self) -> AsyncWriterTask { + RAFT_ASYNC_WRITER_QUEUE_SIZE.observe(self.current_idx as f64); + RAFT_ASYNC_WRITER_ADAPTIVE_IDX.observe(self.adaptive_idx as f64); + + let task = self.wbs.pop_front().unwrap(); + let task_bytes = task.wb.persist_size(); + + self.sample_window.push_back(task_bytes); + if self.sample_window.len() > self.sample_window_size { + self.sample_window.pop_front(); + } + let target_bytes = self.size_limits[self.adaptive_idx + self.current_idx]; + let task_avg_bytes = (self.sample_window.iter().sum::() as f64 / self.sample_window.len() as f64) as usize; + if task_avg_bytes >= target_bytes { + if self.adaptive_idx + 1 + self.wbs.len() - 1 < self.size_limits.len() { + self.adaptive_idx += 1; + } + } else if (task_avg_bytes as f64) < ((target_bytes as f64) / 1.25) { + if self.adaptive_idx > 0 { + self.adaptive_idx -= 1; + } + } + + if self.current_idx != 0 { + self.current_idx -= 1; + } + task + } + + pub fn add(&mut self, task: AsyncWriterTask) { + self.wbs.push_back(task); + } +} + +pub trait HandleRaftReadyContext where WK: Mutable, - WR: RaftLogBatch, + ER: RaftEngine, { /// Returns the mutable references of WriteBatch for both KvDB and RaftDB in one interface. - fn wb_mut(&mut self) -> (&mut WK, Arc>>>); + fn wb_mut(&mut self) -> (&mut WK, Arc>>); fn kv_wb_mut(&mut self) -> &mut WK; //fn raft_wb_mut(&mut self) -> &mut WR; - fn raft_wb_pool(&mut self) -> Arc>>>; + fn raft_wb_pool(&mut self) -> Arc>>; fn sync_log(&self) -> bool; fn set_sync_log(&mut self, sync: bool); } @@ -1025,13 +1150,11 @@ where // to the return one. // WARNING: If this function returns error, the caller must panic otherwise the entry cache may // be wrong and break correctness. - pub fn append>( + pub fn append( &mut self, invoke_ctx: &mut InvokeContext, entries: Vec, - ready_ctx: &mut H, - ready_number: u64, - region_notifier: Arc, + raft_wb: &mut ER::LogBatch, ) -> Result { let region_id = self.get_region_id(); debug!( @@ -1059,17 +1182,10 @@ where cache.append(&self.tag, &entries); } - { - let raft_wb_pool = ready_ctx.raft_wb_pool(); - let mut raft_wbs = raft_wb_pool.lock().unwrap(); - let current = raft_wbs.front_mut().unwrap(); - current.wb.append(region_id, entries)?; - current.unsynced_readies.insert(region_id, - UnsyncedReady{number: ready_number, region_id, notifier: region_notifier, version: 0}); - // Delete any previously appended log entries which never committed. - // TODO: Wrap it as an engine::Error. - current.wb.cut_logs(region_id, last_index + 1, prev_last_index); - } + raft_wb.append(region_id, entries)?; + // Delete any previously appended log entries which never committed. + // TODO: Wrap it as an engine::Error. + raft_wb.cut_logs(region_id, last_index + 1, prev_last_index); invoke_ctx.raft_state.set_last_index(last_index); invoke_ctx.last_term = last_term; @@ -1380,7 +1496,7 @@ where /// it explicitly to disk. If it's flushed to disk successfully, `post_ready` should be called /// to update the memory states properly. /// WARNING: If this function returns error, the caller must panic(details in `append` function). - pub fn handle_raft_ready>( + pub fn handle_raft_ready>( &mut self, ready_ctx: &mut H, ready: &mut Ready, @@ -1389,54 +1505,58 @@ where ) -> Result { let region_id = self.get_region_id(); let mut ctx = InvokeContext::new(self); - let snapshot_index = if ready.snapshot().is_empty() { - 0 - } else { - fail_point!("raft_before_apply_snap"); - let (kv_wb, raft_wb_pool) = ready_ctx.wb_mut(); - let mut raft_wbs = raft_wb_pool.lock().unwrap(); - let current = raft_wbs.front_mut().unwrap(); - self.apply_snapshot(&mut ctx, ready.snapshot(), kv_wb, &mut current.wb, &destroy_regions)?; - current.unsynced_readies.insert(region_id, - UnsyncedReady{number: ready.number(), region_id, notifier: region_notifier.clone(), version: 0}); - fail_point!("raft_after_apply_snap"); + let mut snapshot_index = 0; - ctx.destroyed_regions = destroy_regions; - - last_index(&ctx.raft_state) - }; + { + let raft_wb_pool = ready_ctx.raft_wb_pool(); + let mut raft_wbs = raft_wb_pool.lock().unwrap(); + let current = if ready.must_sync() { + raft_wbs.prepare_current_for_write( + region_id, + ready.number(), + region_notifier.clone(), + ) + } else { + raft_wbs.current_mut() + }; - if !ready.entries().is_empty() { - self.append(&mut ctx, ready.take_entries(), ready_ctx, ready.number(), region_notifier.clone())?; - } + if !ready.snapshot().is_empty() { + fail_point!("raft_before_apply_snap"); + self.apply_snapshot(&mut ctx, ready.snapshot(), ready_ctx.kv_wb_mut(), &mut current.wb, &destroy_regions)?; + fail_point!("raft_after_apply_snap"); + ctx.destroyed_regions = destroy_regions; + snapshot_index = last_index(&ctx.raft_state); + }; - // Last index is 0 means the peer is created from raft message - // and has not applied snapshot yet, so skip persistent hard state. - if ctx.raft_state.get_last_index() > 0 { - if let Some(hs) = ready.hs() { - ctx.raft_state.set_hard_state(hs.clone()); + if !ready.entries().is_empty() { + self.append(&mut ctx, ready.take_entries(), &mut current.wb)?; + } + // Last index is 0 means the peer is created from raft message + // and has not applied snapshot yet, so skip persistent hard state. + if ctx.raft_state.get_last_index() > 0 { + if let Some(hs) = ready.hs() { + ctx.raft_state.set_hard_state(hs.clone()); + } } - } - // Save raft state if it has changed or there is a snapshot. - if ctx.raft_state != self.raft_state || snapshot_index > 0 { - { - let raft_wb_pool = ready_ctx.raft_wb_pool(); - let mut raft_wbs = raft_wb_pool.lock().unwrap(); - let current = raft_wbs.front_mut().unwrap(); + // Save raft state if it has changed or there is a snapshot. + if ctx.raft_state != self.raft_state || snapshot_index > 0 { ctx.save_raft_state_to(&mut current.wb)?; } - if snapshot_index > 0 { - // in case of restart happen when we just write region state to Applying, - // but not write raft_local_state to raft rocksdb in time. - // we write raft state to default rocksdb, with last index set to snap index, - // in case of recv raft log after snapshot. - ctx.save_snapshot_raft_state_to(snapshot_index, ready_ctx.kv_wb_mut())?; + + if ready.must_sync() { + current.unsynced_readies.insert(region_id, + UnsyncedReady{number: ready.number(), region_id, notifier: region_notifier.clone(), version: 0}); } } // only when apply snapshot if snapshot_index > 0 { + // in case of restart happen when we just write region state to Applying, + // but not write raft_local_state to raft rocksdb in time. + // we write raft state to default rocksdb, with last index set to snap index, + // in case of recv raft log after snapshot. + ctx.save_snapshot_raft_state_to(snapshot_index, ready_ctx.kv_wb_mut())?; ctx.save_apply_state_to(ready_ctx.kv_wb_mut())?; } From ea07df10a4d1c6bb2bf2615b7557a00db707602d Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Wed, 6 Jan 2021 04:08:38 +0800 Subject: [PATCH 12/15] Added IO que and sample window config entries Signed-off-by: Liu Cong --- components/raftstore/src/lib.rs | 1 + components/raftstore/src/store/config.rs | 32 +++- components/raftstore/src/store/fsm/store.rs | 13 +- components/raftstore/src/store/metrics.rs | 6 + .../raftstore/src/store/peer_storage.rs | 177 ++++++++++++------ 5 files changed, 162 insertions(+), 67 deletions(-) diff --git a/components/raftstore/src/lib.rs b/components/raftstore/src/lib.rs index 846262bfd2d..3fcb5fa6378 100644 --- a/components/raftstore/src/lib.rs +++ b/components/raftstore/src/lib.rs @@ -8,6 +8,7 @@ #![feature(box_patterns)] #![feature(duration_saturating_ops)] #![feature(duration_zero)] +#![recursion_limit="256"] #[macro_use] extern crate bitflags; diff --git a/components/raftstore/src/store/config.rs b/components/raftstore/src/store/config.rs index be213c1e11e..9c4a5c07974 100644 --- a/components/raftstore/src/store/config.rs +++ b/components/raftstore/src/store/config.rs @@ -33,11 +33,15 @@ pub struct Config { #[config(skip)] pub store_io_min_interval_us: u64, #[config(skip)] - pub store_io_max_bytes: u64, - #[config(skip)] pub store_io_pool_size: u64, #[config(skip)] - pub store_io_queue: u64, + pub store_io_queue_size: u64, + #[config(skip)] + pub store_io_queue_init_bytes: u64, + #[config(skip)] + pub store_io_queue_bytes_step: f64, + #[config(skip)] + pub store_io_queue_sample_size: u64, #[config(skip)] pub apply_io_size: u64, // minimizes disruption when a partitioned node rejoins the cluster by using a two phase election. @@ -207,9 +211,11 @@ impl Default for Config { Config { delay_sync_us: 0, store_io_min_interval_us: 500, - store_io_max_bytes: 128 * 1024, store_io_pool_size: 2, - store_io_queue: 1, + store_io_queue_size: 64, + store_io_queue_init_bytes: 256 * 1024, + store_io_queue_bytes_step: 1.5, + store_io_queue_sample_size: 1024, apply_io_size: 0, prevote: true, raftdb_path: String::new(), @@ -439,15 +445,21 @@ impl Config { CONFIG_RAFTSTORE_GAUGE .with_label_values(&["store_io_min_interval_us"]) .set((self.store_io_min_interval_us as i32).into()); - CONFIG_RAFTSTORE_GAUGE - .with_label_values(&["store_io_max_bytes"]) - .set((self.store_io_max_bytes as i32).into()); CONFIG_RAFTSTORE_GAUGE .with_label_values(&["store_io_pool_size"]) .set((self.store_io_pool_size as i32).into()); CONFIG_RAFTSTORE_GAUGE - .with_label_values(&["store_io_queue"]) - .set((self.store_io_queue as i32).into()); + .with_label_values(&["store_io_queue_size"]) + .set((self.store_io_queue_size as i32).into()); + CONFIG_RAFTSTORE_GAUGE + .with_label_values(&["store_io_queue_init_bytes"]) + .set((self.store_io_queue_init_bytes as i32).into()); + CONFIG_RAFTSTORE_GAUGE + .with_label_values(&["store_io_queue_bytes_step"]) + .set((self.store_io_queue_bytes_step as f64).into()); + CONFIG_RAFTSTORE_GAUGE + .with_label_values(&["store_io_queue_sample_size"]) + .set((self.store_io_queue_sample_size as i32).into()); CONFIG_RAFTSTORE_GAUGE .with_label_values(&["apply_io_size"]) .set((self.apply_io_size as i32).into()); diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index 8367c03f3c1..a4ca04faca2 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -128,10 +128,8 @@ where tag: String, pool_size: usize, io_min_interval_us: u64, - io_max_bytes: u64, + tasks: AsyncWriterTasks, ) -> AsyncWriter { - //let tasks = AsyncWriterTasks::new(engine.clone(), pool_size * 3, io_max_bytes as usize); - let tasks = AsyncWriterTasks::new(engine.clone(), pool_size + 1, io_max_bytes as usize); let mut async_writer = AsyncWriter { engine, router, @@ -1414,13 +1412,20 @@ impl RaftBatchSystem { cfg.value().delay_sync_enabled(), cfg.value().delay_sync_us as i64, ); + let async_writer_tasks = AsyncWriterTasks::new( + engines.raft.clone(), + (cfg.value().store_io_pool_size as usize) + 1, + cfg.value().store_io_queue_init_bytes as usize, + cfg.value().store_io_queue_bytes_step, + cfg.value().store_io_queue_sample_size as usize, + ); let async_writer = Arc::new(Mutex::new(AsyncWriter::new( engines.raft.clone(), self.router.clone(), "raftstore-async-writer".to_string(), cfg.value().store_io_pool_size as usize, cfg.value().store_io_min_interval_us, - cfg.value().store_io_max_bytes, + async_writer_tasks, ))); let mut builder = RaftPollerBuilder { cfg, diff --git a/components/raftstore/src/store/metrics.rs b/components/raftstore/src/store/metrics.rs index 805c38869bd..e41fc1f213a 100644 --- a/components/raftstore/src/store/metrics.rs +++ b/components/raftstore/src/store/metrics.rs @@ -222,6 +222,12 @@ lazy_static! { "TODO", exponential_buckets(1.0, 2.0, 20).unwrap() ).unwrap(); + pub static ref RAFT_ASYNC_WRITER_TASK_BYTES: Histogram = + register_histogram!( + "tikv_raftstore_store_task_bytes", + "TODO", + exponential_buckets(1024.0, 2.0, 30).unwrap() + ).unwrap(); pub static ref PEER_PROPOSAL_COUNTER_VEC: IntCounterVec = diff --git a/components/raftstore/src/store/peer_storage.rs b/components/raftstore/src/store/peer_storage.rs index c3b1546606f..9fcd75e780a 100644 --- a/components/raftstore/src/store/peer_storage.rs +++ b/components/raftstore/src/store/peer_storage.rs @@ -317,6 +317,106 @@ where pub unsynced_readies: HashMap, } +impl AsyncWriterTask +where + WR: RaftLogBatch, +{ + pub fn on_written( + &mut self, + region_id: u64, + ready_number: u64, + region_notifier: Arc, + ) { + self.unsynced_readies.insert( + region_id, + UnsyncedReady { + number: ready_number, + region_id, + notifier: region_notifier, + version: 0, + }, + ); + } +} + +/* +pub struct AsyncWriterTasks +where + ER: RaftEngine, +{ + wbs: VecDeque>, +} + +impl AsyncWriterTasks +where + ER: RaftEngine, +{ + pub fn new( + engine: ER, + queue_size: usize, + _queue_init_bytes: usize, + _queue_bytes_step: f64, + _queue_sample_size: f64, + ) -> AsyncWriterTasks { + let mut wbs = VecDeque::default(); + for _ in 0..queue_size { + wbs.push_back(AsyncWriterTask { + wb: engine.log_batch(4 * 1024), + unsynced_readies: HashMap::default(), + }); + } + AsyncWriterTasks { + wbs, + } + } + + pub fn prepare_current_for_write(&mut self) -> &mut AsyncWriterTask { + self.wbs.front_mut().unwrap() + } + + pub fn no_task(&self) -> bool { + self.wbs.front().unwrap().unsynced_readies.is_empty() + } + + pub fn detach_task(&mut self) -> AsyncWriterTask { + self.wbs.pop_front().unwrap() + } + + pub fn add(&mut self, task: AsyncWriterTask) { + self.wbs.push_back(task); + } +} +*/ + +pub struct SampleWindow { + que: VecDeque, + size: usize, + sum: f64, + recal_counter: usize, +} + +impl SampleWindow { + pub fn new(size: usize) -> SampleWindow { + SampleWindow { que: VecDeque::default(), size, sum: 0.0, recal_counter: 0 } + } + + pub fn observe_and_get_avg(&mut self, value: f64) -> f64 { + self.que.push_back(value); + if self.que.len() > self.size { + let old = self.que.pop_front().unwrap(); + self.sum = self.sum + value - old; + } else { + self.sum += value; + } + self.recal_counter += 1; + if self.recal_counter > self.size * 1000 { + self.recal_counter = 0; + self.sum = self.que.iter().sum(); + } + self.sum / (self.que.len() as f64) + } +} + pub struct AsyncWriterTasks where ER: RaftEngine, @@ -324,9 +424,8 @@ where wbs: VecDeque>, size_limits: Vec, current_idx: usize, - sample_window: VecDeque, - sample_window_size: usize, adaptive_idx: usize, + sample_window: SampleWindow, } // TODO: make sure the queue size is good when doing pop/front @@ -334,7 +433,13 @@ impl AsyncWriterTasks where ER: RaftEngine, { - pub fn new(engine: ER, queue_size: usize, task_soft_max_bytes: usize) -> AsyncWriterTasks { + pub fn new( + engine: ER, + queue_size: usize, + queue_init_bytes: usize, + queue_bytes_step: f64, + queue_sample_size: usize, + ) -> AsyncWriterTasks { let mut wbs = VecDeque::default(); for _ in 0..queue_size { wbs.push_back(AsyncWriterTask { @@ -343,27 +448,26 @@ where }); } let mut size_limits = vec![]; - let mut size_limit = task_soft_max_bytes; - for _ in 0..(queue_size * 10) { + let mut size_limit = queue_init_bytes; + for _ in 0..(queue_size * 2) { size_limits.push(size_limit); - size_limit = (size_limit as f64 * 1.2) as usize; + size_limit = (size_limit as f64 * queue_bytes_step) as usize; } AsyncWriterTasks { wbs, size_limits, current_idx: 0, - sample_window: VecDeque::default(), - sample_window_size: 4, adaptive_idx: 0, + sample_window: SampleWindow::new(queue_sample_size), } } - pub fn prepare_current_for_write( - &mut self, - region_id: u64, - ready_number: u64, - region_notifier: Arc, - ) -> &mut AsyncWriterTask { + pub fn current(&self) -> &AsyncWriterTask { + assert!(self.current_idx <= self.wbs.len()); + &self.wbs[self.current_idx] + } + + pub fn prepare_current_for_write(&mut self) -> &mut AsyncWriterTask { assert!(self.current_idx <= self.wbs.len()); let current_size = { self.wbs[self.current_idx].wb.persist_size() @@ -376,27 +480,6 @@ where // do nothing, adaptive IO size } } - let current = &mut self.wbs[self.current_idx]; - - current.unsynced_readies.insert( - region_id, - UnsyncedReady { - number: ready_number, - region_id, - notifier: region_notifier, - version: 0, - }, - ); - current - } - - pub fn current(&self) -> &AsyncWriterTask { - assert!(self.current_idx <= self.wbs.len()); - &self.wbs[self.current_idx] - } - - pub fn current_mut(&mut self) -> &mut AsyncWriterTask { - assert!(self.current_idx <= self.wbs.len()); &mut self.wbs[self.current_idx] } @@ -411,17 +494,13 @@ where pub fn detach_task(&mut self) -> AsyncWriterTask { RAFT_ASYNC_WRITER_QUEUE_SIZE.observe(self.current_idx as f64); RAFT_ASYNC_WRITER_ADAPTIVE_IDX.observe(self.adaptive_idx as f64); - let task = self.wbs.pop_front().unwrap(); let task_bytes = task.wb.persist_size(); + RAFT_ASYNC_WRITER_TASK_BYTES.observe(task_bytes as f64); - self.sample_window.push_back(task_bytes); - if self.sample_window.len() > self.sample_window_size { - self.sample_window.pop_front(); - } + let task_avg_bytes = self.sample_window.observe_and_get_avg(task_bytes as f64); let target_bytes = self.size_limits[self.adaptive_idx + self.current_idx]; - let task_avg_bytes = (self.sample_window.iter().sum::() as f64 / self.sample_window.len() as f64) as usize; - if task_avg_bytes >= target_bytes { + if task_avg_bytes >= (target_bytes as f64) { if self.adaptive_idx + 1 + self.wbs.len() - 1 < self.size_limits.len() { self.adaptive_idx += 1; } @@ -442,6 +521,7 @@ where } } + pub trait HandleRaftReadyContext where WK: Mutable, @@ -1510,15 +1590,7 @@ where { let raft_wb_pool = ready_ctx.raft_wb_pool(); let mut raft_wbs = raft_wb_pool.lock().unwrap(); - let current = if ready.must_sync() { - raft_wbs.prepare_current_for_write( - region_id, - ready.number(), - region_notifier.clone(), - ) - } else { - raft_wbs.current_mut() - }; + let current = raft_wbs.prepare_current_for_write(); if !ready.snapshot().is_empty() { fail_point!("raft_before_apply_snap"); @@ -1545,8 +1617,7 @@ where } if ready.must_sync() { - current.unsynced_readies.insert(region_id, - UnsyncedReady{number: ready.number(), region_id, notifier: region_notifier.clone(), version: 0}); + current.on_written(region_id, ready.number(), region_notifier.clone()); } } From dcbe442560b609394a6afe4c07e9653dcdce039d Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Wed, 6 Jan 2021 20:30:37 +0800 Subject: [PATCH 13/15] Fixed corruption Signed-off-by: Liu Cong --- components/raftstore/src/store/fsm/store.rs | 1 + components/raftstore/src/store/metrics.rs | 8 +++- .../raftstore/src/store/peer_storage.rs | 48 ++++++++++++------- 3 files changed, 38 insertions(+), 19 deletions(-) diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index a4ca04faca2..8cfd99ff218 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -161,6 +161,7 @@ where let task = { let mut tasks = x.tasks.lock().unwrap(); if tasks.no_task() { + // TODO: block and wait for new data arrive continue; } tasks.detach_task() diff --git a/components/raftstore/src/store/metrics.rs b/components/raftstore/src/store/metrics.rs index e41fc1f213a..31a67516351 100644 --- a/components/raftstore/src/store/metrics.rs +++ b/components/raftstore/src/store/metrics.rs @@ -226,7 +226,13 @@ lazy_static! { register_histogram!( "tikv_raftstore_store_task_bytes", "TODO", - exponential_buckets(1024.0, 2.0, 30).unwrap() + exponential_buckets(256.0, 2.0, 30).unwrap() + ).unwrap(); + pub static ref RAFT_ASYNC_WRITER_TASK_LIMIT_BYTES: Histogram = + register_histogram!( + "tikv_raftstore_store_task_limit_bytes", + "TODO", + exponential_buckets(256.0, 2.0, 30).unwrap() ).unwrap(); diff --git a/components/raftstore/src/store/peer_storage.rs b/components/raftstore/src/store/peer_storage.rs index 9fcd75e780a..4f57ddbb8eb 100644 --- a/components/raftstore/src/store/peer_storage.rs +++ b/components/raftstore/src/store/peer_storage.rs @@ -337,6 +337,10 @@ where }, ); } + + pub fn is_empty(&self) -> bool { + self.unsynced_readies.is_empty() + } } /* @@ -462,20 +466,16 @@ where } } - pub fn current(&self) -> &AsyncWriterTask { - assert!(self.current_idx <= self.wbs.len()); - &self.wbs[self.current_idx] - } - pub fn prepare_current_for_write(&mut self) -> &mut AsyncWriterTask { assert!(self.current_idx <= self.wbs.len()); - let current_size = { - self.wbs[self.current_idx].wb.persist_size() - }; + if self.current_idx + 1 < self.wbs.len() { + assert!(self.wbs[self.current_idx + 1].is_empty()); + } + let current_size = self.wbs[self.current_idx].wb.persist_size(); if current_size >= self.size_limits[self.adaptive_idx + self.current_idx] { if self.current_idx + 1 < self.wbs.len() { self.current_idx += 1; - assert!(self.wbs[self.current_idx].unsynced_readies.len() == 0); + assert!(self.wbs[self.current_idx].is_empty()); } else { // do nothing, adaptive IO size } @@ -484,7 +484,7 @@ where } pub fn no_task(&self) -> bool { - let no_task = self.wbs.front().unwrap().unsynced_readies.is_empty(); + let no_task = self.wbs.front().unwrap().is_empty(); if no_task { assert!(self.current_idx == 0); } @@ -494,20 +494,30 @@ where pub fn detach_task(&mut self) -> AsyncWriterTask { RAFT_ASYNC_WRITER_QUEUE_SIZE.observe(self.current_idx as f64); RAFT_ASYNC_WRITER_ADAPTIVE_IDX.observe(self.adaptive_idx as f64); + + assert!(self.current_idx <= self.wbs.len()); + if self.current_idx + 1 < self.wbs.len() { + assert!(self.wbs[self.current_idx + 1].is_empty()); + } + if self.current_idx != 0 { + assert!(!self.wbs[self.current_idx - 1].is_empty()); + } let task = self.wbs.pop_front().unwrap(); + assert!(!task.is_empty()); + let task_bytes = task.wb.persist_size(); RAFT_ASYNC_WRITER_TASK_BYTES.observe(task_bytes as f64); let task_avg_bytes = self.sample_window.observe_and_get_avg(task_bytes as f64); - let target_bytes = self.size_limits[self.adaptive_idx + self.current_idx]; - if task_avg_bytes >= (target_bytes as f64) { - if self.adaptive_idx + 1 + self.wbs.len() - 1 < self.size_limits.len() { + let target_bytes = self.size_limits[self.adaptive_idx + self.current_idx] as f64; + RAFT_ASYNC_WRITER_TASK_LIMIT_BYTES.observe(target_bytes); + if task_avg_bytes >= target_bytes { + if self.adaptive_idx + (self.wbs.len() - 1) + 1 < self.size_limits.len() { self.adaptive_idx += 1; } - } else if (task_avg_bytes as f64) < ((target_bytes as f64) / 1.25) { - if self.adaptive_idx > 0 { - self.adaptive_idx -= 1; - } + } else if self.adaptive_idx > 0 && + task_avg_bytes < (self.size_limits[self.adaptive_idx + self.current_idx - 1] as f64) { + self.adaptive_idx -= 1; } if self.current_idx != 0 { @@ -517,11 +527,11 @@ where } pub fn add(&mut self, task: AsyncWriterTask) { + assert!(task.is_empty()); self.wbs.push_back(task); } } - pub trait HandleRaftReadyContext where WK: Mutable, @@ -1591,6 +1601,7 @@ where let raft_wb_pool = ready_ctx.raft_wb_pool(); let mut raft_wbs = raft_wb_pool.lock().unwrap(); let current = raft_wbs.prepare_current_for_write(); + let current_size = current.wb.persist_size(); if !ready.snapshot().is_empty() { fail_point!("raft_before_apply_snap"); @@ -1617,6 +1628,7 @@ where } if ready.must_sync() { + assert!(current_size != current.wb.persist_size()); current.on_written(region_id, ready.number(), region_notifier.clone()); } } From 7f0b50e36be6db637c506ba94b2088a26087288c Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Wed, 6 Jan 2021 20:48:38 +0800 Subject: [PATCH 14/15] Remove sample recal Signed-off-by: Liu Cong --- components/raftstore/src/store/peer_storage.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/components/raftstore/src/store/peer_storage.rs b/components/raftstore/src/store/peer_storage.rs index 4f57ddbb8eb..f03be9e2c2a 100644 --- a/components/raftstore/src/store/peer_storage.rs +++ b/components/raftstore/src/store/peer_storage.rs @@ -396,12 +396,11 @@ pub struct SampleWindow { que: VecDeque, size: usize, sum: f64, - recal_counter: usize, } impl SampleWindow { pub fn new(size: usize) -> SampleWindow { - SampleWindow { que: VecDeque::default(), size, sum: 0.0, recal_counter: 0 } + SampleWindow { que: VecDeque::default(), size, sum: 0.0 } } pub fn observe_and_get_avg(&mut self, value: f64) -> f64 { @@ -412,11 +411,6 @@ impl SampleWindow { } else { self.sum += value; } - self.recal_counter += 1; - if self.recal_counter > self.size * 1000 { - self.recal_counter = 0; - self.sum = self.que.iter().sum(); - } self.sum / (self.que.len() as f64) } } From dbd72e38c55a2edb757a80ff5cc73c8487f7a28d Mon Sep 17 00:00:00 2001 From: Liu Cong Date: Thu, 7 Jan 2021 07:19:15 +0800 Subject: [PATCH 15/15] Adaptive Store IO size Signed-off-by: Liu Cong --- components/raftstore/src/store/config.rs | 18 ++-- components/raftstore/src/store/fsm/store.rs | 5 +- components/raftstore/src/store/metrics.rs | 7 +- .../raftstore/src/store/peer_storage.rs | 96 +++++++++++++++---- 4 files changed, 96 insertions(+), 30 deletions(-) diff --git a/components/raftstore/src/store/config.rs b/components/raftstore/src/store/config.rs index 9c4a5c07974..7bb5bf8f5fc 100644 --- a/components/raftstore/src/store/config.rs +++ b/components/raftstore/src/store/config.rs @@ -41,7 +41,9 @@ pub struct Config { #[config(skip)] pub store_io_queue_bytes_step: f64, #[config(skip)] - pub store_io_queue_sample_size: u64, + pub store_io_queue_sample_quantile: f64, + #[config(skip)] + pub store_io_queue_adaptive_gain: u64, #[config(skip)] pub apply_io_size: u64, // minimizes disruption when a partitioned node rejoins the cluster by using a two phase election. @@ -210,12 +212,13 @@ impl Default for Config { let split_size = ReadableSize::mb(coprocessor::config::SPLIT_SIZE_MB); Config { delay_sync_us: 0, - store_io_min_interval_us: 500, + store_io_min_interval_us: 300, store_io_pool_size: 2, store_io_queue_size: 64, store_io_queue_init_bytes: 256 * 1024, - store_io_queue_bytes_step: 1.5, - store_io_queue_sample_size: 1024, + store_io_queue_bytes_step: 1.414213562373095, + store_io_queue_adaptive_gain: 0, + store_io_queue_sample_quantile: 0.9, apply_io_size: 0, prevote: true, raftdb_path: String::new(), @@ -458,8 +461,11 @@ impl Config { .with_label_values(&["store_io_queue_bytes_step"]) .set((self.store_io_queue_bytes_step as f64).into()); CONFIG_RAFTSTORE_GAUGE - .with_label_values(&["store_io_queue_sample_size"]) - .set((self.store_io_queue_sample_size as i32).into()); + .with_label_values(&["store_io_queue_sample_quantile"]) + .set((self.store_io_queue_sample_quantile as i32).into()); + CONFIG_RAFTSTORE_GAUGE + .with_label_values(&["store_io_queue_adaptive_gain"]) + .set((self.store_io_queue_adaptive_gain as i32).into()); CONFIG_RAFTSTORE_GAUGE .with_label_values(&["apply_io_size"]) .set((self.apply_io_size as i32).into()); diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index 8cfd99ff218..85494c35c02 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -1415,10 +1415,11 @@ impl RaftBatchSystem { ); let async_writer_tasks = AsyncWriterTasks::new( engines.raft.clone(), - (cfg.value().store_io_pool_size as usize) + 1, + (cfg.value().store_io_queue_size as usize) + 1, cfg.value().store_io_queue_init_bytes as usize, cfg.value().store_io_queue_bytes_step, - cfg.value().store_io_queue_sample_size as usize, + cfg.value().store_io_queue_adaptive_gain as usize, + cfg.value().store_io_queue_sample_quantile, ); let async_writer = Arc::new(Mutex::new(AsyncWriter::new( engines.raft.clone(), diff --git a/components/raftstore/src/store/metrics.rs b/components/raftstore/src/store/metrics.rs index 31a67516351..561a3797012 100644 --- a/components/raftstore/src/store/metrics.rs +++ b/components/raftstore/src/store/metrics.rs @@ -234,7 +234,12 @@ lazy_static! { "TODO", exponential_buckets(256.0, 2.0, 30).unwrap() ).unwrap(); - + pub static ref RAFT_ASYNC_WRITER_TASK_SUGGEST_BYTES: Histogram = + register_histogram!( + "tikv_raftstore_store_task_suggest_bytes", + "TODO", + exponential_buckets(256.0, 2.0, 30).unwrap() + ).unwrap(); pub static ref PEER_PROPOSAL_COUNTER_VEC: IntCounterVec = register_int_counter_vec!( diff --git a/components/raftstore/src/store/peer_storage.rs b/components/raftstore/src/store/peer_storage.rs index f03be9e2c2a..664135da75a 100644 --- a/components/raftstore/src/store/peer_storage.rs +++ b/components/raftstore/src/store/peer_storage.rs @@ -393,25 +393,67 @@ where */ pub struct SampleWindow { - que: VecDeque, - size: usize, - sum: f64, + count: usize, + buckets: VecDeque, + buckets_val_cnt: VecDeque, + bucket_factor: f64, } impl SampleWindow { - pub fn new(size: usize) -> SampleWindow { - SampleWindow { que: VecDeque::default(), size, sum: 0.0 } + pub fn new() -> SampleWindow { + SampleWindow { + count: 0, + buckets: VecDeque::default(), + buckets_val_cnt: VecDeque::default(), + bucket_factor: 2.0, + } } - pub fn observe_and_get_avg(&mut self, value: f64) -> f64 { - self.que.push_back(value); - if self.que.len() > self.size { - let old = self.que.pop_front().unwrap(); - self.sum = self.sum + value - old; + pub fn observe(&mut self, value: f64) { + // For P99, P999 + self.count += 1; + if self.buckets.is_empty() { + self.buckets.push_back(value); + self.buckets_val_cnt.push_back(0); } else { - self.sum += value; + let mut bucket_pos = self.buckets.len() / 2; + loop { + let bucket_val = self.buckets[bucket_pos]; + if value < bucket_val { + if bucket_pos == 0 { + self.buckets.push_front(bucket_val / self.bucket_factor); + self.buckets_val_cnt.push_front(0); + } else { + bucket_pos -= 1; + } + continue; + } + let bucket_val_ub = bucket_val * self.bucket_factor; + if value < bucket_val_ub { + break; + } + if bucket_pos + 1 >= self.buckets.len() { + self.buckets.push_back(bucket_val_ub); + self.buckets_val_cnt.push_back(0); + } + bucket_pos += 1; + } + self.buckets_val_cnt[bucket_pos] += 1; } - self.sum / (self.que.len() as f64) + } + + pub fn quantile(&mut self, quantile: f64) -> f64 { + let mut cnt_sum = 0; + let mut index = self.buckets_val_cnt.len() - 1; + let sum_target = (self.count as f64 * quantile) as usize; + for i in 0..self.buckets_val_cnt.len() { + cnt_sum += self.buckets_val_cnt[i]; + if cnt_sum >= sum_target { + index = i; + break; + } + } + self.buckets[index] * self.bucket_factor } } @@ -423,7 +465,9 @@ where size_limits: Vec, current_idx: usize, adaptive_idx: usize, + adaptive_gain: usize, sample_window: SampleWindow, + sample_quantile: f64, } // TODO: make sure the queue size is good when doing pop/front @@ -436,7 +480,8 @@ where queue_size: usize, queue_init_bytes: usize, queue_bytes_step: f64, - queue_sample_size: usize, + queue_adaptive_gain: usize, + queue_sample_quantile: f64, ) -> AsyncWriterTasks { let mut wbs = VecDeque::default(); for _ in 0..queue_size { @@ -447,7 +492,7 @@ where } let mut size_limits = vec![]; let mut size_limit = queue_init_bytes; - for _ in 0..(queue_size * 2) { + for _ in 0..(queue_size * 2 + queue_adaptive_gain) { size_limits.push(size_limit); size_limit = (size_limit as f64 * queue_bytes_step) as usize; } @@ -456,7 +501,9 @@ where size_limits, current_idx: 0, adaptive_idx: 0, - sample_window: SampleWindow::new(queue_sample_size), + adaptive_gain: queue_adaptive_gain, + sample_window: SampleWindow::new(), + sample_quantile: queue_sample_quantile, } } @@ -466,7 +513,8 @@ where assert!(self.wbs[self.current_idx + 1].is_empty()); } let current_size = self.wbs[self.current_idx].wb.persist_size(); - if current_size >= self.size_limits[self.adaptive_idx + self.current_idx] { + assert!(self.adaptive_gain + self.adaptive_idx + self.current_idx < self.size_limits.len()); + if current_size >= self.size_limits[self.adaptive_gain + self.adaptive_idx + self.current_idx] { if self.current_idx + 1 < self.wbs.len() { self.current_idx += 1; assert!(self.wbs[self.current_idx].is_empty()); @@ -502,15 +550,21 @@ where let task_bytes = task.wb.persist_size(); RAFT_ASYNC_WRITER_TASK_BYTES.observe(task_bytes as f64); - let task_avg_bytes = self.sample_window.observe_and_get_avg(task_bytes as f64); - let target_bytes = self.size_limits[self.adaptive_idx + self.current_idx] as f64; - RAFT_ASYNC_WRITER_TASK_LIMIT_BYTES.observe(target_bytes); - if task_avg_bytes >= target_bytes { + assert!(self.adaptive_gain + self.adaptive_idx + self.current_idx < self.size_limits.len()); + RAFT_ASYNC_WRITER_TASK_LIMIT_BYTES.observe( + self.size_limits[self.adaptive_gain + self.adaptive_idx + self.current_idx] as f64); + + self.sample_window.observe(task_bytes as f64); + let task_suggest_bytes = self.sample_window.quantile(self.sample_quantile); + RAFT_ASYNC_WRITER_TASK_SUGGEST_BYTES.observe(task_suggest_bytes); + + let current_target_bytes = self.size_limits[self.adaptive_idx + self.current_idx] as f64; + if task_suggest_bytes >= current_target_bytes { if self.adaptive_idx + (self.wbs.len() - 1) + 1 < self.size_limits.len() { self.adaptive_idx += 1; } } else if self.adaptive_idx > 0 && - task_avg_bytes < (self.size_limits[self.adaptive_idx + self.current_idx - 1] as f64) { + task_suggest_bytes < (self.size_limits[self.adaptive_idx + self.current_idx - 1] as f64) { self.adaptive_idx -= 1; }