Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

crud-bench tokio runtime improvement #15

Merged
merged 9 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions .github/workflows/crud-bench.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
uses: dtolnay/rust-toolchain@stable
with:
components: rustfmt

- name: Checkout sources
uses: actions/checkout@v4

Expand All @@ -54,7 +54,7 @@ jobs:
uses: dtolnay/rust-toolchain@stable
with:
components: rustfmt

- name: Checkout sources
uses: actions/checkout@v4

Expand All @@ -74,7 +74,7 @@ jobs:
steps:
- name: Install stable toolchain
uses: dtolnay/rust-toolchain@stable

- name: Checkout sources
uses: actions/checkout@v4

Expand All @@ -90,11 +90,11 @@ jobs:

build:
name: Build crud-bench
runs-on: [runner-amd64-2xlarge-private]
runs-on: [ runner-amd64-2xlarge-private ]
steps:
- name: Install stable toolchain
uses: dtolnay/rust-toolchain@stable

- name: Checkout sources
uses: actions/checkout@v4

Expand All @@ -120,7 +120,7 @@ jobs:
benchmark:
name: Benchmark ${{ matrix.description }}
needs: build
runs-on: [runner-amd64-2xlarge-private]
runs-on: [ runner-amd64-2xlarge-private ]
strategy:
fail-fast: false
matrix:
Expand Down Expand Up @@ -158,7 +158,7 @@ jobs:
steps:
- name: Install stable toolchain
uses: dtolnay/rust-toolchain@stable

- name: Checkout sources
uses: actions/checkout@v4

Expand All @@ -179,6 +179,6 @@ jobs:

- name: Run benchmarks (100,000 samples / 32 threads)
run: ${{ github.workspace }}/artifacts/crud-bench -d ${{ matrix.database }} -s 100000 -t 32

- name: Run benchmarks (250,000 samples / 128 threads)
run: ${{ github.workspace }}/artifacts/crud-bench -d ${{ matrix.database }} -s 250000 -t 128
3 changes: 2 additions & 1 deletion crud-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ codegen-units = 1
[dependencies]
anyhow = "1.0.88"
clap = { version = "4.5.17", features = ["derive"] }
dashmap = "6.1.0"
env_logger = "0.11.5"
log = "0.4.22"
num_cpus = "1.16.0"
mongodb = { version = "2.8.2", optional = true }
rand = { version = "0.8.5", features = ["small_rng"] }
rayon = "1.10.0"
redb = { version = "2.1.3", optional = true }
redis = { version = "0.24.0", features = ["tokio-comp"], optional = true }
rocksdb = { git = "https://github.com/surrealdb/rust-rocksdb", features = ["lz4", "snappy"], optional = true }
Expand Down
5 changes: 4 additions & 1 deletion crud-bench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

## Purpose

The goal of this benchmark is for developers working on features in SurrealDB to assess their impact on CRUD performance.
The goal of this benchmark is for developers working on features in SurrealDB to assess their impact on CRUD
performance.

E.g.:

- Testing a new operator
- Work on indexes
- Work on query planner and execution plan
Expand Down Expand Up @@ -39,6 +41,7 @@ Options:
-d, --database <DATABASE> Database [possible values: dry, surrealdb, surrealdb-memory, surrealdb-rocksdb, surrealdb-surrealkv, mongodb, postgresql]
-s, --samples <SAMPLES> Number of samples
-t, --threads <THREADS> Number of concurrent threads
-w, --workers <WORKERS> Number of workers for the client async runtime (tokio). By default the number of logical CPUs.
-h, --help Print help
```

Expand Down
194 changes: 111 additions & 83 deletions crud-bench/src/benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
use std::fmt::{Display, Formatter};
use std::future::Future;
use std::io;
use std::io::Write;
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU8, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use crate::Args;
use anyhow::{bail, Result};
use log::{error, info, warn};
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};
use rayon::scope;
use serde::{Deserialize, Serialize};
use tokio::runtime::Builder;
use tokio::task;
use tokio::time::sleep;

use crate::Args;

pub(crate) struct Benchmark {
threads: usize,
samples: i32,
Expand Down Expand Up @@ -51,8 +50,8 @@ impl Benchmark {

pub(crate) async fn wait_for_client<C, P>(&self, engine: &P) -> Result<C>
where
C: BenchmarkClient + Send,
P: BenchmarkEngine<C> + Send + Sync,
C: BenchmarkClient,
P: BenchmarkEngine<C>,
{
sleep(Duration::from_secs(2)).await;
let start = SystemTime::now();
Expand All @@ -69,30 +68,30 @@ impl Benchmark {

pub(crate) async fn run<C, P>(&self, engine: P) -> Result<BenchmarkResult>
where
C: BenchmarkClient + Send,
P: BenchmarkEngine<C> + Send + Sync,
C: BenchmarkClient,
P: BenchmarkEngine<C>,
{
// Start the client
self.wait_for_client(&engine).await?.startup().await?;

// Run the "creates" benchmark
info!("Start creates benchmark");
let creates = self.run_operation(&engine, BenchmarkOperation::Create)?;
let creates = self.run_operation(&engine, BenchmarkOperation::Create).await?;
info!("Creates benchmark done");

// Run the "reads" benchmark
info!("Start reads benchmark");
let reads = self.run_operation(&engine, BenchmarkOperation::Read)?;
let reads = self.run_operation(&engine, BenchmarkOperation::Read).await?;
info!("Reads benchmark done");

// Run the "reads" benchmark
info!("Start updates benchmark");
let updates = self.run_operation(&engine, BenchmarkOperation::Update)?;
let updates = self.run_operation(&engine, BenchmarkOperation::Update).await?;
info!("Reads benchmark done");

// Run the "deletes" benchmark
info!("Start deletes benchmark");
let deletes = self.run_operation(&engine, BenchmarkOperation::Delete)?;
let deletes = self.run_operation(&engine, BenchmarkOperation::Delete).await?;
info!("Deletes benchmark done");

Ok(BenchmarkResult {
Expand All @@ -103,81 +102,110 @@ impl Benchmark {
})
}

fn run_operation<C, P>(&self, engine: &P, operation: BenchmarkOperation) -> Result<Duration>
async fn run_operation<C, P>(
&self,
engine: &P,
operation: BenchmarkOperation,
) -> Result<Duration>
where
C: BenchmarkClient + Send,
C: BenchmarkClient,
P: BenchmarkEngine<C> + Send + Sync,
{
let error = Arc::new(AtomicBool::new(false));
let time = Instant::now();
let percent = Arc::new(AtomicU8::new(0));
print!("\r{operation:?} 0%");
scope(|s| {
let current = Arc::new(AtomicI32::new(0));
for thread_number in 0..self.threads {
let current = current.clone();
let error = error.clone();
let percent = percent.clone();
s.spawn(move |_| {
let mut record_provider = RecordProvider::default();
let runtime = Builder::new_multi_thread()
.thread_stack_size(10 * 1024 * 1024) // Set stack size to 10MiB
.worker_threads(4) // Set the number of worker threads
.enable_all() // Enables all runtime features, including I/O and time
.build()
.expect("Failed to create a runtime");
if let Err(e) = runtime.block_on(async {
info!("Thread #{thread_number}/{operation:?} starts");
let mut client = engine.create_client(self.endpoint.to_owned()).await?;
while !error.load(Ordering::Relaxed) {
let sample = current.fetch_add(1, Ordering::Relaxed);
if sample >= self.samples {
break;
}
// Calculate and print out the percents
{
let new_percent = if sample == 0 {
0u8
} else {
(sample * 20 / self.samples) as u8
};
let old_percent = percent.load(Ordering::Relaxed);
if new_percent > old_percent {
percent.store(new_percent, Ordering::Relaxed);
print!("\r{operation:?} {}%", new_percent * 5);
io::stdout().flush()?;
}
}
match operation {
BenchmarkOperation::Read => client.read(sample).await?,
BenchmarkOperation::Create => {
let record = record_provider.sample();
client.create(sample, record).await?;
}
BenchmarkOperation::Update => {
let record = record_provider.sample();
client.update(sample, record).await?;
}
BenchmarkOperation::Delete => client.delete(sample).await?,
}
}
client.shutdown().await?;
info!("Thread #{thread_number}/{operation:?} ends");
Ok::<(), anyhow::Error>(())
}) {
error!("{}", e);
error.store(true, Ordering::Relaxed);
}
});

let current = Arc::new(AtomicI32::new(0));

let mut futures = Vec::with_capacity(self.threads);

// start the threads
for thread_number in 0..self.threads {
let current = current.clone();
let error = error.clone();
let percent = percent.clone();
let samples = self.samples;
let client = engine.create_client(self.endpoint.clone()).await?;
let f = task::spawn(async move {
info!("Thread #{thread_number}/{operation:?} starts");
if let Err(e) =
Self::operation_loop(client, samples, &error, &current, &percent, operation)
.await
{
error!("{e}");
error.store(true, Ordering::Relaxed);
}
info!("Thread #{thread_number}/{operation:?} ends");
});
futures.push(f);
}

// Wait for threads to be done
for f in futures {
if let Err(e) = f.await {
{
error!("{e}");
error.store(true, Ordering::Relaxed);
}
}
});
println!("\r{operation:?} 100%");
io::stdout().flush()?;
}

if error.load(Ordering::Relaxed) {
bail!("Benchmark error");
}
println!("\r{operation:?} 100%");
io::stdout().flush()?;
Ok(time.elapsed())
}

async fn operation_loop<C>(
mut client: C,
samples: i32,
error: &AtomicBool,
current: &AtomicI32,
percent: &AtomicU8,
operation: BenchmarkOperation,
) -> Result<()>
where
C: BenchmarkClient,
{
let mut record_provider = RecordProvider::default();
while !error.load(Ordering::Relaxed) {
let sample = current.fetch_add(1, Ordering::Relaxed);
if sample >= samples {
break;
}
// Calculate and print out the percents
{
let new_percent = if sample == 0 {
0u8
} else {
(sample * 20 / samples) as u8
};
let old_percent = percent.load(Ordering::Relaxed);
if new_percent > old_percent {
percent.store(new_percent, Ordering::Relaxed);
print!("\r{operation:?} {}%", new_percent * 5);
io::stdout().flush()?;
}
}
match operation {
BenchmarkOperation::Read => client.read(sample).await?,
BenchmarkOperation::Create => {
let record = record_provider.sample();
client.create(sample, record).await?;
}
BenchmarkOperation::Update => {
let record = record_provider.sample();
client.update(sample, record).await?;
}
BenchmarkOperation::Delete => client.delete(sample).await?,
}
}
client.shutdown().await?;
Ok::<(), anyhow::Error>(())
}
}

#[derive(Clone, Copy, Debug)]
Expand Down Expand Up @@ -222,28 +250,28 @@ impl RecordProvider {
}
}

pub(crate) trait BenchmarkEngine<C>
pub(crate) trait BenchmarkEngine<C>: Send + Sync
where
C: BenchmarkClient,
{
async fn create_client(&self, endpoint: Option<String>) -> Result<C>;
fn create_client(&self, endpoint: Option<String>) -> impl Future<Output = Result<C>> + Send;
}

pub(crate) trait BenchmarkClient {
pub(crate) trait BenchmarkClient: Send + 'static {
/// Initialise the store at startup
async fn startup(&mut self) -> Result<()> {
Ok(())
}
/// Cleanup the store at shutdown
async fn shutdown(&mut self) -> Result<()> {
Ok(())
fn shutdown(&mut self) -> impl Future<Output = Result<()>> + Send {
async { Ok(()) }
}
/// Create a record at a key
async fn create(&mut self, key: i32, record: &Record) -> Result<()>;
fn create(&mut self, key: i32, record: &Record) -> impl Future<Output = Result<()>> + Send;
/// Read a record at a key
async fn read(&mut self, key: i32) -> Result<()>;
fn read(&mut self, key: i32) -> impl Future<Output = Result<()>> + Send;
/// Update a record at a key
async fn update(&mut self, key: i32, record: &Record) -> Result<()>;
fn update(&mut self, key: i32, record: &Record) -> impl Future<Output = Result<()>> + Send;
/// Delete a record at a key
async fn delete(&mut self, key: i32) -> Result<()>;
fn delete(&mut self, key: i32) -> impl Future<Output = Result<()>> + Send;
}
Loading