diff --git a/components/engine_panic/src/raft_engine.rs b/components/engine_panic/src/raft_engine.rs
index ff9f3d433387..4cc1822303d7 100644
--- a/components/engine_panic/src/raft_engine.rs
+++ b/components/engine_panic/src/raft_engine.rs
@@ -105,6 +105,10 @@ impl RaftLogBatch for PanicWriteBatch {
         panic!()
     }
 
+    fn persist_size(&self) -> usize {
+        panic!()
+    }
+
     fn is_empty(&self) -> bool {
         panic!()
     }
diff --git a/components/engine_rocks/src/raft_engine.rs b/components/engine_rocks/src/raft_engine.rs
index f443e01709af..c5ad0cda8cc9 100644
--- a/components/engine_rocks/src/raft_engine.rs
+++ b/components/engine_rocks/src/raft_engine.rs
@@ -239,6 +239,10 @@ impl RaftLogBatch for RocksWriteBatch {
         self.put_msg(&keys::raft_state_key(raft_group_id), state)
     }
 
+    fn persist_size(&self) -> usize {
+        self.data_size()
+    }
+
     fn is_empty(&self) -> bool {
         Mutable::is_empty(self)
     }
diff --git a/components/engine_traits/src/raft_engine.rs b/components/engine_traits/src/raft_engine.rs
index 68dc66285d18..3b9dac1bd772 100644
--- a/components/engine_traits/src/raft_engine.rs
+++ b/components/engine_traits/src/raft_engine.rs
@@ -87,6 +87,8 @@ pub trait RaftLogBatch: Send {
 
     fn put_raft_state(&mut self, raft_group_id: u64, state: &RaftLocalState) -> Result<()>;
 
+    fn persist_size(&self) -> usize;
+
     fn is_empty(&self) -> bool;
 }
diff --git a/components/raft_log_engine/src/engine.rs b/components/raft_log_engine/src/engine.rs
index a77249d643a6..e6cb5bcc3be2 100644
--- a/components/raft_log_engine/src/engine.rs
+++ b/components/raft_log_engine/src/engine.rs
@@ -59,6 +59,10 @@ impl RaftLogBatchTrait for RaftLogBatch {
         Ok(())
     }
 
+    fn persist_size(&self) -> usize {
+        panic!("persist_size is not implemented for raft-log-engine")
+    }
+
     fn is_empty(&self) -> bool {
         self.0.items.is_empty()
     }
diff --git a/components/raftstore/src/store/config.rs b/components/raftstore/src/store/config.rs
index b27ec2941c4b..be213c1e11ec 100644
--- a/components/raftstore/src/store/config.rs
+++ b/components/raftstore/src/store/config.rs
@@ -33,6 +33,8 @@ pub struct Config {
     #[config(skip)]
     pub store_io_min_interval_us: u64,
     #[config(skip)]
+    pub store_io_max_bytes: u64,
+    #[config(skip)]
     pub store_io_pool_size: u64,
     #[config(skip)]
     pub store_io_queue: u64,
@@ -205,6 +207,7 @@ impl Default for Config {
         Config {
             delay_sync_us: 0,
             store_io_min_interval_us: 500,
+            store_io_max_bytes: 128 * 1024,
             store_io_pool_size: 2,
             store_io_queue: 1,
             apply_io_size: 0,
@@ -436,6 +439,9 @@ impl Config {
         CONFIG_RAFTSTORE_GAUGE
             .with_label_values(&["store_io_min_interval_us"])
             .set((self.store_io_min_interval_us as i32).into());
+        CONFIG_RAFTSTORE_GAUGE
+            .with_label_values(&["store_io_max_bytes"])
+            .set((self.store_io_max_bytes as i32).into());
         CONFIG_RAFTSTORE_GAUGE
             .with_label_values(&["store_io_pool_size"])
             .set((self.store_io_pool_size as i32).into());
diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs
index ee8f89ef14a2..e09fe3502e22 100644
--- a/components/raftstore/src/store/fsm/apply.rs
+++ b/components/raftstore/src/store/fsm/apply.rs
@@ -925,9 +925,11 @@ where
 
         if !data.is_empty() {
             let cmd = util::parse_data_at(data, index, &self.tag);
-            if should_write_to_engine(&cmd) ||
-                (apply_ctx.apply_io_size != 0 && (apply_ctx.kv_wb().data_size() >= apply_ctx.apply_io_size)) ||
-                (apply_ctx.apply_io_size == 0 && apply_ctx.kv_wb().should_write_to_engine()) {
+            if should_write_to_engine(&cmd)
+                || (apply_ctx.apply_io_size != 0
+                    && (apply_ctx.kv_wb().data_size() >= apply_ctx.apply_io_size))
+                || (apply_ctx.apply_io_size == 0 && apply_ctx.kv_wb().should_write_to_engine())
+            {
                 apply_ctx.commit(self);
                 if let Some(start) = self.handle_start.as_ref() {
                     if start.elapsed() >= apply_ctx.yield_duration {
diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs
index 409b8dcf6d02..9df705c0efc6 100644
--- a/components/raftstore/src/store/fsm/store.rs
+++ b/components/raftstore/src/store/fsm/store.rs
@@ -1,5 +1,3 @@
-// Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0.
-
 use std::cmp::{Ord, Ordering as CmpOrdering};
 use std::collections::BTreeMap;
 use std::collections::Bound::{Excluded, Included, Unbounded};
@@ -83,6 +81,7 @@ pub const PENDING_MSG_CAP: usize = 100;
 const UNREACHABLE_BACKOFF: Duration = Duration::from_secs(10);
 
 use crate::store::peer_storage::AsyncWriterTask;
+use crate::store::peer_storage::AsyncWriterTasks;
 use std::collections::VecDeque;
 use std::thread::JoinHandle;
 
@@ -95,7 +94,7 @@ where
     router: RaftRouter<EK, ER>,
     tag: String,
     io_min_interval: Duration,
-    tasks: Arc<Mutex<VecDeque<AsyncWriterTask<ER::LogBatch>>>>,
+    tasks: Arc<Mutex<AsyncWriterTasks<ER>>>,
     workers: Arc<Mutex<Vec<JoinHandle<()>>>>,
 }
 
@@ -105,7 +104,7 @@ where
     ER: RaftEngine,
 {
     fn clone(&self) -> Self {
-        AsyncWriter{
+        AsyncWriter {
             engine: self.engine.clone(),
             router: self.router.clone(),
             tag: self.tag.clone(),
@@ -121,18 +120,16 @@ where
     EK: KvEngine,
     ER: RaftEngine,
 {
-    pub fn new(engine: ER, router: RaftRouter<EK, ER>, tag: String,
-        pool_size: usize, io_min_interval_us: u64) -> AsyncWriter<EK, ER> {
-        let mut tasks = VecDeque::default();
-        for _ in 0..(pool_size + 1) {
-            tasks.push_back(
-                AsyncWriterTask {
-                    wb: engine.log_batch(4 * 1024),
-                    unsynced_readies: HashMap::default(),
-                }
-            );
-        }
-        let mut async_writer = AsyncWriter{
+    pub fn new(
+        engine: ER,
+        router: RaftRouter<EK, ER>,
+        tag: String,
+        pool_size: usize,
+        io_min_interval_us: u64,
+        io_max_bytes: u64,
+    ) -> AsyncWriter<EK, ER> {
+        let tasks = AsyncWriterTasks::new(engine.clone(), pool_size * 3, io_max_bytes as usize);
+        let mut async_writer = AsyncWriter {
             engine,
             router,
             tag,
@@ -160,20 +157,23 @@ where
             }
             last_ts = now_ts;
 
-            // TODO: block if too many data in current raft_wb
             let task = {
                 let mut tasks = x.tasks.lock().unwrap();
-                if tasks.front().unwrap().unsynced_readies.is_empty() {
+                if tasks.no_task() {
                     continue;
                 }
-                tasks.pop_front().unwrap()
+                tasks.detach_task()
             };
 
             let wb = x.sync_write(task.wb, task.unsynced_readies);
 
+            // TODO: block if too many tasks are pending
             {
                 let mut tasks = x.tasks.lock().unwrap();
-                tasks.push_back(AsyncWriterTask{wb, unsynced_readies: HashMap::default()});
+                tasks.add(AsyncWriterTask {
+                    wb,
+                    unsynced_readies: HashMap::default(),
+                });
             }
 
             STORE_WRITE_RAFTDB_TICK_DURATION_HISTOGRAM
@@ -186,12 +186,16 @@ where
         }
     }
 
-    pub fn raft_wb_pool(&mut self) -> Arc<Mutex<VecDeque<AsyncWriterTask<ER::LogBatch>>>> {
+    pub fn raft_wb_pool(&mut self) -> Arc<Mutex<AsyncWriterTasks<ER>>> {
         self.tasks.clone()
     }
 
     // TODO: this func is assumed in tasks.locked status
-    pub fn sync_write(&mut self, mut wb: ER::LogBatch, mut unsynced_readies: HashMap<u64, UnsyncedReady>) -> ER::LogBatch {
+    pub fn sync_write(
+        &mut self,
+        mut wb: ER::LogBatch,
+        mut unsynced_readies: HashMap<u64, UnsyncedReady>,
+    ) -> ER::LogBatch {
         let now = TiInstant::now_coarse();
         self.engine
             .consume_and_shrink(&mut wb, true, RAFT_WB_SHRINK_SIZE, 4 * 1024)
@@ -224,7 +228,10 @@ where
                 if pre_number >= r.number {
                     break;
                 }
-                if pre_number == r.notifier.compare_and_swap(pre_number, r.number, Ordering::AcqRel) {
+                if pre_number
+                    == r.notifier
+                        .compare_and_swap(pre_number, r.number, Ordering::AcqRel)
+                {
                     if let Err(e) = self.router.force_send(r.region_id, PeerMsg::Noop) {
                         error!(
                             "failed to send noop to trigger persisted ready";
@@ -500,12 +507,12 @@ where
     pub async_writer: Arc<Mutex<AsyncWriter<EK, ER>>>,
 }
 
-impl<EK, ER> HandleRaftReadyContext<EK::WriteBatch, ER::LogBatch> for PollContext<EK, ER>
+impl<EK, ER> HandleRaftReadyContext<EK::WriteBatch, ER> for PollContext<EK, ER>
 where
     EK: KvEngine,
     ER: RaftEngine,
 {
-    fn wb_mut(&mut self) -> (&mut EK::WriteBatch, Arc<Mutex<VecDeque<AsyncWriterTask<ER::LogBatch>>>>) {
+    fn wb_mut(&mut self) -> (&mut EK::WriteBatch, Arc<Mutex<AsyncWriterTasks<ER>>>) {
         self.raft_wb_is_empty = false;
         let mut async_writer = self.async_writer.lock().unwrap();
         (&mut self.kv_wb, async_writer.raft_wb_pool())
@@ -517,7 +524,7 @@
     }
 
     #[inline]
-    fn raft_wb_pool(&mut self) -> Arc<Mutex<VecDeque<AsyncWriterTask<ER::LogBatch>>>> {
+    fn raft_wb_pool(&mut self) -> Arc<Mutex<AsyncWriterTasks<ER>>> {
         self.raft_wb_is_empty = false;
         let mut async_writer = self.async_writer.lock().unwrap();
         async_writer.raft_wb_pool()
@@ -821,8 +828,7 @@ impl<EK: KvEngine, ER: RaftEngine> RaftPoller<EK, ER> {
             }
         }
         fail_point!("raft_between_save");
-        STORE_WRITE_KVDB_DURATION_HISTOGRAM
-            .observe(duration_to_sec(now.elapsed()) as f64);
+        STORE_WRITE_KVDB_DURATION_HISTOGRAM.observe(duration_to_sec(now.elapsed()) as f64);
 
         let raft_wb_is_empty = self.poll_ctx.raft_wb_is_empty;
         if !raft_wb_is_empty {
@@ -933,8 +939,7 @@ impl<EK: KvEngine, ER: RaftEngine> PollHandler<PeerFsm<EK, ER>, St
         self.poll_ctx.sync_log = false;
         self.poll_ctx.has_ready = false;
         self.timer = TiInstant::now_coarse();
-        STORE_LOOP_DURATION_HISTOGRAM
-            .observe(duration_to_sec(self.loop_timer.elapsed()) as f64);
+        STORE_LOOP_DURATION_HISTOGRAM.observe(duration_to_sec(self.loop_timer.elapsed()) as f64);
         self.loop_timer = TiInstant::now_coarse();
         // update config
         self.poll_ctx.perf_context_statistics.start();
@@ -1406,9 +1411,14 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
             cfg.value().delay_sync_enabled(),
             cfg.value().delay_sync_us as i64,
         );
-        let async_writer = Arc::new(Mutex::new(AsyncWriter::new(engines.raft.clone(),
-            self.router.clone(), "raftstore-async-writer".to_string(),
-            cfg.value().store_io_pool_size as usize, cfg.value().store_io_min_interval_us)));
+        let async_writer = Arc::new(Mutex::new(AsyncWriter::new(
+            engines.raft.clone(),
+            self.router.clone(),
+            "raftstore-async-writer".to_string(),
+            cfg.value().store_io_pool_size as usize,
+            cfg.value().store_io_min_interval_us,
+            cfg.value().store_io_max_bytes,
+        )));
         let mut builder = RaftPollerBuilder {
             cfg,
             store: meta,
diff --git a/components/raftstore/src/store/fsm/sync_policy.rs b/components/raftstore/src/store/fsm/sync_policy.rs
index 40154df1fea2..e0e997fd7663 100644
--- a/components/raftstore/src/store/fsm/sync_policy.rs
+++ b/components/raftstore/src/store/fsm/sync_policy.rs
@@ -1,10 +1,10 @@
 // Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0.
 
 use std::collections::VecDeque;
+use std::mem;
 use std::sync::atomic::Ordering;
 use std::sync::atomic::{AtomicI64, AtomicU64};
 use std::sync::Arc;
-use std::mem;
 
 use crossbeam::utils::CachePadded;
 use engine_traits::KvEngine;
diff --git a/components/raftstore/src/store/metrics.rs b/components/raftstore/src/store/metrics.rs
index 5f4db195caf9..805c38869bd1 100644
--- a/components/raftstore/src/store/metrics.rs
+++ b/components/raftstore/src/store/metrics.rs
@@ -180,12 +180,6 @@ make_auto_flush_static_metric! {
 }
 
 lazy_static! {
-    pub static ref ASYNC_WRITER_IO_QUEUE_VEC: IntGaugeVec =
-        register_int_gauge_vec!(
-            "tikv_raftstore_async_writer_io_queue_total",
-            "Current pending + running io tasks.",
-            &["name"]
-        ).unwrap();
     pub static ref STORE_LOOP_DURATION_HISTOGRAM: Histogram =
         register_histogram!(
             "tikv_raftstore_store_loop_duration_seconds",
@@ -222,6 +216,13 @@ lazy_static! {
             "TODO",
             exponential_buckets(1.0, 2.0, 20).unwrap()
         ).unwrap();
+    pub static ref RAFT_ASYNC_WRITER_ADAPTIVE_IDX: Histogram =
+        register_histogram!(
+            "tikv_raftstore_store_adaptive_idx",
+            "Adaptive index into the async writer's write-batch size limits.",
+            exponential_buckets(1.0, 2.0, 20).unwrap()
+        ).unwrap();
+
     pub static ref PEER_PROPOSAL_COUNTER_VEC: IntCounterVec =
         register_int_counter_vec!(
diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs
index cb495e8c76c8..178bc8d98a21 100644
--- a/components/raftstore/src/store/peer.rs
+++ b/components/raftstore/src/store/peer.rs
@@ -1686,17 +1686,18 @@ where
         }
 
         let notifier = self.persisted_notifier.clone();
-        let invoke_ctx = match self
-            .mut_store()
-            .handle_raft_ready(ctx, &mut ready, destroy_regions, notifier)
-        {
-            Ok(r) => r,
-            Err(e) => {
-                // We may have written something to writebatch and it can't be reverted, so has
-                // to panic here.
-                panic!("{} failed to handle raft ready: {:?}", self.tag, e)
-            }
-        };
+        let invoke_ctx =
+            match self
+                .mut_store()
+                .handle_raft_ready(ctx, &mut ready, destroy_regions, notifier)
+            {
+                Ok(r) => r,
+                Err(e) => {
+                    // We may have written something to writebatch and it can't be reverted, so has
+                    // to panic here.
+                    panic!("{} failed to handle raft ready: {:?}", self.tag, e)
+                }
+            };
 
         Some((ready, invoke_ctx))
     }
diff --git a/components/raftstore/src/store/peer_storage.rs b/components/raftstore/src/store/peer_storage.rs
index 2a02fcc17b8a..77d9f9b1b531 100644
--- a/components/raftstore/src/store/peer_storage.rs
+++ b/components/raftstore/src/store/peer_storage.rs
@@ -25,16 +25,16 @@ use crate::store::ProposalContext;
 use crate::{Error, Result};
 use engine_traits::{RaftEngine, RaftLogBatch};
 use into_other::into_other;
-use tikv_util::worker::Scheduler;
 use std::sync::Mutex;
+use tikv_util::worker::Scheduler;
 
 use super::metrics::*;
 use super::worker::RegionTask;
 use super::{SnapEntry, SnapKey, SnapManager, SnapshotStatistics};
-use tikv_util::collections::HashMap;
 
 use crate::store::fsm::sync_policy::UnsyncedReady;
 use std::sync::atomic::AtomicU64;
+use tikv_util::collections::HashMap;
 
 // When we create a region peer, we should initialize its log term/index > 0,
 // so that we can force the follower peer to sync the snapshot first.
@@ -317,16 +317,141 @@ where
     pub unsynced_readies: HashMap<u64, UnsyncedReady>,
 }
 
-pub trait HandleRaftReadyContext<WK, WR>
+pub struct AsyncWriterTasks<ER>
+where
+    ER: RaftEngine,
+{
+    wbs: VecDeque<AsyncWriterTask<ER::LogBatch>>,
+    size_limits: Vec<usize>,
+    current_idx: usize,
+    sample_window: VecDeque<usize>,
+    sample_window_size: usize,
+    adaptive_idx: usize,
+}
+
+// TODO: make sure the queue size stays consistent across pop_front/push_back
+impl<ER> AsyncWriterTasks<ER>
+where
+    ER: RaftEngine,
+{
+    pub fn new(engine: ER, queue_size: usize, task_soft_max_bytes: usize) -> AsyncWriterTasks<ER> {
+        let mut wbs = VecDeque::default();
+        for _ in 0..queue_size {
+            wbs.push_back(AsyncWriterTask {
+                wb: engine.log_batch(4 * 1024),
+                unsynced_readies: HashMap::default(),
+            });
+        }
+        let mut size_limits = vec![];
+        let mut size_limit = task_soft_max_bytes;
+        for _ in 0..(queue_size * 10) {
+            size_limits.push(size_limit);
+            size_limit = (size_limit as f64 * 1.2) as usize;
+        }
+        AsyncWriterTasks {
+            wbs,
+            size_limits,
+            current_idx: 0,
+            sample_window: VecDeque::default(),
+            sample_window_size: 4,
+            adaptive_idx: 0,
+        }
+    }
+
+    pub fn prepare_current_for_write(
+        &mut self,
+        region_id: u64,
+        ready_number: u64,
+        region_notifier: Arc<AtomicU64>,
+    ) -> &mut AsyncWriterTask<ER::LogBatch> {
+        assert!(self.current_idx < self.wbs.len());
+        let current_size = self.wbs[self.current_idx].wb.persist_size();
+        if current_size >= self.size_limits[self.adaptive_idx + self.current_idx] {
+            if self.current_idx + 1 < self.wbs.len() {
+                self.current_idx += 1;
+                assert!(self.wbs[self.current_idx].unsynced_readies.is_empty());
+            } else {
+                // All write batches are in use; keep filling the last one and
+                // rely on the adaptive size limit to grow instead.
+            }
+        }
+        let current = &mut self.wbs[self.current_idx];
+
+        current.unsynced_readies.insert(
+            region_id,
+            UnsyncedReady {
+                number: ready_number,
+                region_id,
+                notifier: region_notifier,
+                version: 0,
+            },
+        );
+        current
+    }
+
+    pub fn current(&self) -> &AsyncWriterTask<ER::LogBatch> {
+        assert!(self.current_idx < self.wbs.len());
+        &self.wbs[self.current_idx]
+    }
+
+    pub fn current_mut(&mut self) -> &mut AsyncWriterTask<ER::LogBatch> {
+        assert!(self.current_idx < self.wbs.len());
+        &mut self.wbs[self.current_idx]
+    }
+
+    pub fn no_task(&self) -> bool {
+        let no_task = self.wbs.front().unwrap().unsynced_readies.is_empty();
+        if no_task {
+            assert!(self.current_idx == 0);
+        }
+        no_task
+    }
+
+    pub fn detach_task(&mut self) -> AsyncWriterTask<ER::LogBatch> {
+        RAFT_ASYNC_WRITER_QUEUE_SIZE.observe(self.current_idx as f64);
+        RAFT_ASYNC_WRITER_ADAPTIVE_IDX.observe(self.adaptive_idx as f64);
+
+        let task = self.wbs.pop_front().unwrap();
+        let task_bytes = task.wb.persist_size();
+
+        self.sample_window.push_back(task_bytes);
+        if self.sample_window.len() > self.sample_window_size {
+            self.sample_window.pop_front();
+        }
+        let target_bytes = self.size_limits[self.adaptive_idx + self.current_idx];
+        let task_avg_bytes = (self.sample_window.iter().sum::<usize>() as f64
+            / self.sample_window.len() as f64) as usize;
+        if task_avg_bytes >= target_bytes {
+            if self.adaptive_idx + 1 + self.wbs.len() - 1 < self.size_limits.len() {
+                self.adaptive_idx += 1;
+            }
+        } else if (task_avg_bytes as f64) < ((target_bytes as f64) / 1.25) {
+            if self.adaptive_idx > 0 {
+                self.adaptive_idx -= 1;
+            }
+        }
+
+        if self.current_idx != 0 {
+            self.current_idx -= 1;
+        }
+        task
+    }
+
+    pub fn add(&mut self, task: AsyncWriterTask<ER::LogBatch>) {
+        self.wbs.push_back(task);
+    }
+}
+
+pub trait HandleRaftReadyContext<WK, ER>
 where
     WK: Mutable,
-    WR: RaftLogBatch,
+    ER: RaftEngine,
 {
     /// Returns the mutable references of WriteBatch for both KvDB and RaftDB in one interface.
-    fn wb_mut(&mut self) -> (&mut WK, Arc<Mutex<VecDeque<AsyncWriterTask<WR>>>>);
+    fn wb_mut(&mut self) -> (&mut WK, Arc<Mutex<AsyncWriterTasks<ER>>>);
     fn kv_wb_mut(&mut self) -> &mut WK;
     //fn raft_wb_mut(&mut self) -> &mut WR;
-    fn raft_wb_pool(&mut self) -> Arc<Mutex<VecDeque<AsyncWriterTask<WR>>>>;
+    fn raft_wb_pool(&mut self) -> Arc<Mutex<AsyncWriterTasks<ER>>>;
     fn sync_log(&self) -> bool;
     fn set_sync_log(&mut self, sync: bool);
 }
@@ -1025,7 +1150,7 @@ where
     // to the return one.
     // WARNING: If this function returns error, the caller must panic otherwise the entry cache may
     // be wrong and break correctness.
-    pub fn append<H: HandleRaftReadyContext<EK::WriteBatch, ER::LogBatch>>(
+    pub fn append<H: HandleRaftReadyContext<EK::WriteBatch, ER>>(
         &mut self,
         invoke_ctx: &mut InvokeContext,
         entries: Vec<Entry>,
@@ -1062,13 +1187,14 @@ where
         {
             let raft_wb_pool = ready_ctx.raft_wb_pool();
             let mut raft_wbs = raft_wb_pool.lock().unwrap();
-            let current = raft_wbs.front_mut().unwrap();
+            let current =
+                raft_wbs.prepare_current_for_write(region_id, ready_number, region_notifier);
             current.wb.append(region_id, entries)?;
-            current.unsynced_readies.insert(region_id,
-                UnsyncedReady{number: ready_number, region_id, notifier: region_notifier, version: 0});
             // Delete any previously appended log entries which never committed.
             // TODO: Wrap it as an engine::Error.
-            current.wb.cut_logs(region_id, last_index + 1, prev_last_index);
+            current
+                .wb
+                .cut_logs(region_id, last_index + 1, prev_last_index);
         }
 
         invoke_ctx.raft_state.set_last_index(last_index);
@@ -1380,7 +1506,7 @@ where
     /// it explicitly to disk. If it's flushed to disk successfully, `post_ready` should be called
     /// to update the memory states properly.
    /// WARNING: If this function returns error, the caller must panic(details in `append` function).
-    pub fn handle_raft_ready<H: HandleRaftReadyContext<EK::WriteBatch, ER::LogBatch>>(
+    pub fn handle_raft_ready<H: HandleRaftReadyContext<EK::WriteBatch, ER>>(
         &mut self,
         ready_ctx: &mut H,
         ready: &mut Ready,
@@ -1395,10 +1521,18 @@ where
             fail_point!("raft_before_apply_snap");
             let (kv_wb, raft_wb_pool) = ready_ctx.wb_mut();
             let mut raft_wbs = raft_wb_pool.lock().unwrap();
-            let current = raft_wbs.front_mut().unwrap();
-            self.apply_snapshot(&mut ctx, ready.snapshot(), kv_wb, &mut current.wb, &destroy_regions)?;
-            current.unsynced_readies.insert(region_id,
-                UnsyncedReady{number: ready.number(), region_id, notifier: region_notifier.clone(), version: 0});
+            let current = raft_wbs.prepare_current_for_write(
+                region_id,
+                ready.number(),
+                region_notifier.clone(),
+            );
+            self.apply_snapshot(
+                &mut ctx,
+                ready.snapshot(),
+                kv_wb,
+                &mut current.wb,
+                &destroy_regions,
+            )?;
             fail_point!("raft_after_apply_snap");
 
             ctx.destroyed_regions = destroy_regions;
@@ -1407,7 +1541,13 @@ where
         };
 
         if !ready.entries().is_empty() {
-            self.append(&mut ctx, ready.take_entries(), ready_ctx, ready.number(), region_notifier.clone())?;
+            self.append(
+                &mut ctx,
+                ready.take_entries(),
+                ready_ctx,
+                ready.number(),
+                region_notifier.clone(),
+            )?;
         }
 
         // Last index is 0 means the peer is created from raft message
@@ -1423,8 +1563,9 @@ where
        {
             let raft_wb_pool = ready_ctx.raft_wb_pool();
             let mut raft_wbs = raft_wb_pool.lock().unwrap();
-            let current = raft_wbs.front_mut().unwrap();
+            let current = raft_wbs.current_mut();
             ctx.save_raft_state_to(&mut current.wb)?;
+            // TODO ???
         }
 
         if snapshot_index > 0 {
            // in case of restart happen when we just write region state to Applying,
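[Editor's note] To make the adaptive sizing in `AsyncWriterTasks` above easier to follow, here is a minimal, self-contained sketch of just the sample-window and size-limit bookkeeping from `detach_task`. `AdaptiveLimits` is a hypothetical name invented for this sketch; the constants (1.2 growth factor, 1.25 shrink threshold, window of 4 samples) are copied from the patch:

```rust
use std::collections::VecDeque;

// Sketch of AsyncWriterTasks' adaptive size-limit bookkeeping, under the
// assumption that only the window/ladder logic matters here.
struct AdaptiveLimits {
    size_limits: Vec<usize>,
    adaptive_idx: usize,
    sample_window: VecDeque<usize>,
    sample_window_size: usize,
}

impl AdaptiveLimits {
    fn new(task_soft_max_bytes: usize, slots: usize) -> AdaptiveLimits {
        // Precompute an exponentially growing ladder of limits (factor 1.2),
        // as AsyncWriterTasks::new does.
        let mut size_limits = Vec::with_capacity(slots);
        let mut limit = task_soft_max_bytes;
        for _ in 0..slots {
            size_limits.push(limit);
            limit = (limit as f64 * 1.2) as usize;
        }
        AdaptiveLimits {
            size_limits,
            adaptive_idx: 0,
            sample_window: VecDeque::new(),
            sample_window_size: 4,
        }
    }

    // Target byte size for the batch at `queue_idx`, offset by the adaptive base.
    fn limit(&self, queue_idx: usize) -> usize {
        self.size_limits[self.adaptive_idx + queue_idx]
    }

    // Feed the byte size of a detached task: move the base index up when the
    // recent average saturates the current target, and back down when it
    // falls below target / 1.25, mirroring detach_task.
    fn observe(&mut self, task_bytes: usize, queue_idx: usize, queue_len: usize) {
        self.sample_window.push_back(task_bytes);
        if self.sample_window.len() > self.sample_window_size {
            self.sample_window.pop_front();
        }
        let target = self.limit(queue_idx);
        let avg = self.sample_window.iter().sum::<usize>() / self.sample_window.len();
        if avg >= target {
            if self.adaptive_idx + queue_len < self.size_limits.len() {
                self.adaptive_idx += 1;
            }
        } else if (avg as f64) < (target as f64) / 1.25 && self.adaptive_idx > 0 {
            self.adaptive_idx -= 1;
        }
    }
}

fn main() {
    let mut limits = AdaptiveLimits::new(128 * 1024, 60);
    // A burst of oversized batches pushes the ladder up ...
    for _ in 0..4 {
        limits.observe(256 * 1024, 0, 6);
    }
    assert!(limits.adaptive_idx > 0);
    // ... and a quiet period walks it back down.
    for _ in 0..8 {
        limits.observe(1024, 0, 6);
    }
    assert_eq!(limits.adaptive_idx, 0);
    println!("current base limit: {} bytes", limits.limit(0));
}
```

The design point this illustrates: because the limits are a precomputed ladder indexed by `adaptive_idx + queue_idx`, raising or lowering the single base index rescales the target for every queue slot at once, so a sustained write burst lets all in-flight batches grow while light load shrinks them back toward `store_io_max_bytes`.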