From 774343e5e00e5d5103128c6e9e5ba41dd5ea9fca Mon Sep 17 00:00:00 2001 From: Hugo Larcher Date: Tue, 17 Sep 2024 14:10:58 +0200 Subject: [PATCH] fix: Stop adding requests to results when duration is exceeded --- README.md | 9 ++-- plot.py | 1 + src/benchmark.rs | 121 +--------------------------------------------- src/executors.rs | 8 +++- src/lib.rs | 11 +++-- src/requests.rs | 21 +++++++- src/results.rs | 4 ++ src/scheduler.rs | 44 ++++++++--------- src/writers.rs | 122 +++++++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 186 insertions(+), 155 deletions(-) create mode 100644 src/writers.rs diff --git a/README.md b/README.md index 170d0e4..ac97680 100644 --- a/README.md +++ b/README.md @@ -6,9 +6,9 @@ Benchmarks using constant arrival rate or constant virtual user count. ![ui.png](assets%2Fui.png) ## TODO -- [ ] Customizable token count and variance +- [X] Customizable token count and variance - [ ] Check results -- [ ] Allow for multiturn prompts for speculation +- [X] Allow for multiturn prompts for prefix caching - [ ] Push results to Optimum benchmark backend - [ ] Script to generate plots from results @@ -27,7 +27,10 @@ $ docker run \ --tokenizer-name "Qwen/Qwen2-7B" \ --max-vus 800 \ --url http:/localhost:8080 \ - --warmup 20s + --warmup 20s \ + --num-rates 10 \ + --prompt-options "num_tokens=50,max_tokens=60,min_tokens=40,variance=10" \ + --decode-options "num_tokens=50,max_tokens=60,min_tokens=40,variance=10" ``` Results will be saved in `results.json` in current directory. \ No newline at end of file diff --git a/plot.py b/plot.py index cbcc9b9..8a88771 100644 --- a/plot.py +++ b/plot.py @@ -52,6 +52,7 @@ def plot_inner(x_name, x_values, results, chart_title): ax.tick_params(axis='x', rotation=0) ax.set_ylabel(label) ax.set_xlabel(x_name) + ax.set_ylim(0) # rotate x-axis labels for better readability plt.setp(ax.xaxis.get_majorticklabels(), rotation=-90) diff --git a/src/benchmark.rs b/src/benchmark.rs index 7a462fc..787373d 100644 --- a/src/benchmark.rs +++ b/src/benchmark.rs @@ -1,9 +1,7 @@ -use std::sync::{Arc}; +use std::sync::Arc; use std::time::Duration; use log::{debug, info}; use serde::Serialize; -use sysinfo::{CpuRefreshKind, MemoryRefreshKind, System}; -use tokio::fs; use tokio::sync::{broadcast, mpsc, Mutex}; use tokio::sync::mpsc::{Receiver, Sender}; use crate::requests::{TextGenerationBackend, TextRequestGenerator, TokenizeOptions}; @@ -354,120 +352,3 @@ impl Benchmark { } -#[derive(Serialize)] -pub struct BenchmarkResultsWriter { - id: String, - executor_type: String, - config: executors::ExecutorConfig, - total_requests: u64, - total_tokens: u64, - token_throughput_secs: f64, - duration_ms: u128, - time_to_first_token_ms_avg: u128, - time_to_first_token_ms_p90: u128, - time_to_first_token_ms_p95: u128, - inter_token_latency_ms_avg: u128, - inter_token_latency_ms_p90: u128, - inter_token_latency_ms_p95: u128, - failed_requests: u64, - successful_requests: u64, - request_rate: f64, - total_tokens_sent: u64, - e2e_latency_ms_avg: u128, - e2e_latency_ms_p90: u128, - e2e_latency_ms_p95: u128, -} - -impl BenchmarkResultsWriter { - pub fn new(results: BenchmarkResults) -> anyhow::Result { - Ok(BenchmarkResultsWriter { - id: results.id.clone(), - executor_type: results.executor_type().to_string(), - config: results.executor_config(), - total_requests: results.total_requests() as u64, - total_tokens: results.total_tokens() as u64, - token_throughput_secs: results.token_throughput_secs()?, - duration_ms: results.duration().ok().unwrap().as_millis(), - 
time_to_first_token_ms_avg: results.time_to_first_token_avg().ok().unwrap().as_millis(), - time_to_first_token_ms_p90: results.time_to_first_token_percentile(0.9)?.as_millis(), - time_to_first_token_ms_p95: results.time_to_first_token_percentile(0.95)?.as_millis(), - inter_token_latency_ms_avg: results.inter_token_latency_avg().ok().unwrap().as_millis(), - inter_token_latency_ms_p90: results.inter_token_latency_percentile(0.9)?.as_millis(), - inter_token_latency_ms_p95: results.inter_token_latency_percentile(0.95)?.as_millis(), - failed_requests: results.failed_requests() as u64, - successful_requests: results.successful_requests() as u64, - request_rate: results.successful_request_rate()?, - total_tokens_sent: results.total_tokens_sent(), - e2e_latency_ms_avg: results.e2e_latency_avg().ok().unwrap().as_millis(), - e2e_latency_ms_p90: results.e2e_latency_percentile(0.9)?.as_millis(), - e2e_latency_ms_p95: results.e2e_latency_percentile(0.95)?.as_millis(), - }) - } -} - -#[derive(Serialize)] -pub struct SystemInfo { - pub cpu: Vec, - pub memory: String, - pub os_name: String, - pub os_version: String, - pub kernel: String, - pub hostname: String, -} - -impl SystemInfo { - pub fn new() -> SystemInfo { - let s = System::new_with_specifics( - sysinfo::RefreshKind::new() - .with_memory(MemoryRefreshKind::everything()) - .with_cpu(CpuRefreshKind::everything()) - ); - let cpu_info = s.cpus().iter().map(|cpu| format!("{} {}@{:.0}MHz", cpu.brand(), cpu.name(), cpu.frequency())).collect::>(); - SystemInfo { - cpu: cpu_info, - memory: format!("{:.2} GB", s.total_memory() as f64 / 1024.0 / 1024.0 / 1024.0), - os_name: System::name().ok_or("N/A").unwrap(), - os_version: System::os_version().ok_or("N/A").unwrap(), - kernel: System::kernel_version().ok_or("N/A").unwrap(), - hostname: System::host_name().ok_or("N/A").unwrap(), - } - } -} - -#[derive(Serialize)] -pub struct BenchmarkReportWriter { - config: BenchmarkConfig, - results: Vec, - start_time: String, - end_time: String, - system: SystemInfo, -} - - -impl BenchmarkReportWriter { - pub fn new(config: BenchmarkConfig, report: BenchmarkReport) -> anyhow::Result { - let mut results: Vec = Vec::new(); - for result in report.get_results() { - let writer = BenchmarkResultsWriter::new(result)?; - results.push(writer); - } - Ok(BenchmarkReportWriter { - config, - results, - start_time: report.start_time().ok_or(anyhow::anyhow!("start_time not set"))?.to_rfc3339(), - end_time: report.end_time().ok_or(anyhow::anyhow!("end_time not set"))?.to_rfc3339(), - system: SystemInfo::new(), - }) - } - pub async fn json(&self, path: &str) -> anyhow::Result<()> { - // write the benchmark report to json - let report = serde_json::to_string(&self).unwrap(); - let path = path.to_string(); - // create path - if !std::path::Path::new(&path).exists() { - fs::create_dir_all(&path).await?; - } - fs::write(format!("{}/results.json", path), report).await?; - Ok(()) - } -} \ No newline at end of file diff --git a/src/executors.rs b/src/executors.rs index 4390f9f..5fde3a3 100644 --- a/src/executors.rs +++ b/src/executors.rs @@ -68,6 +68,8 @@ impl Executor for ConstantVUsExecutor { while let Some(_) = end_rx.recv().await { active_vus.fetch_sub(1, std::sync::atomic::Ordering::SeqCst); if start.elapsed() > self.config.duration{ + // signal that the VU work is done + let _ = responses_tx.send(TextGenerationAggregatedResponse::new_as_ended()); info!("Duration reached, waiting for all VUs to finish..."); if active_vus.load(std::sync::atomic::Ordering::SeqCst) == 0 { break; @@ -85,7 
+87,7 @@ impl Executor for ConstantVUsExecutor { } async fn start_vu(backend: Box, request: Arc, responses_tx: UnboundedSender, end_tx: Sender, stop_sender: broadcast::Sender<()>) -> JoinHandle<()> { - let mut stop_receiver=stop_sender.subscribe(); + let mut stop_receiver = stop_sender.subscribe(); tokio::spawn(async move { tokio::select! { _ = stop_receiver.recv() => { @@ -178,7 +180,9 @@ impl Executor for ConstantArrivalRateExecutor { } interval.tick().await; } - drop(responses_tx); // drop response sender to signal VUs to stop + // signal that the VU work is done + info!("Duration reached, waiting for all VUs to finish..."); + let _ = responses_tx.send(TextGenerationAggregatedResponse::new_as_ended()); }=>{} } }); diff --git a/src/lib.rs b/src/lib.rs index 49305ad..a32b34d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,14 +4,14 @@ use std::sync::Arc; use chrono::Local; use log::{error, info, Level, LevelFilter}; -use tokio::sync::broadcast::{Sender}; +use tokio::sync::broadcast::Sender; use tokio::sync::Mutex; - +use writers::BenchmarkReportWriter; pub use crate::app::run_console; -use crate::benchmark::{BenchmarkReportWriter, Event, MessageEvent}; +use crate::benchmark::{Event, MessageEvent}; pub use crate::benchmark::{BenchmarkConfig, BenchmarkKind}; -use crate::requests::{OpenAITextGenerationBackend}; -pub use crate::requests::{TokenizeOptions}; +use crate::requests::OpenAITextGenerationBackend; +pub use crate::requests::TokenizeOptions; mod requests; mod executors; @@ -22,6 +22,7 @@ mod benchmark; mod app; mod event; mod flux; +mod writers; pub struct RunConfiguration { pub url: String, diff --git a/src/requests.rs b/src/requests.rs index ae3c6e6..09e33d7 100644 --- a/src/requests.rs +++ b/src/requests.rs @@ -91,7 +91,7 @@ impl OpenAITextGenerationBackend { impl TextGenerationBackend for OpenAITextGenerationBackend { async fn generate(&self, request: Arc, sender: Sender) { let url = format!("{base_url}/v1/chat/completions", base_url = self.base_url); - let mut aggregated_response = TextGenerationAggregatedResponse::new(); + let mut aggregated_response = TextGenerationAggregatedResponse::default(); //debug!("Requesting {url} with prompt: {prompt}, max tokens: {max_tokens}", prompt = request.prompt, max_tokens = request.max_tokens); let req = reqwest::Client::new().post(url) .header("Authorization", format!("Bearer {token}", token = self.api_key)) @@ -376,10 +376,26 @@ pub struct TextGenerationAggregatedResponse { pub times_to_tokens: Vec, last_received_token_time: std::time::Instant, pub failed: bool, + pub ended: bool, +} + +impl Default for TextGenerationAggregatedResponse { + fn default() -> Self { + Self { + start_time: None, + end_time: None, + num_generated_tokens: 0, + num_prompt_tokens: 0, + times_to_tokens: Vec::new(), + last_received_token_time: std::time::Instant::now(), + failed: false, + ended: false, + } + } } impl TextGenerationAggregatedResponse { - fn new() -> Self { + pub fn new_as_ended() -> Self { Self { start_time: None, end_time: None, @@ -388,6 +404,7 @@ impl TextGenerationAggregatedResponse { times_to_tokens: Vec::new(), last_received_token_time: std::time::Instant::now(), failed: false, + ended: true, } } fn start(&mut self, num_prompt_tokens: u64) { diff --git a/src/results.rs b/src/results.rs index 1909f4b..fdb85f2 100644 --- a/src/results.rs +++ b/src/results.rs @@ -183,6 +183,10 @@ impl BenchmarkResults { fn get_successful_responses(&self) -> Vec<&TextGenerationAggregatedResponse> { self.aggregated_responses.iter().filter(|response| 
!response.failed).collect()
     }
+
+    pub fn get_responses(&self) -> Vec<TextGenerationAggregatedResponse> {
+        self.aggregated_responses.clone()
+    }
 }
 
 impl Debug for BenchmarkResults {
diff --git a/src/scheduler.rs b/src/scheduler.rs
index cdb4b7e..3cb9e7e 100644
--- a/src/scheduler.rs
+++ b/src/scheduler.rs
@@ -2,11 +2,9 @@ use std::sync::Arc;
 use std::time::Instant;
 use log::{debug, info, trace, warn};
 use tokio::sync::mpsc::{Sender, UnboundedReceiver, UnboundedSender};
-use tokio_stream::wrappers::UnboundedReceiverStream;
 use crate::executors::{ConstantArrivalRateExecutor, Executor, ExecutorConfig, ConstantVUsExecutor};
 use crate::requests::{TextGenerationAggregatedResponse, TextGenerationBackend, TextRequestGenerator};
 use crate::results::BenchmarkResults;
-use futures_util::StreamExt;
 use tokio::sync::{broadcast, Mutex};
 use crate::results::BenchmarkErrors::NoResponses;
 
@@ -43,28 +41,28 @@ impl Scheduler {
     ) -> Scheduler {
         match executor_type {
             ExecutorType::ConstantVUs => {
-                return Scheduler {
+                Scheduler {
                     id: id.clone(),
                     executor: Arc::from(Mutex::from(ConstantVUsExecutor::new(backend.clone(), config.max_vus.clone(), config.duration.clone()))),
                     results: Arc::from(Mutex::from(BenchmarkResults::new(id.clone(), ExecutorType::ConstantVUs, config))),
                     requests_generator,
                     progress_tx,
                     stop_sender,
-                };
+                }
             }
             ExecutorType::ConstantArrivalRate => {
                 if config.rate.is_none() {
                     panic!("Rate must be specified for ConstantArrivalRateExecutor");
                 }
                 let rate = config.rate.unwrap();
-                return Scheduler {
+                Scheduler {
                     id: id.clone(),
                     executor: Arc::from(Mutex::from(ConstantArrivalRateExecutor::new(backend.clone(), config.max_vus.clone(), config.duration.clone(), rate))),
                     results: Arc::from(Mutex::from(BenchmarkResults::new(id.clone(), ExecutorType::ConstantArrivalRate, config))),
                     requests_generator,
                     progress_tx,
-                    stop_sender: stop_sender,
-                };
+                    stop_sender,
+                }
             }
         }
     }
@@ -72,8 +70,7 @@ impl Scheduler {
     pub async fn run(&mut self) -> anyhow::Result<BenchmarkResults> {
         debug!("Starting scheduler '{}'", self.id);
         // add responses to the benchmark result as they arrive
-        let (tx, rx): (UnboundedSender<TextGenerationAggregatedResponse>, UnboundedReceiver<TextGenerationAggregatedResponse>) = tokio::sync::mpsc::unbounded_channel();
-        let rx = UnboundedReceiverStream::new(rx);
+        let (tx, mut rx): (UnboundedSender<TextGenerationAggregatedResponse>, UnboundedReceiver<TextGenerationAggregatedResponse>) = tokio::sync::mpsc::unbounded_channel();
         let results = self.results.clone();
         let progress_tx = self.progress_tx.clone();
         let mut stop_receiver = self.stop_sender.subscribe();
@@ -84,23 +81,24 @@ impl Scheduler {
                 return
             }
             _ = async{
-                rx.for_each(|response| {
+                while let Some(response) = rx.recv().await{
                     let result = results.clone();
                     let progress_tx = progress_tx.clone();
-                    async move {
-                        trace!("Received response: {:?}", response);
-                        let mut result = result.lock().await;
-                        result.add_response(response);
-                        let expected_duration = result.executor_config().duration.as_secs_f64();
-                        let start_time = result.start_time().unwrap_or(Instant::now());
-                        let _ = progress_tx.send(Some(SchedulerProgress {
-                            progress: (100.0 * (1.0 - (expected_duration - start_time.elapsed().as_secs_f64()) / expected_duration)).min(100.0),
-                            requests_throughput: result.successful_request_rate().unwrap_or_default(),
-                            successful_requests: result.successful_requests() as u64,
-                            failed_requests: result.failed_requests() as u64,
-                        })).await;
+                    trace!("Received response: {:?}", response);
+                    if response.ended {
+                        return;
                     }
-                    }).await;
+                    let mut result = result.lock().await;
+                    result.add_response(response);
+                    let expected_duration = result.executor_config().duration.as_secs_f64();
+                    let start_time = result.start_time().unwrap_or(Instant::now());
+                    let _ = progress_tx.send(Some(SchedulerProgress {
+                        progress: (100.0 * (1.0 - (expected_duration - start_time.elapsed().as_secs_f64()) / expected_duration)).min(100.0),
+                        requests_throughput: result.successful_request_rate().unwrap_or_default(),
+                        successful_requests: result.successful_requests() as u64,
+                        failed_requests: result.failed_requests() as u64,
+                    })).await;
+                }
             }=>{}
         }
     });
diff --git a/src/writers.rs b/src/writers.rs
new file mode 100644
index 0000000..2c91149
--- /dev/null
+++ b/src/writers.rs
@@ -0,0 +1,122 @@
+use serde::Serialize;
+use tokio::fs;
+use sysinfo::{CpuRefreshKind, MemoryRefreshKind, System};
+use crate::{executors, BenchmarkConfig};
+use crate::results::{BenchmarkReport, BenchmarkResults};
+
+#[derive(Serialize)]
+pub struct BenchmarkResultsWriter {
+    id: String,
+    executor_type: String,
+    config: executors::ExecutorConfig,
+    total_requests: u64,
+    total_tokens: u64,
+    token_throughput_secs: f64,
+    duration_ms: u128,
+    time_to_first_token_ms_avg: u128,
+    time_to_first_token_ms_p90: u128,
+    time_to_first_token_ms_p95: u128,
+    inter_token_latency_ms_avg: u128,
+    inter_token_latency_ms_p90: u128,
+    inter_token_latency_ms_p95: u128,
+    failed_requests: u64,
+    successful_requests: u64,
+    request_rate: f64,
+    total_tokens_sent: u64,
+    e2e_latency_ms_avg: u128,
+    e2e_latency_ms_p90: u128,
+    e2e_latency_ms_p95: u128,
+}
+
+impl BenchmarkResultsWriter {
+    pub fn new(results: BenchmarkResults) -> anyhow::Result<BenchmarkResultsWriter> {
+        Ok(BenchmarkResultsWriter {
+            id: results.id.clone(),
+            executor_type: results.executor_type().to_string(),
+            config: results.executor_config(),
+            total_requests: results.total_requests() as u64,
+            total_tokens: results.total_tokens() as u64,
+            token_throughput_secs: results.token_throughput_secs()?,
+            duration_ms: results.duration().ok().unwrap().as_millis(),
+            time_to_first_token_ms_avg: results.time_to_first_token_avg().ok().unwrap().as_millis(),
+            time_to_first_token_ms_p90: results.time_to_first_token_percentile(0.9)?.as_millis(),
+            time_to_first_token_ms_p95: results.time_to_first_token_percentile(0.95)?.as_millis(),
+            inter_token_latency_ms_avg: results.inter_token_latency_avg().ok().unwrap().as_millis(),
+            inter_token_latency_ms_p90: results.inter_token_latency_percentile(0.9)?.as_millis(),
+            inter_token_latency_ms_p95: results.inter_token_latency_percentile(0.95)?.as_millis(),
+            failed_requests: results.failed_requests() as u64,
+            successful_requests: results.successful_requests() as u64,
+            request_rate: results.successful_request_rate()?,
+            total_tokens_sent: results.total_tokens_sent(),
+            e2e_latency_ms_avg: results.e2e_latency_avg().ok().unwrap().as_millis(),
+            e2e_latency_ms_p90: results.e2e_latency_percentile(0.9)?.as_millis(),
+            e2e_latency_ms_p95: results.e2e_latency_percentile(0.95)?.as_millis(),
+        })
+    }
+}
+
+#[derive(Serialize)]
+pub struct SystemInfo {
+    pub cpu: Vec<String>,
+    pub memory: String,
+    pub os_name: String,
+    pub os_version: String,
+    pub kernel: String,
+    pub hostname: String,
+}
+
+impl SystemInfo {
+    pub fn new() -> SystemInfo {
+        let s = System::new_with_specifics(
+            sysinfo::RefreshKind::new()
+                .with_memory(MemoryRefreshKind::everything())
+                .with_cpu(CpuRefreshKind::everything())
+        );
+        let cpu_info = s.cpus().iter().map(|cpu| format!("{} {}@{:.0}MHz", cpu.brand(), cpu.name(), cpu.frequency())).collect::<Vec<String>>();
+        SystemInfo {
+            cpu: cpu_info,
+            memory: format!("{:.2} GB", s.total_memory() as f64 / 1024.0 / 1024.0 / 1024.0),
+            os_name: System::name().ok_or("N/A").unwrap(),
+            os_version: System::os_version().ok_or("N/A").unwrap(),
+            kernel: System::kernel_version().ok_or("N/A").unwrap(),
+            hostname: System::host_name().ok_or("N/A").unwrap(),
+        }
+    }
+}
+
+#[derive(Serialize)]
+pub struct BenchmarkReportWriter {
+    config: BenchmarkConfig,
+    results: Vec<BenchmarkResultsWriter>,
+    start_time: String,
+    end_time: String,
+    system: SystemInfo,
+}
+
+impl BenchmarkReportWriter {
+    pub fn new(config: BenchmarkConfig, report: BenchmarkReport) -> anyhow::Result<BenchmarkReportWriter> {
+        let mut results: Vec<BenchmarkResultsWriter> = Vec::new();
+        for result in report.get_results() {
+            let writer = BenchmarkResultsWriter::new(result)?;
+            results.push(writer);
+        }
+        Ok(BenchmarkReportWriter {
+            config,
+            results,
+            start_time: report.start_time().ok_or(anyhow::anyhow!("start_time not set"))?.to_rfc3339(),
+            end_time: report.end_time().ok_or(anyhow::anyhow!("end_time not set"))?.to_rfc3339(),
+            system: SystemInfo::new(),
+        })
+    }
+    pub async fn json(&self, path: &str) -> anyhow::Result<()> {
+        // write the benchmark report to json
+        let report = serde_json::to_string(&self).unwrap();
+        let path = path.to_string();
+        // create path
+        if !std::path::Path::new(&path).exists() {
+            fs::create_dir_all(&path).await?;
+        }
+        fs::write(format!("{}/results.json", path), report).await?;
+        Ok(())
+    }
+}
\ No newline at end of file
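
Note (not part of the patch): the change above stops signalling completion by dropping the response sender and instead sends an explicit end-of-stream sentinel (`TextGenerationAggregatedResponse::new_as_ended()`); the scheduler checks the `ended` flag and stops adding responses to the results once the benchmark duration is exceeded. Below is a minimal, self-contained sketch of that sentinel pattern, assuming only tokio's unbounded mpsc channel; `Response` is an illustrative stand-in, not the crate's real type.

use tokio::sync::mpsc;

// Illustrative stand-in for TextGenerationAggregatedResponse: only the
// `ended` flag used by the scheduler loop matters for this sketch.
struct Response {
    ended: bool,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<Response>();

    // Producer side (the executor): emit responses while the benchmark runs,
    // then send a sentinel once the duration is exceeded, instead of relying
    // on the sender being dropped.
    let producer = tokio::spawn(async move {
        for _ in 0..3 {
            let _ = tx.send(Response { ended: false });
        }
        let _ = tx.send(Response { ended: true }); // analogous to new_as_ended()
    });

    // Consumer side (the scheduler): aggregate until the sentinel arrives so
    // that no further responses are added to the results.
    let mut aggregated = 0usize;
    while let Some(response) = rx.recv().await {
        if response.ended {
            break;
        }
        aggregated += 1;
    }

    producer.await.unwrap();
    println!("aggregated {aggregated} responses"); // prints: aggregated 3 responses
}

The patched scheduler loop performs the same `response.ended` check before calling `add_response`, which is what keeps late responses out of the results after the configured duration.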