Skip to content

Commit

Permalink
Add store_io_min_interval_us
Browse files Browse the repository at this point in the history
Signed-off-by: Liu Cong <[email protected]>
  • Loading branch information
innerr committed Jan 4, 2021
1 parent 2526754 commit 9d03775
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 14 deletions.
2 changes: 2 additions & 0 deletions components/raftstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#![feature(div_duration)]
#![feature(min_specialization)]
#![feature(box_patterns)]
#![feature(duration_saturating_ops)]
#![feature(duration_zero)]

#[macro_use]
extern crate bitflags;
Expand Down
6 changes: 6 additions & 0 deletions components/raftstore/src/store/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub struct Config {
#[config(skip)]
pub delay_sync_us: u64,
#[config(skip)]
pub store_io_min_interval_us: u64,
#[config(skip)]
pub store_io_pool_size: u64,
#[config(skip)]
pub store_io_queue: u64,
Expand Down Expand Up @@ -202,6 +204,7 @@ impl Default for Config {
let split_size = ReadableSize::mb(coprocessor::config::SPLIT_SIZE_MB);
Config {
delay_sync_us: 0,
store_io_min_interval_us: 500,
store_io_pool_size: 2,
store_io_queue: 1,
apply_io_size: 0,
Expand Down Expand Up @@ -430,6 +433,9 @@ impl Config {
CONFIG_RAFTSTORE_GAUGE
.with_label_values(&["delay_sync_us"])
.set((self.delay_sync_us as i32).into());
CONFIG_RAFTSTORE_GAUGE
.with_label_values(&["store_io_min_interval_us"])
.set((self.store_io_min_interval_us as i32).into());
CONFIG_RAFTSTORE_GAUGE
.with_label_values(&["store_io_pool_size"])
.set((self.store_io_pool_size as i32).into());
Expand Down
29 changes: 15 additions & 14 deletions components/raftstore/src/store/fsm/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ where
engine: ER,
router: RaftRouter<EK, ER>,
tag: String,
buff_size: usize,
io_min_interval: Duration,
tasks: Arc<Mutex<VecDeque<AsyncWriterTask<ER::LogBatch>>>>,
workers: Arc<Mutex<Vec<JoinHandle<()>>>>,
}
Expand All @@ -109,7 +109,7 @@ where
engine: self.engine.clone(),
router: self.router.clone(),
tag: self.tag.clone(),
buff_size: self.buff_size,
io_min_interval: self.io_min_interval.clone(),
tasks: self.tasks.clone(),
workers: self.workers.clone(),
}
Expand All @@ -121,7 +121,8 @@ where
EK: KvEngine,
ER: RaftEngine,
{
pub fn new(engine: ER, router: RaftRouter<EK, ER>, tag: String, pool_size: usize, buff_size: usize) -> AsyncWriter<EK, ER> {
pub fn new(engine: ER, router: RaftRouter<EK, ER>, tag: String,
pool_size: usize, io_min_interval_us: u64) -> AsyncWriter<EK, ER> {
let mut tasks = VecDeque::default();
for _ in 0..(pool_size + 1) {
tasks.push_back(
Expand All @@ -135,7 +136,7 @@ where
engine,
router,
tag,
buff_size,
io_min_interval: Duration::from_micros(io_min_interval_us),
tasks: Arc::new(Mutex::new(tasks)),
workers: Arc::new(Mutex::new(vec![])),
};
Expand All @@ -149,20 +150,20 @@ where
let t = thread::Builder::new()
.name(thd_name!(format!("raftdb-async-writer-{}", i)))
.spawn(move || {
let mut sleep = false;
let mut last_ts = TiInstant::now_coarse();
loop {
// TODO: block if no data in current raft_wb
if sleep {
let d = Duration::from_millis(1);
thread::sleep(d);
let mut now_ts = TiInstant::now_coarse();
let delta = (now_ts - last_ts).saturating_sub(x.io_min_interval);
if !delta.is_zero() {
thread::sleep(delta);
now_ts = TiInstant::now_coarse();
}
sleep = false;
let now = TiInstant::now_coarse();
last_ts = now_ts;

// TODO: block if too many data in current raft_wb
let task = {
let mut tasks = x.tasks.lock().unwrap();
if tasks.front().unwrap().unsynced_readies.is_empty() {
sleep = true;
continue;
}
tasks.pop_front().unwrap()
Expand All @@ -176,7 +177,7 @@ where
}

STORE_WRITE_RAFTDB_TICK_DURATION_HISTOGRAM
.observe(duration_to_sec(now.elapsed()) as f64);
.observe(duration_to_sec(now_ts.elapsed()) as f64);
}
})
.unwrap();
Expand Down Expand Up @@ -1407,7 +1408,7 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
);
let async_writer = Arc::new(Mutex::new(AsyncWriter::new(engines.raft.clone(),
self.router.clone(), "raftstore-async-writer".to_string(),
cfg.value().store_io_pool_size as usize, cfg.value().store_io_queue as usize)));
cfg.value().store_io_pool_size as usize, cfg.value().store_io_min_interval_us)));
let mut builder = RaftPollerBuilder {
cfg,
store: meta,
Expand Down

0 comments on commit 9d03775

Please sign in to comment.