Skip to content

Commit

Permalink
Adaptive IO size
Browse files Browse the repository at this point in the history
Signed-off-by: Liu Cong <[email protected]>
  • Loading branch information
innerr committed Jan 4, 2021
1 parent 9d03775 commit 4e47524
Show file tree
Hide file tree
Showing 11 changed files with 247 additions and 72 deletions.
4 changes: 4 additions & 0 deletions components/engine_panic/src/raft_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ impl RaftLogBatch for PanicWriteBatch {
panic!()
}

/// Persist-size query for the batch; not supported by the panic engine.
/// Calling this always panics.
fn persist_size(&self) -> usize {
panic!()
}

fn is_empty(&self) -> bool {
panic!()
}
Expand Down
4 changes: 4 additions & 0 deletions components/engine_rocks/src/raft_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ impl RaftLogBatch for RocksWriteBatch {
self.put_msg(&keys::raft_state_key(raft_group_id), state)
}

/// Reports this batch's persist size as whatever `self.data_size()` returns.
fn persist_size(&self) -> usize {
self.data_size()
}

fn is_empty(&self) -> bool {
Mutable::is_empty(self)
}
Expand Down
2 changes: 2 additions & 0 deletions components/engine_traits/src/raft_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ pub trait RaftLogBatch: Send {

fn put_raft_state(&mut self, raft_group_id: u64, state: &RaftLocalState) -> Result<()>;

/// Returns the "persist size" of this batch as a `usize`.
///
/// NOTE(review): the unit is not stated here — presumably the number of bytes
/// of pending data that a write would persist; confirm against the implementations.
fn persist_size(&self) -> usize;

fn is_empty(&self) -> bool;
}

Expand Down
4 changes: 4 additions & 0 deletions components/raft_log_engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ impl RaftLogBatchTrait for RaftLogBatch {
Ok(())
}

/// Not implemented for this batch type: calling it always panics with "not impl!".
// NOTE(review): any code path that asks this batch for its persist size will
// abort the thread. Confirm this method is never reached when this engine is
// in use, or provide a real implementation.
fn persist_size(&self) -> usize {
panic!("not impl!")
}

fn is_empty(&self) -> bool {
self.0.items.is_empty()
}
Expand Down
6 changes: 6 additions & 0 deletions components/raftstore/src/store/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub struct Config {
#[config(skip)]
pub store_io_min_interval_us: u64,
#[config(skip)]
pub store_io_max_bytes: u64,
#[config(skip)]
pub store_io_pool_size: u64,
#[config(skip)]
pub store_io_queue: u64,
Expand Down Expand Up @@ -205,6 +207,7 @@ impl Default for Config {
Config {
delay_sync_us: 0,
store_io_min_interval_us: 500,
store_io_max_bytes: 128 * 1024,
store_io_pool_size: 2,
store_io_queue: 1,
apply_io_size: 0,
Expand Down Expand Up @@ -436,6 +439,9 @@ impl Config {
CONFIG_RAFTSTORE_GAUGE
.with_label_values(&["store_io_min_interval_us"])
.set((self.store_io_min_interval_us as i32).into());
// Export the configured `store_io_max_bytes` under its own gauge label.
// NOTE(review): the field is declared as `u64` but is narrowed with `as i32`
// before being set, so a configured value above `i32::MAX` would wrap and the
// gauge would report a wrong (possibly negative) number. This mirrors the
// neighbouring gauges; confirm whether a lossless conversion is wanted.
CONFIG_RAFTSTORE_GAUGE
.with_label_values(&["store_io_max_bytes"])
.set((self.store_io_max_bytes as i32).into());
CONFIG_RAFTSTORE_GAUGE
.with_label_values(&["store_io_pool_size"])
.set((self.store_io_pool_size as i32).into());
Expand Down
8 changes: 5 additions & 3 deletions components/raftstore/src/store/fsm/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -925,9 +925,11 @@ where
if !data.is_empty() {
let cmd = util::parse_data_at(data, index, &self.tag);

if should_write_to_engine(&cmd) ||
(apply_ctx.apply_io_size != 0 && (apply_ctx.kv_wb().data_size() >= apply_ctx.apply_io_size)) ||
(apply_ctx.apply_io_size == 0 && apply_ctx.kv_wb().should_write_to_engine()) {
if should_write_to_engine(&cmd)
|| (apply_ctx.apply_io_size != 0
&& (apply_ctx.kv_wb().data_size() >= apply_ctx.apply_io_size))
|| (apply_ctx.apply_io_size == 0 && apply_ctx.kv_wb().should_write_to_engine())
{
apply_ctx.commit(self);
if let Some(start) = self.handle_start.as_ref() {
if start.elapsed() >= apply_ctx.yield_duration {
Expand Down
76 changes: 43 additions & 33 deletions components/raftstore/src/store/fsm/store.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0.

use std::cmp::{Ord, Ordering as CmpOrdering};
use std::collections::BTreeMap;
use std::collections::Bound::{Excluded, Included, Unbounded};
Expand Down Expand Up @@ -83,6 +81,7 @@ pub const PENDING_MSG_CAP: usize = 100;
const UNREACHABLE_BACKOFF: Duration = Duration::from_secs(10);

use crate::store::peer_storage::AsyncWriterTask;
use crate::store::peer_storage::AsyncWriterTasks;
use std::collections::VecDeque;
use std::thread::JoinHandle;

Expand All @@ -95,7 +94,7 @@ where
router: RaftRouter<EK, ER>,
tag: String,
io_min_interval: Duration,
tasks: Arc<Mutex<VecDeque<AsyncWriterTask<ER::LogBatch>>>>,
tasks: Arc<Mutex<AsyncWriterTasks<ER>>>,
workers: Arc<Mutex<Vec<JoinHandle<()>>>>,
}

Expand All @@ -105,7 +104,7 @@ where
ER: RaftEngine,
{
fn clone(&self) -> Self {
AsyncWriter{
AsyncWriter {
engine: self.engine.clone(),
router: self.router.clone(),
tag: self.tag.clone(),
Expand All @@ -121,18 +120,16 @@ where
EK: KvEngine,
ER: RaftEngine,
{
pub fn new(engine: ER, router: RaftRouter<EK, ER>, tag: String,
pool_size: usize, io_min_interval_us: u64) -> AsyncWriter<EK, ER> {
let mut tasks = VecDeque::default();
for _ in 0..(pool_size + 1) {
tasks.push_back(
AsyncWriterTask {
wb: engine.log_batch(4 * 1024),
unsynced_readies: HashMap::default(),
}
);
}
let mut async_writer = AsyncWriter{
pub fn new(
engine: ER,
router: RaftRouter<EK, ER>,
tag: String,
pool_size: usize,
io_min_interval_us: u64,
io_max_bytes: u64,
) -> AsyncWriter<EK, ER> {
let tasks = AsyncWriterTasks::new(engine.clone(), pool_size * 3, io_max_bytes as usize);
let mut async_writer = AsyncWriter {
engine,
router,
tag,
Expand Down Expand Up @@ -160,20 +157,23 @@ where
}
last_ts = now_ts;

// TODO: block if there is too much data in the current raft_wb
let task = {
let mut tasks = x.tasks.lock().unwrap();
if tasks.front().unwrap().unsynced_readies.is_empty() {
if tasks.no_task() {
continue;
}
tasks.pop_front().unwrap()
tasks.detach_task()
};

let wb = x.sync_write(task.wb, task.unsynced_readies);

// TODO: block if too many tasks
{
let mut tasks = x.tasks.lock().unwrap();
tasks.push_back(AsyncWriterTask{wb, unsynced_readies: HashMap::default()});
tasks.add(AsyncWriterTask {
wb,
unsynced_readies: HashMap::default(),
});
}

STORE_WRITE_RAFTDB_TICK_DURATION_HISTOGRAM
Expand All @@ -186,12 +186,16 @@ where
}
}

pub fn raft_wb_pool(&mut self) -> Arc<Mutex<VecDeque<AsyncWriterTask<ER::LogBatch>>>> {
pub fn raft_wb_pool(&mut self) -> Arc<Mutex<AsyncWriterTasks<ER>>> {
self.tasks.clone()
}

// TODO: this func is assumed in tasks.locked status
pub fn sync_write(&mut self, mut wb: ER::LogBatch, mut unsynced_readies: HashMap<u64, UnsyncedReady>) -> ER::LogBatch {
pub fn sync_write(
&mut self,
mut wb: ER::LogBatch,
mut unsynced_readies: HashMap<u64, UnsyncedReady>,
) -> ER::LogBatch {
let now = TiInstant::now_coarse();
self.engine
.consume_and_shrink(&mut wb, true, RAFT_WB_SHRINK_SIZE, 4 * 1024)
Expand Down Expand Up @@ -224,7 +228,10 @@ where
if pre_number >= r.number {
break;
}
if pre_number == r.notifier.compare_and_swap(pre_number, r.number, Ordering::AcqRel) {
if pre_number
== r.notifier
.compare_and_swap(pre_number, r.number, Ordering::AcqRel)
{
if let Err(e) = self.router.force_send(r.region_id, PeerMsg::Noop) {
error!(
"failed to send noop to trigger persisted ready";
Expand Down Expand Up @@ -500,12 +507,12 @@ where
pub async_writer: Arc<Mutex<AsyncWriter<EK, ER>>>,
}

impl<EK, ER, T> HandleRaftReadyContext<EK::WriteBatch, ER::LogBatch> for PollContext<EK, ER, T>
impl<EK, ER, T> HandleRaftReadyContext<EK::WriteBatch, ER> for PollContext<EK, ER, T>
where
EK: KvEngine,
ER: RaftEngine,
{
fn wb_mut(&mut self) -> (&mut EK::WriteBatch, Arc::<Mutex<VecDeque<AsyncWriterTask<ER::LogBatch>>>>) {
fn wb_mut(&mut self) -> (&mut EK::WriteBatch, Arc<Mutex<AsyncWriterTasks<ER>>>) {
self.raft_wb_is_empty = false;
let mut async_writer = self.async_writer.lock().unwrap();
(&mut self.kv_wb, async_writer.raft_wb_pool())
Expand All @@ -517,7 +524,7 @@ where
}

#[inline]
fn raft_wb_pool(&mut self) -> Arc<Mutex<VecDeque<AsyncWriterTask<ER::LogBatch>>>> {
fn raft_wb_pool(&mut self) -> Arc<Mutex<AsyncWriterTasks<ER>>> {
self.raft_wb_is_empty = false;
let mut async_writer = self.async_writer.lock().unwrap();
async_writer.raft_wb_pool()
Expand Down Expand Up @@ -821,8 +828,7 @@ impl<EK: KvEngine, ER: RaftEngine, T: Transport> RaftPoller<EK, ER, T> {
}
}
fail_point!("raft_between_save");
STORE_WRITE_KVDB_DURATION_HISTOGRAM
.observe(duration_to_sec(now.elapsed()) as f64);
STORE_WRITE_KVDB_DURATION_HISTOGRAM.observe(duration_to_sec(now.elapsed()) as f64);

let raft_wb_is_empty = self.poll_ctx.raft_wb_is_empty;
if !raft_wb_is_empty {
Expand Down Expand Up @@ -933,8 +939,7 @@ impl<EK: KvEngine, ER: RaftEngine, T: Transport> PollHandler<PeerFsm<EK, ER>, St
self.poll_ctx.sync_log = false;
self.poll_ctx.has_ready = false;
self.timer = TiInstant::now_coarse();
STORE_LOOP_DURATION_HISTOGRAM
.observe(duration_to_sec(self.loop_timer.elapsed()) as f64);
STORE_LOOP_DURATION_HISTOGRAM.observe(duration_to_sec(self.loop_timer.elapsed()) as f64);
self.loop_timer = TiInstant::now_coarse();
// update config
self.poll_ctx.perf_context_statistics.start();
Expand Down Expand Up @@ -1406,9 +1411,14 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
cfg.value().delay_sync_enabled(),
cfg.value().delay_sync_us as i64,
);
let async_writer = Arc::new(Mutex::new(AsyncWriter::new(engines.raft.clone(),
self.router.clone(), "raftstore-async-writer".to_string(),
cfg.value().store_io_pool_size as usize, cfg.value().store_io_min_interval_us)));
let async_writer = Arc::new(Mutex::new(AsyncWriter::new(
engines.raft.clone(),
self.router.clone(),
"raftstore-async-writer".to_string(),
cfg.value().store_io_pool_size as usize,
cfg.value().store_io_min_interval_us,
cfg.value().store_io_max_bytes,
)));
let mut builder = RaftPollerBuilder {
cfg,
store: meta,
Expand Down
2 changes: 1 addition & 1 deletion components/raftstore/src/store/fsm/sync_policy.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0.

use std::collections::VecDeque;
use std::mem;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::Arc;
use std::mem;

use crossbeam::utils::CachePadded;
use engine_traits::KvEngine;
Expand Down
13 changes: 7 additions & 6 deletions components/raftstore/src/store/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,6 @@ make_auto_flush_static_metric! {
}

lazy_static! {
pub static ref ASYNC_WRITER_IO_QUEUE_VEC: IntGaugeVec =
register_int_gauge_vec!(
"tikv_raftstore_async_writer_io_queue_total",
"Current pending + running io tasks.",
&["name"]
).unwrap();
pub static ref STORE_LOOP_DURATION_HISTOGRAM: Histogram =
register_histogram!(
"tikv_raftstore_store_loop_duration_seconds",
Expand Down Expand Up @@ -222,6 +216,13 @@ lazy_static! {
"TODO",
exponential_buckets(1.0, 2.0, 20).unwrap()
).unwrap();
// Histogram registered as "tikv_raftstore_store_adaptive_idx", using 20
// exponential buckets that start at 1.0 and double at each step.
// NOTE(review): the help text is still the placeholder "TODO"; replace it with
// a description of what the observed "adaptive idx" value represents.
pub static ref RAFT_ASYNC_WRITER_ADAPTIVE_IDX: Histogram =
register_histogram!(
"tikv_raftstore_store_adaptive_idx",
"TODO",
exponential_buckets(1.0, 2.0, 20).unwrap()
).unwrap();


pub static ref PEER_PROPOSAL_COUNTER_VEC: IntCounterVec =
register_int_counter_vec!(
Expand Down
23 changes: 12 additions & 11 deletions components/raftstore/src/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1686,17 +1686,18 @@ where
}

let notifier = self.persisted_notifier.clone();
let invoke_ctx = match self
.mut_store()
.handle_raft_ready(ctx, &mut ready, destroy_regions, notifier)
{
Ok(r) => r,
Err(e) => {
// We may have written something to writebatch and it can't be reverted, so has
// to panic here.
panic!("{} failed to handle raft ready: {:?}", self.tag, e)
}
};
let invoke_ctx =
match self
.mut_store()
.handle_raft_ready(ctx, &mut ready, destroy_regions, notifier)
{
Ok(r) => r,
Err(e) => {
// We may have written something to writebatch and it can't be reverted, so has
// to panic here.
panic!("{} failed to handle raft ready: {:?}", self.tag, e)
}
};

Some((ready, invoke_ctx))
}
Expand Down
Loading

0 comments on commit 4e47524

Please sign in to comment.