Skip to content

Commit

Permalink
add benchmark code
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Mar 28, 2024
1 parent 5a74e48 commit 464a8c1
Show file tree
Hide file tree
Showing 3 changed files with 266 additions and 1 deletion.
4 changes: 4 additions & 0 deletions src/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -145,5 +145,9 @@ harness = false
name = "bench_imm_compact"
harness = false

[[bench]]
name = "bench_table_watermarks"
harness = false

[lints]
workspace = true
261 changes: 261 additions & 0 deletions src/storage/benches/bench_table_watermarks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(lazy_cell)]

use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, LazyLock};

use bytes::Bytes;
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use itertools::Itertools;
use risingwave_common::buffer::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::test_epoch;
use risingwave_hummock_sdk::table_watermark::{
TableWatermarks, TableWatermarksIndex, VnodeWatermark, WatermarkDirection,
};
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion;
use spin::Mutex;
use tokio::sync::mpsc::unbounded_channel;

fn vnode_bitmaps(part_count: usize) -> impl Iterator<Item = Arc<Bitmap>> {
static BITMAP_CACHE: LazyLock<Mutex<HashMap<usize, Vec<Arc<Bitmap>>>>> =
LazyLock::new(|| Mutex::new(HashMap::new()));
assert_eq!(VirtualNode::COUNT % part_count, 0);
let mut cache = BITMAP_CACHE.lock();
match cache.entry(part_count) {
Entry::Occupied(entry) => entry.get().clone().into_iter(),
Entry::Vacant(entry) => entry
.insert({
let part_size = VirtualNode::COUNT / part_count;
(0..part_count)
.map(move |part_idx| {
let start = part_idx * part_size;
let end = part_idx * part_size + part_size;
let mut bitmap = BitmapBuilder::zeroed(VirtualNode::COUNT);
for i in start..end {
bitmap.set(i, true);
}
Arc::new(bitmap.finish())
})
.collect()
})
.clone()
.into_iter(),
}
}

fn gen_watermark(epoch_idx: usize) -> Bytes {
static WATERMARK_CACHE: LazyLock<Mutex<HashMap<usize, Bytes>>> =
LazyLock::new(|| Mutex::new(HashMap::new()));
let mut cache = WATERMARK_CACHE.lock();
match cache.entry(epoch_idx) {
Entry::Occupied(entry) => entry.get().clone(),
Entry::Vacant(entry) => entry
.insert(Bytes::copy_from_slice(
format!("key_test_{:010}", epoch_idx).as_bytes(),
))
.clone(),
}
}

fn gen_epoch_watermarks(
epoch_idx: usize,
vnode_part_count: usize,
) -> (HummockEpoch, Vec<VnodeWatermark>) {
(
test_epoch(epoch_idx as _),
vnode_bitmaps(vnode_part_count)
.map(|bitmap| VnodeWatermark::new(bitmap, gen_watermark(epoch_idx)))
.collect_vec(),
)
}

fn gen_committed_table_watermarks(
old_epoch_idx: usize,
new_epoch_idx: usize,
vnode_part_count: usize,
) -> TableWatermarks {
assert!(old_epoch_idx <= new_epoch_idx);
let watermark = TableWatermarks {
watermarks: (old_epoch_idx..=new_epoch_idx)
.map(|epoch_idx| {
let (epoch, watermarks) = gen_epoch_watermarks(epoch_idx, vnode_part_count);
(epoch, watermarks.into())
})
.collect(),
direction: WatermarkDirection::Ascending,
};
watermark
}

fn gen_version(
old_epoch_idx: usize,
new_epoch_idx: usize,
table_count: usize,
vnode_part_count: usize,
) -> HummockVersion {
let table_watermarks = Arc::new(gen_committed_table_watermarks(
old_epoch_idx,
new_epoch_idx,
vnode_part_count,
));
// let table_watermarks =
// gen_committed_table_watermarks(old_epoch_idx, new_epoch_idx, vnode_part_count);
let version = HummockVersion {
id: new_epoch_idx as _,
max_committed_epoch: test_epoch(new_epoch_idx as _),
safe_epoch: test_epoch(old_epoch_idx as _),
table_watermarks: (0..table_count)
.map(|table_id| (TableId::new(table_id as _), table_watermarks.clone()))
.collect(),
..Default::default()
};
version
}

fn bench_table_watermarks(c: &mut Criterion) {
let version_count = 500;
let epoch_count = 1000;
let table_count = 1;
let vnode_part_count = 16;
let pre_generated_versions: VecDeque<_> = (1..version_count + 1)
.map(|epoch_idx| {
gen_version(
epoch_idx,
epoch_idx + epoch_count,
table_count,
vnode_part_count,
)
})
.collect();
c.bench_function("new pinned version", |b| {
b.iter_batched(
|| pre_generated_versions.clone(),
|mut versions| {
let mut pinned_version =
PinnedVersion::new(versions.pop_front().unwrap(), unbounded_channel().0);
while let Some(version) = versions.pop_front() {
pinned_version = pinned_version.new_pin_version(version);
}
},
BatchSize::SmallInput,
)
});

let safe_epoch_idx: usize = 9500;
let committed_epoch_idx: usize = 10000;
let staging_epoch_count: usize = 500;
let vnode_part_count = 16;

let mut table_watermarks = TableWatermarksIndex::new_committed(
gen_committed_table_watermarks(safe_epoch_idx, committed_epoch_idx, vnode_part_count)
.into(),
test_epoch(committed_epoch_idx as u64),
);
for i in 0..staging_epoch_count {
let (epoch, watermarks) =
gen_epoch_watermarks(committed_epoch_idx + i + 1, vnode_part_count);
table_watermarks.add_epoch_watermark(
epoch,
watermarks.into(),
WatermarkDirection::Ascending,
);
}
let table_watermarks = table_watermarks;
let committed_watermarks = gen_committed_table_watermarks(
safe_epoch_idx + 1,
committed_epoch_idx + 1,
vnode_part_count,
);
let batch_size = 100;
c.bench_function("apply committed watermark", |b| {
b.iter_batched(
|| {
(0..batch_size)
.map(|_| (table_watermarks.clone(), committed_watermarks.clone()))
.collect_vec()
},
|list| {
for (mut table_watermarks, committed_watermarks) in list {
table_watermarks.apply_committed_watermarks(
committed_watermarks.into(),
test_epoch((committed_epoch_idx + 1) as _),
);
}
},
BatchSize::SmallInput,
)
});

// Code for the original table watermark index
// let mut table_watermarks =
// gen_committed_table_watermarks(safe_epoch_idx, committed_epoch_idx, vnode_part_count)
// .build_index(test_epoch(committed_epoch_idx as u64));
// for i in 0..staging_epoch_count {
// let (epoch, watermarks) =
// gen_epoch_watermarks(committed_epoch_idx + i + 1, vnode_part_count);
// table_watermarks.add_epoch_watermark(epoch, &watermarks, WatermarkDirection::Ascending);
// }
// let table_watermarks = table_watermarks;
// let committed_watermarks = gen_committed_table_watermarks(
// safe_epoch_idx + 1,
// committed_epoch_idx + 1,
// vnode_part_count,
// )
// .build_index(test_epoch((committed_epoch_idx + 1) as _));
// let batch_size = 100;
// c.bench_function("apply committed watermark", |b| {
// b.iter_batched(
// || {
// (0..batch_size)
// .map(|_| (table_watermarks.clone(), committed_watermarks.clone()))
// .collect_vec()
// },
// |list| {
// for (mut table_watermarks, committed_watermarks) in list {
// table_watermarks.apply_committed_watermarks(&committed_watermarks);
// }
// },
// BatchSize::SmallInput,
// )
// });

c.bench_function("read latest watermark", |b| {
b.iter(|| {
for i in 0..VirtualNode::COUNT {
let _ = table_watermarks.latest_watermark(VirtualNode::from_index(i));
}
})
});

c.bench_function("read committed watermark", |b| {
b.iter(|| {
for i in 0..VirtualNode::COUNT {
let _ = table_watermarks.read_watermark(
VirtualNode::from_index(i),
test_epoch(committed_epoch_idx as u64),
);
}
})
});
}

criterion_group!(benches, bench_table_watermarks);
criterion_main!(benches);
2 changes: 1 addition & 1 deletion src/storage/src/hummock/local_version/pinned_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl PinnedVersion {
self.compaction_group_index.clone()
}

pub(crate) fn new_pin_version(&self, version: HummockVersion) -> Self {
pub fn new_pin_version(&self, version: HummockVersion) -> Self {
assert!(
version.id >= self.version.id,
"pinning a older version {}. Current is {}",
Expand Down

0 comments on commit 464a8c1

Please sign in to comment.