Skip to content

Commit

Permalink
fix: Stop adding requests to results when duration is exceeded
Browse files Browse the repository at this point in the history
  • Loading branch information
Hugoch committed Sep 17, 2024
1 parent 6f7f9d1 commit 774343e
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 155 deletions.
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ Benchmarks using constant arrival rate or constant virtual user count.
![ui.png](assets%2Fui.png)

## TODO
- [ ] Customizable token count and variance
- [X] Customizable token count and variance
- [ ] Check results
- [ ] Allow for multiturn prompts for speculation
- [X] Allow for multiturn prompts for prefix caching
- [ ] Push results to Optimum benchmark backend
- [ ] Script to generate plots from results

Expand All @@ -27,7 +27,10 @@ $ docker run \
--tokenizer-name "Qwen/Qwen2-7B" \
--max-vus 800 \
--url http:/localhost:8080 \
--warmup 20s
--warmup 20s \
--num-rates 10 \
--prompt-options "num_tokens=50,max_tokens=60,min_tokens=40,variance=10" \
--decode-options "num_tokens=50,max_tokens=60,min_tokens=40,variance=10"
```

Results will be saved in `results.json` in current directory.
1 change: 1 addition & 0 deletions plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def plot_inner(x_name, x_values, results, chart_title):
ax.tick_params(axis='x', rotation=0)
ax.set_ylabel(label)
ax.set_xlabel(x_name)
ax.set_ylim(0)
# rotate x-axis labels for better readability
plt.setp(ax.xaxis.get_majorticklabels(), rotation=-90)

Expand Down
121 changes: 1 addition & 120 deletions src/benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use std::sync::{Arc};
use std::sync::Arc;
use std::time::Duration;
use log::{debug, info};
use serde::Serialize;
use sysinfo::{CpuRefreshKind, MemoryRefreshKind, System};
use tokio::fs;
use tokio::sync::{broadcast, mpsc, Mutex};
use tokio::sync::mpsc::{Receiver, Sender};
use crate::requests::{TextGenerationBackend, TextRequestGenerator, TokenizeOptions};
Expand Down Expand Up @@ -354,120 +352,3 @@ impl Benchmark {
}


#[derive(Serialize)]
pub struct BenchmarkResultsWriter {
id: String,
executor_type: String,
config: executors::ExecutorConfig,
total_requests: u64,
total_tokens: u64,
token_throughput_secs: f64,
duration_ms: u128,
time_to_first_token_ms_avg: u128,
time_to_first_token_ms_p90: u128,
time_to_first_token_ms_p95: u128,
inter_token_latency_ms_avg: u128,
inter_token_latency_ms_p90: u128,
inter_token_latency_ms_p95: u128,
failed_requests: u64,
successful_requests: u64,
request_rate: f64,
total_tokens_sent: u64,
e2e_latency_ms_avg: u128,
e2e_latency_ms_p90: u128,
e2e_latency_ms_p95: u128,
}

impl BenchmarkResultsWriter {
pub fn new(results: BenchmarkResults) -> anyhow::Result<BenchmarkResultsWriter> {
Ok(BenchmarkResultsWriter {
id: results.id.clone(),
executor_type: results.executor_type().to_string(),
config: results.executor_config(),
total_requests: results.total_requests() as u64,
total_tokens: results.total_tokens() as u64,
token_throughput_secs: results.token_throughput_secs()?,
duration_ms: results.duration().ok().unwrap().as_millis(),
time_to_first_token_ms_avg: results.time_to_first_token_avg().ok().unwrap().as_millis(),
time_to_first_token_ms_p90: results.time_to_first_token_percentile(0.9)?.as_millis(),
time_to_first_token_ms_p95: results.time_to_first_token_percentile(0.95)?.as_millis(),
inter_token_latency_ms_avg: results.inter_token_latency_avg().ok().unwrap().as_millis(),
inter_token_latency_ms_p90: results.inter_token_latency_percentile(0.9)?.as_millis(),
inter_token_latency_ms_p95: results.inter_token_latency_percentile(0.95)?.as_millis(),
failed_requests: results.failed_requests() as u64,
successful_requests: results.successful_requests() as u64,
request_rate: results.successful_request_rate()?,
total_tokens_sent: results.total_tokens_sent(),
e2e_latency_ms_avg: results.e2e_latency_avg().ok().unwrap().as_millis(),
e2e_latency_ms_p90: results.e2e_latency_percentile(0.9)?.as_millis(),
e2e_latency_ms_p95: results.e2e_latency_percentile(0.95)?.as_millis(),
})
}
}

#[derive(Serialize)]
pub struct SystemInfo {
pub cpu: Vec<String>,
pub memory: String,
pub os_name: String,
pub os_version: String,
pub kernel: String,
pub hostname: String,
}

impl SystemInfo {
pub fn new() -> SystemInfo {
let s = System::new_with_specifics(
sysinfo::RefreshKind::new()
.with_memory(MemoryRefreshKind::everything())
.with_cpu(CpuRefreshKind::everything())
);
let cpu_info = s.cpus().iter().map(|cpu| format!("{} {}@{:.0}MHz", cpu.brand(), cpu.name(), cpu.frequency())).collect::<Vec<String>>();
SystemInfo {
cpu: cpu_info,
memory: format!("{:.2} GB", s.total_memory() as f64 / 1024.0 / 1024.0 / 1024.0),
os_name: System::name().ok_or("N/A").unwrap(),
os_version: System::os_version().ok_or("N/A").unwrap(),
kernel: System::kernel_version().ok_or("N/A").unwrap(),
hostname: System::host_name().ok_or("N/A").unwrap(),
}
}
}

#[derive(Serialize)]
pub struct BenchmarkReportWriter {
config: BenchmarkConfig,
results: Vec<BenchmarkResultsWriter>,
start_time: String,
end_time: String,
system: SystemInfo,
}


impl BenchmarkReportWriter {
pub fn new(config: BenchmarkConfig, report: BenchmarkReport) -> anyhow::Result<BenchmarkReportWriter> {
let mut results: Vec<BenchmarkResultsWriter> = Vec::new();
for result in report.get_results() {
let writer = BenchmarkResultsWriter::new(result)?;
results.push(writer);
}
Ok(BenchmarkReportWriter {
config,
results,
start_time: report.start_time().ok_or(anyhow::anyhow!("start_time not set"))?.to_rfc3339(),
end_time: report.end_time().ok_or(anyhow::anyhow!("end_time not set"))?.to_rfc3339(),
system: SystemInfo::new(),
})
}
pub async fn json(&self, path: &str) -> anyhow::Result<()> {
// write the benchmark report to json
let report = serde_json::to_string(&self).unwrap();
let path = path.to_string();
// create path
if !std::path::Path::new(&path).exists() {
fs::create_dir_all(&path).await?;
}
fs::write(format!("{}/results.json", path), report).await?;
Ok(())
}
}
8 changes: 6 additions & 2 deletions src/executors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ impl Executor for ConstantVUsExecutor {
while let Some(_) = end_rx.recv().await {
active_vus.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
if start.elapsed() > self.config.duration{
// signal that the VU work is done
let _ = responses_tx.send(TextGenerationAggregatedResponse::new_as_ended());
info!("Duration reached, waiting for all VUs to finish...");
if active_vus.load(std::sync::atomic::Ordering::SeqCst) == 0 {
break;
Expand All @@ -85,7 +87,7 @@ impl Executor for ConstantVUsExecutor {
}

async fn start_vu(backend: Box<dyn TextGenerationBackend + Send + Sync>, request: Arc<TextGenerationRequest>, responses_tx: UnboundedSender<TextGenerationAggregatedResponse>, end_tx: Sender<bool>, stop_sender: broadcast::Sender<()>) -> JoinHandle<()> {
let mut stop_receiver=stop_sender.subscribe();
let mut stop_receiver = stop_sender.subscribe();
tokio::spawn(async move {
tokio::select! {
_ = stop_receiver.recv() => {
Expand Down Expand Up @@ -178,7 +180,9 @@ impl Executor for ConstantArrivalRateExecutor {
}
interval.tick().await;
}
drop(responses_tx); // drop response sender to signal VUs to stop
// signal that the VU work is done
info!("Duration reached, waiting for all VUs to finish...");
let _ = responses_tx.send(TextGenerationAggregatedResponse::new_as_ended());
}=>{}
}
});
Expand Down
11 changes: 6 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ use std::sync::Arc;

use chrono::Local;
use log::{error, info, Level, LevelFilter};
use tokio::sync::broadcast::{Sender};
use tokio::sync::broadcast::Sender;
use tokio::sync::Mutex;

use writers::BenchmarkReportWriter;
pub use crate::app::run_console;
use crate::benchmark::{BenchmarkReportWriter, Event, MessageEvent};
use crate::benchmark::{Event, MessageEvent};
pub use crate::benchmark::{BenchmarkConfig, BenchmarkKind};
use crate::requests::{OpenAITextGenerationBackend};
pub use crate::requests::{TokenizeOptions};
use crate::requests::OpenAITextGenerationBackend;
pub use crate::requests::TokenizeOptions;

mod requests;
mod executors;
Expand All @@ -22,6 +22,7 @@ mod benchmark;
mod app;
mod event;
mod flux;
mod writers;

pub struct RunConfiguration {
pub url: String,
Expand Down
21 changes: 19 additions & 2 deletions src/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl OpenAITextGenerationBackend {
impl TextGenerationBackend for OpenAITextGenerationBackend {
async fn generate(&self, request: Arc<TextGenerationRequest>, sender: Sender<TextGenerationAggregatedResponse>) {
let url = format!("{base_url}/v1/chat/completions", base_url = self.base_url);
let mut aggregated_response = TextGenerationAggregatedResponse::new();
let mut aggregated_response = TextGenerationAggregatedResponse::default();
//debug!("Requesting {url} with prompt: {prompt}, max tokens: {max_tokens}", prompt = request.prompt, max_tokens = request.max_tokens);
let req = reqwest::Client::new().post(url)
.header("Authorization", format!("Bearer {token}", token = self.api_key))
Expand Down Expand Up @@ -376,10 +376,26 @@ pub struct TextGenerationAggregatedResponse {
pub times_to_tokens: Vec<std::time::Duration>,
last_received_token_time: std::time::Instant,
pub failed: bool,
pub ended: bool,
}

impl Default for TextGenerationAggregatedResponse {
fn default() -> Self {
Self {
start_time: None,
end_time: None,
num_generated_tokens: 0,
num_prompt_tokens: 0,
times_to_tokens: Vec::new(),
last_received_token_time: std::time::Instant::now(),
failed: false,
ended: false,
}
}
}

impl TextGenerationAggregatedResponse {
fn new() -> Self {
pub fn new_as_ended() -> Self {
Self {
start_time: None,
end_time: None,
Expand All @@ -388,6 +404,7 @@ impl TextGenerationAggregatedResponse {
times_to_tokens: Vec::new(),
last_received_token_time: std::time::Instant::now(),
failed: false,
ended: true,
}
}
fn start(&mut self, num_prompt_tokens: u64) {
Expand Down
4 changes: 4 additions & 0 deletions src/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ impl BenchmarkResults {
fn get_successful_responses(&self) -> Vec<&TextGenerationAggregatedResponse> {
self.aggregated_responses.iter().filter(|response| !response.failed).collect()
}

pub fn get_responses(&self) -> Vec<TextGenerationAggregatedResponse> {
self.aggregated_responses.clone()
}
}

impl Debug for BenchmarkResults {
Expand Down
44 changes: 21 additions & 23 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ use std::sync::Arc;
use std::time::Instant;
use log::{debug, info, trace, warn};
use tokio::sync::mpsc::{Sender, UnboundedReceiver, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::executors::{ConstantArrivalRateExecutor, Executor, ExecutorConfig, ConstantVUsExecutor};
use crate::requests::{TextGenerationAggregatedResponse, TextGenerationBackend, TextRequestGenerator};
use crate::results::BenchmarkResults;
use futures_util::StreamExt;
use tokio::sync::{broadcast, Mutex};
use crate::results::BenchmarkErrors::NoResponses;

Expand Down Expand Up @@ -43,37 +41,36 @@ impl Scheduler {
) -> Scheduler {
match executor_type {
ExecutorType::ConstantVUs => {
return Scheduler {
Scheduler {
id: id.clone(),
executor: Arc::from(Mutex::from(ConstantVUsExecutor::new(backend.clone(), config.max_vus.clone(), config.duration.clone()))),
results: Arc::from(Mutex::from(BenchmarkResults::new(id.clone(), ExecutorType::ConstantVUs, config))),
requests_generator,
progress_tx,
stop_sender,
};
}
}
ExecutorType::ConstantArrivalRate => {
if config.rate.is_none() {
panic!("Rate must be specified for ConstantArrivalRateExecutor");
}
let rate = config.rate.unwrap();
return Scheduler {
Scheduler {
id: id.clone(),
executor: Arc::from(Mutex::from(ConstantArrivalRateExecutor::new(backend.clone(), config.max_vus.clone(), config.duration.clone(), rate))),
results: Arc::from(Mutex::from(BenchmarkResults::new(id.clone(), ExecutorType::ConstantArrivalRate, config))),
requests_generator,
progress_tx,
stop_sender: stop_sender,
};
stop_sender,
}
}
}
}

pub async fn run(&mut self) -> anyhow::Result<BenchmarkResults> {
debug!("Starting scheduler '{}'", self.id);
// add responses to the benchmark result as they arrive
let (tx, rx): (UnboundedSender<TextGenerationAggregatedResponse>, UnboundedReceiver<TextGenerationAggregatedResponse>) = tokio::sync::mpsc::unbounded_channel();
let rx = UnboundedReceiverStream::new(rx);
let (tx, mut rx): (UnboundedSender<TextGenerationAggregatedResponse>, UnboundedReceiver<TextGenerationAggregatedResponse>) = tokio::sync::mpsc::unbounded_channel();
let results = self.results.clone();
let progress_tx = self.progress_tx.clone();
let mut stop_receiver = self.stop_sender.subscribe();
Expand All @@ -84,23 +81,24 @@ impl Scheduler {
return
}
_ = async{
rx.for_each(|response| {
while let Some(response) = rx.recv().await{
let result = results.clone();
let progress_tx = progress_tx.clone();
async move {
trace!("Received response: {:?}", response);
let mut result = result.lock().await;
result.add_response(response);
let expected_duration = result.executor_config().duration.as_secs_f64();
let start_time = result.start_time().unwrap_or(Instant::now());
let _ = progress_tx.send(Some(SchedulerProgress {
progress: (100.0 * (1.0 - (expected_duration - start_time.elapsed().as_secs_f64()) / expected_duration)).min(100.0),
requests_throughput: result.successful_request_rate().unwrap_or_default(),
successful_requests: result.successful_requests() as u64,
failed_requests: result.failed_requests() as u64,
})).await;
trace!("Received response: {:?}", response);
if response.ended {
return;
}
}).await;
let mut result = result.lock().await;
result.add_response(response);
let expected_duration = result.executor_config().duration.as_secs_f64();
let start_time = result.start_time().unwrap_or(Instant::now());
let _ = progress_tx.send(Some(SchedulerProgress {
progress: (100.0 * (1.0 - (expected_duration - start_time.elapsed().as_secs_f64()) / expected_duration)).min(100.0),
requests_throughput: result.successful_request_rate().unwrap_or_default(),
successful_requests: result.successful_requests() as u64,
failed_requests: result.failed_requests() as u64,
})).await;
}
}=>{}
}
});
Expand Down
Loading

0 comments on commit 774343e

Please sign in to comment.