Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(memory): use thread-local sequence-based memory eviction policy #16087

Merged
merged 31 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
9eb816a
perf(memory): use thread-local squence-based memory eviction policy
MrCroxx Apr 2, 2024
d998a13
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx Apr 2, 2024
b87e911
test(bench): add sequencer benchmark
MrCroxx Apr 2, 2024
f9678c3
fix: fix license header
MrCroxx Apr 2, 2024
3959a16
fix: do not init sequence when insert lru
MrCroxx Apr 2, 2024
1ba5fd6
perf: add lru bench
MrCroxx Apr 2, 2024
6483b58
fix: clear lru cache after drop
MrCroxx Apr 2, 2024
7a9a7c8
refactor: simplify clear
MrCroxx Apr 2, 2024
f76da99
fix: drop inited field when clear
MrCroxx Apr 3, 2024
f12c141
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx Apr 3, 2024
8eccfee
refactor: update metrics in rw
MrCroxx Apr 3, 2024
594111d
chore: update grafana
MrCroxx Apr 3, 2024
dc0737a
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx Apr 7, 2024
5d98779
refactor: make sequencer args configurable
MrCroxx Apr 7, 2024
f77d980
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx Apr 8, 2024
dce4999
chore: tiny refactors
MrCroxx Apr 8, 2024
7ebc5d3
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx Apr 16, 2024
2339eee
chore: make clippy happier
MrCroxx Apr 16, 2024
ebc27aa
fix: enable unstabl feature
MrCroxx Apr 16, 2024
d19b1ae
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx Apr 22, 2024
8bc7ee1
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx May 10, 2024
8e6140c
Merge branch 'main' into xx/thread-local-sequence
MrCroxx May 13, 2024
dc43b6a
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx May 23, 2024
6ae11d5
chore: fill rust docs for Sequencer
MrCroxx May 23, 2024
c934b8e
chore: refine docs for controller
MrCroxx May 23, 2024
3dfd2d6
fix: fix bench build
MrCroxx May 23, 2024
207df01
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx May 23, 2024
7b68659
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx May 24, 2024
93858bb
fix: resolve grafana build
MrCroxx May 27, 2024
3d5917c
refactor: remove `update_epoch`
MrCroxx May 27, 2024
a46061b
Merge remote-tracking branch 'origin/main' into xx/thread-local-sequence
MrCroxx May 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions src/common/Cargo.toml
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I heard that some stateless queries in NexMark were negatively affected by this PR for some "unknown" cause. Have we found the reason now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still not. But the regression hasn't appear these weeks.

Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ ignored = ["workspace-hack"]
normal = ["workspace-hack"]

[dependencies]
ahash = "0.8"
anyhow = "1"
arc-swap = "1"
arrow-array = { workspace = true }
Expand Down Expand Up @@ -48,6 +49,7 @@ fixedbitset = "0.5"
foyer = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
governor = { version = "0.6", default-features = false, features = ["std"] }
hashbrown = "0.14"
hex = "0.4.3"
http = "0.2"
humantime = "2.1"
Expand Down Expand Up @@ -134,6 +136,7 @@ libc = "0.2"
mach2 = "0.4"

[dev-dependencies]
coarsetime = "0.1"
criterion = { workspace = true }
expect-test = "1"
more-asserts = "0.3"
Expand Down Expand Up @@ -166,5 +169,9 @@ harness = false
name = "bench_array"
harness = false

[[bench]]
name = "bench_sequencer"
harness = false

[lints]
workspace = true
153 changes: 153 additions & 0 deletions src/common/benches/bench_sequencer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
use std::cell::RefCell;
MrCroxx marked this conversation as resolved.
Show resolved Hide resolved
use std::hint::black_box;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

use itertools::Itertools;
use risingwave_common::sequence::*;

thread_local! {
pub static SEQUENCER_64_8: RefCell<Sequencer> = RefCell::new(Sequencer::new(64, 64 * 8));
pub static SEQUENCER_64_16: RefCell<Sequencer> = RefCell::new(Sequencer::new(64, 64 * 16));
pub static SEQUENCER_64_32: RefCell<Sequencer> = RefCell::new(Sequencer::new(64, 64 * 32));
pub static SEQUENCER_128_8: RefCell<Sequencer> = RefCell::new(Sequencer::new(128, 128 * 8));
pub static SEQUENCER_128_16: RefCell<Sequencer> = RefCell::new(Sequencer::new(128, 128 * 16));
pub static SEQUENCER_128_32: RefCell<Sequencer> = RefCell::new(Sequencer::new(128, 128 * 32));
}

fn coarse(loops: usize) -> Duration {
let now = Instant::now();
for _ in 0..loops {
let _ = coarsetime::Instant::now();
}
now.elapsed()
}

fn primitive(loops: usize) -> Duration {
let mut cnt = 0usize;
let now = Instant::now();
for _ in 0..loops {
cnt += 1;
let _ = cnt;
}
now.elapsed()
}

fn atomic(loops: usize, atomic: Arc<AtomicUsize>) -> Duration {
let now = Instant::now();
for _ in 0..loops {
let _ = atomic.fetch_add(1, Ordering::Relaxed);
}
now.elapsed()
}

fn atomic_skip(loops: usize, atomic: Arc<AtomicUsize>, skip: usize) -> Duration {
let mut cnt = 0usize;
let now = Instant::now();
for _ in 0..loops {
cnt += 1;
let _ = cnt;
if cnt % skip == 0 {
let _ = atomic.fetch_add(skip, Ordering::Relaxed);
} else {
let _ = atomic.load(Ordering::Relaxed);
}
}
now.elapsed()
}

fn sequencer(loops: usize, step: Sequence, lag_amp: Sequence) -> Duration {
let sequencer = match (step, lag_amp) {
(64, 8) => &SEQUENCER_64_8,
(64, 16) => &SEQUENCER_64_16,
(64, 32) => &SEQUENCER_64_32,
(128, 8) => &SEQUENCER_128_8,
(128, 16) => &SEQUENCER_128_16,
(128, 32) => &SEQUENCER_128_32,
_ => unimplemented!(),
};
let now = Instant::now();
for _ in 0..loops {
let _ = sequencer.with(|s| s.borrow_mut().inc());
}
now.elapsed()
}

fn benchmark<F>(name: &str, threads: usize, loops: usize, f: F)
where
F: Fn() -> Duration + Clone + Send + 'static,
{
let handles = (0..threads)
.map(|_| std::thread::spawn(black_box(f.clone())))
.collect_vec();
let mut dur = Duration::from_nanos(0);
for handle in handles {
dur += handle.join().unwrap();
}
println!(
"{:20} {} threads {} loops: {:?} per iter",
name,
threads,
loops,
Duration::from_nanos((dur.as_nanos() / threads as u128 / loops as u128) as u64)
);
}

fn main() {
for (threads, loops) in [
(1, 10_000_000),
(4, 10_000_000),
(8, 10_000_000),
(16, 10_000_000),
(32, 10_000_000),
] {
println!();

benchmark("primitive", threads, loops, move || primitive(loops));

let a = Arc::new(AtomicUsize::new(0));
benchmark("atomic", threads, loops, move || atomic(loops, a.clone()));

let a = Arc::new(AtomicUsize::new(0));
benchmark("atomic skip 8", threads, loops, move || {
atomic_skip(loops, a.clone(), 8)
});

let a = Arc::new(AtomicUsize::new(0));
benchmark("atomic skip 16", threads, loops, move || {
atomic_skip(loops, a.clone(), 16)
});

let a = Arc::new(AtomicUsize::new(0));
benchmark("atomic skip 32", threads, loops, move || {
atomic_skip(loops, a.clone(), 32)
});

let a = Arc::new(AtomicUsize::new(0));
benchmark("atomic skip 64", threads, loops, move || {
atomic_skip(loops, a.clone(), 64)
});

benchmark("sequencer(64,8)", threads, loops, move || {
sequencer(loops, 64, 8)
});
benchmark("sequencer(64,16)", threads, loops, move || {
sequencer(loops, 64, 16)
});
benchmark("sequencer(64,32)", threads, loops, move || {
sequencer(loops, 64, 32)
});
benchmark("sequencer(128,8)", threads, loops, move || {
sequencer(loops, 128, 8)
});
benchmark("sequencer(128,16)", threads, loops, move || {
sequencer(loops, 128, 16)
});
benchmark("sequencer(128,32)", threads, loops, move || {
sequencer(loops, 128, 32)
});

benchmark("coarse", threads, loops, move || coarse(loops));
}
}
24 changes: 24 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,15 @@ pub struct StreamingDeveloperConfig {
#[serde(default = "default::developer::memory_controller_threshold_stable")]
pub memory_controller_threshold_stable: f64,

#[serde(default = "default::developer::memory_controller_eviction_factor_aggressive")]
pub memory_controller_eviction_factor_aggressive: f64,

#[serde(default = "default::developer::memory_controller_eviction_factor_graceful")]
pub memory_controller_eviction_factor_graceful: f64,

#[serde(default = "default::developer::memory_controller_eviction_factor_stable")]
pub memory_controller_eviction_factor_stable: f64,

#[serde(default = "default::developer::stream_enable_arrangement_backfill")]
/// Enable arrangement backfill
/// If true, the arrangement backfill will be disabled,
Expand Down Expand Up @@ -1566,12 +1575,27 @@ pub mod default {
pub fn memory_controller_threshold_aggressive() -> f64 {
0.9
}

pub fn memory_controller_threshold_graceful() -> f64 {
0.8
}

pub fn memory_controller_threshold_stable() -> f64 {
0.7
}

pub fn memory_controller_eviction_factor_aggressive() -> f64 {
2.0
}

pub fn memory_controller_eviction_factor_graceful() -> f64 {
1.5
}

pub fn memory_controller_eviction_factor_stable() -> f64 {
1.0
}

pub fn stream_enable_arrangement_backfill() -> bool {
true
}
Expand Down
2 changes: 2 additions & 0 deletions src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,11 @@ pub use risingwave_common_metrics::{
register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
register_guarded_int_gauge_vec_with_registry,
};
pub mod lru;
pub mod opts;
pub mod range;
pub mod row;
pub mod sequence;
pub mod session_config;
pub mod system_param;
pub mod telemetry;
Expand Down
Loading
Loading