Skip to content

Commit

Permalink
Fixed corruption
Browse files Browse the repository at this point in the history
Signed-off-by: Liu Cong <[email protected]>
  • Loading branch information
innerr committed Jan 6, 2021
1 parent ea07df1 commit dcbe442
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 19 deletions.
1 change: 1 addition & 0 deletions components/raftstore/src/store/fsm/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ where
let task = {
let mut tasks = x.tasks.lock().unwrap();
if tasks.no_task() {
// TODO: block and wait for new data arrive
continue;
}
tasks.detach_task()
Expand Down
8 changes: 7 additions & 1 deletion components/raftstore/src/store/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,13 @@ lazy_static! {
register_histogram!(
"tikv_raftstore_store_task_bytes",
"TODO",
exponential_buckets(1024.0, 2.0, 30).unwrap()
exponential_buckets(256.0, 2.0, 30).unwrap()
).unwrap();
pub static ref RAFT_ASYNC_WRITER_TASK_LIMIT_BYTES: Histogram =
register_histogram!(
"tikv_raftstore_store_task_limit_bytes",
"TODO",
exponential_buckets(256.0, 2.0, 30).unwrap()
).unwrap();


Expand Down
48 changes: 30 additions & 18 deletions components/raftstore/src/store/peer_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,10 @@ where
},
);
}

pub fn is_empty(&self) -> bool {
self.unsynced_readies.is_empty()
}
}

/*
Expand Down Expand Up @@ -462,20 +466,16 @@ where
}
}

pub fn current(&self) -> &AsyncWriterTask<ER::LogBatch> {
assert!(self.current_idx <= self.wbs.len());
&self.wbs[self.current_idx]
}

pub fn prepare_current_for_write(&mut self) -> &mut AsyncWriterTask<ER::LogBatch> {
assert!(self.current_idx <= self.wbs.len());
let current_size = {
self.wbs[self.current_idx].wb.persist_size()
};
if self.current_idx + 1 < self.wbs.len() {
assert!(self.wbs[self.current_idx + 1].is_empty());
}
let current_size = self.wbs[self.current_idx].wb.persist_size();
if current_size >= self.size_limits[self.adaptive_idx + self.current_idx] {
if self.current_idx + 1 < self.wbs.len() {
self.current_idx += 1;
assert!(self.wbs[self.current_idx].unsynced_readies.len() == 0);
assert!(self.wbs[self.current_idx].is_empty());
} else {
// do nothing, adaptive IO size
}
Expand All @@ -484,7 +484,7 @@ where
}

pub fn no_task(&self) -> bool {
let no_task = self.wbs.front().unwrap().unsynced_readies.is_empty();
let no_task = self.wbs.front().unwrap().is_empty();
if no_task {
assert!(self.current_idx == 0);
}
Expand All @@ -494,20 +494,30 @@ where
pub fn detach_task(&mut self) -> AsyncWriterTask<ER::LogBatch> {
RAFT_ASYNC_WRITER_QUEUE_SIZE.observe(self.current_idx as f64);
RAFT_ASYNC_WRITER_ADAPTIVE_IDX.observe(self.adaptive_idx as f64);

assert!(self.current_idx <= self.wbs.len());
if self.current_idx + 1 < self.wbs.len() {
assert!(self.wbs[self.current_idx + 1].is_empty());
}
if self.current_idx != 0 {
assert!(!self.wbs[self.current_idx - 1].is_empty());
}
let task = self.wbs.pop_front().unwrap();
assert!(!task.is_empty());

let task_bytes = task.wb.persist_size();
RAFT_ASYNC_WRITER_TASK_BYTES.observe(task_bytes as f64);

let task_avg_bytes = self.sample_window.observe_and_get_avg(task_bytes as f64);
let target_bytes = self.size_limits[self.adaptive_idx + self.current_idx];
if task_avg_bytes >= (target_bytes as f64) {
if self.adaptive_idx + 1 + self.wbs.len() - 1 < self.size_limits.len() {
let target_bytes = self.size_limits[self.adaptive_idx + self.current_idx] as f64;
RAFT_ASYNC_WRITER_TASK_LIMIT_BYTES.observe(target_bytes);
if task_avg_bytes >= target_bytes {
if self.adaptive_idx + (self.wbs.len() - 1) + 1 < self.size_limits.len() {
self.adaptive_idx += 1;
}
} else if (task_avg_bytes as f64) < ((target_bytes as f64) / 1.25) {
if self.adaptive_idx > 0 {
self.adaptive_idx -= 1;
}
} else if self.adaptive_idx > 0 &&
task_avg_bytes < (self.size_limits[self.adaptive_idx + self.current_idx - 1] as f64) {
self.adaptive_idx -= 1;
}

if self.current_idx != 0 {
Expand All @@ -517,11 +527,11 @@ where
}

pub fn add(&mut self, task: AsyncWriterTask<ER::LogBatch>) {
assert!(task.is_empty());
self.wbs.push_back(task);
}
}


pub trait HandleRaftReadyContext<WK, ER>
where
WK: Mutable,
Expand Down Expand Up @@ -1591,6 +1601,7 @@ where
let raft_wb_pool = ready_ctx.raft_wb_pool();
let mut raft_wbs = raft_wb_pool.lock().unwrap();
let current = raft_wbs.prepare_current_for_write();
let current_size = current.wb.persist_size();

if !ready.snapshot().is_empty() {
fail_point!("raft_before_apply_snap");
Expand All @@ -1617,6 +1628,7 @@ where
}

if ready.must_sync() {
assert!(current_size != current.wb.persist_size());
current.on_written(region_id, ready.number(), region_notifier.clone());
}
}
Expand Down

0 comments on commit dcbe442

Please sign in to comment.