Commit 959dc0d

feat: Cleaner report, add system infos

Hugoch committed Sep 13, 2024
1 parent b6213de commit 959dc0d
Showing 9 changed files with 146 additions and 25 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -168,4 +168,5 @@ Cargo.lock
 
 .idea
 *.json
-*.txt
+*.txt
+results
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -30,3 +30,5 @@ chrono = "0.4.38"
 hf-hub = { version = "0.3.2", features = ["tokio"] }
 indicatif = "0.17.8"
 rayon = "1.10.0"
+serde_with = "3.9.0"
+sysinfo = "0.31.4"
2 changes: 1 addition & 1 deletion README.md
@@ -6,7 +6,7 @@ Benchmarks using constant arrival rate or constant virtual user count.
 ![ui.png](assets%2Fui.png)
 
 ## TODO
-
+- [ ] Customizable token count and variance
 - [ ] Check results
 - [ ] Allow for multiturn prompts for speculation
 - [ ] Push results to Optimum benchmark backend
76 changes: 69 additions & 7 deletions src/benchmark.rs
@@ -2,6 +2,7 @@ use std::sync::{Arc};
 use std::time::Duration;
 use log::{debug, info};
 use serde::Serialize;
+use sysinfo::{CpuRefreshKind, MemoryRefreshKind, System};
 use tokio::fs;
 use tokio::sync::{broadcast, mpsc, Mutex};
 use tokio::sync::mpsc::{Receiver, Sender};
@@ -10,7 +11,7 @@ use crate::{executors, scheduler};
 use crate::results::{BenchmarkReport, BenchmarkResults};
 use crate::scheduler::{ExecutorType, SchedulerProgress};
 
-#[derive(Clone, Debug, strum_macros::Display)]
+#[derive(Clone, Debug, strum_macros::Display, Serialize)]
 pub enum BenchmarkKind {
     Throughput,
     Sweep,
@@ -53,14 +54,22 @@ pub struct Benchmark {
     stop_sender: broadcast::Sender<()>,
 }
 
-#[derive(Clone)]
+#[serde_with::serde_as]
+#[derive(Clone, Serialize)]
 pub struct BenchmarkConfig {
     pub max_vus: u64,
+    #[serde(rename = "duration_secs")]
+    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
     pub duration: Duration,
     pub benchmark_kind: BenchmarkKind,
+    #[serde(rename = "warmup_duration_secs")]
+    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
     pub warmup_duration: Duration,
     pub rate: Option<f64>,
     pub num_rates: u64,
+    pub prompt_length: u64,
+    pub prompt_variance: u64,
+    pub decode_length: u64,
 }
 
 impl BenchmarkConfig {
@@ -120,6 +129,7 @@ impl Benchmark {
 
     pub async fn run(&mut self) -> anyhow::Result<BenchmarkReport> {
         self.start_time = Some(std::time::Instant::now());
+        self.report.start();
         info!("Prewarming backend");
         self.warmup().await?;
         info!("Prewarm complete");
@@ -140,6 +150,7 @@ impl Benchmark {
             timestamp: chrono::Utc::now(),
             level: log::Level::Info,
         }))?;
+        self.report.end();
         Ok(self.report.clone())
     }
 
@@ -362,6 +373,7 @@ pub struct BenchmarkResultsWriter {
     failed_requests: u64,
     successful_requests: u64,
     request_rate: f64,
+    total_tokens_sent: u64,
 }
 
 impl BenchmarkResultsWriter {
@@ -383,24 +395,74 @@ impl BenchmarkResultsWriter {
             failed_requests: results.failed_requests() as u64,
             successful_requests: results.successful_requests() as u64,
             request_rate: results.successful_request_rate()?,
+            total_tokens_sent: results.total_tokens_sent(),
         })
     }
 }
 
-pub struct BenchmarkReportWriter {}
+#[derive(Serialize)]
+pub struct SystemInfo {
+    pub cpu: Vec<String>,
+    pub memory: String,
+    pub os_name: String,
+    pub os_version: String,
+    pub kernel: String,
+    pub hostname: String,
+}
+
+impl SystemInfo {
+    pub fn new() -> SystemInfo {
+        let s = System::new_with_specifics(
+            sysinfo::RefreshKind::new()
+                .with_memory(MemoryRefreshKind::everything())
+                .with_cpu(CpuRefreshKind::everything())
+        );
+        let cpu_info = s.cpus().iter().map(|cpu| format!("{} {}@{:.0}MHz", cpu.brand(), cpu.name(), cpu.frequency())).collect::<Vec<String>>();
+        SystemInfo {
+            cpu: cpu_info,
+            memory: format!("{:.2} GB", s.total_memory() as f64 / 1024.0 / 1024.0 / 1024.0),
+            os_name: System::name().ok_or("N/A").unwrap(),
+            os_version: System::os_version().ok_or("N/A").unwrap(),
+            kernel: System::kernel_version().ok_or("N/A").unwrap(),
+            hostname: System::host_name().ok_or("N/A").unwrap(),
+        }
+    }
+}
+
+#[derive(Serialize)]
+pub struct BenchmarkReportWriter {
+    config: BenchmarkConfig,
+    results: Vec<BenchmarkResultsWriter>,
+    start_time: String,
+    end_time: String,
+    system: SystemInfo,
+}
 
 
 impl BenchmarkReportWriter {
-    pub async fn json(report: BenchmarkReport, path: &str) -> anyhow::Result<()> {
-        // write the benchmark report to json
+    pub fn new(config: BenchmarkConfig, report: BenchmarkReport) -> anyhow::Result<BenchmarkReportWriter> {
         let mut results: Vec<BenchmarkResultsWriter> = Vec::new();
         for result in report.get_results() {
            let writer = BenchmarkResultsWriter::new(result)?;
            results.push(writer);
        }
-        let report = serde_json::to_string(&results).unwrap();
-        let path = path.to_string();
-        fs::write(path, report).await?;
+        Ok(BenchmarkReportWriter {
+            config,
+            results,
+            start_time: report.start_time().ok_or(anyhow::anyhow!("start_time not set"))?.to_rfc3339(),
+            end_time: report.end_time().ok_or(anyhow::anyhow!("end_time not set"))?.to_rfc3339(),
+            system: SystemInfo::new(),
+        })
+    }
+    pub async fn json(&self, path: &str) -> anyhow::Result<()> {
+        // write the benchmark report to json
+        let report = serde_json::to_string(&self).unwrap();
+        // create path
+        if !std::path::Path::new(&path).exists() {
+            fs::create_dir_all(&path).await?;
+        }
+        fs::write(format!("{}/results.json", path), report).await?;
         Ok(())
     }
 }
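
A note on the new SystemInfo collector: `Option::ok_or("N/A").unwrap()` as used in `SystemInfo::new()` panics when a value is missing rather than falling back to "N/A"; `unwrap_or_else` gives the fallback the string suggests. A minimal standalone sketch of the same sysinfo 0.31 calls (not part of the commit, with the graceful fallback):

use sysinfo::{CpuRefreshKind, MemoryRefreshKind, System};

fn main() {
    // Refresh only what the report needs: memory and CPU.
    let s = System::new_with_specifics(
        sysinfo::RefreshKind::new()
            .with_memory(MemoryRefreshKind::everything())
            .with_cpu(CpuRefreshKind::everything()),
    );
    let na = || "N/A".to_string();
    println!("hostname: {}", System::host_name().unwrap_or_else(na));
    println!("os: {} {}", System::name().unwrap_or_else(na), System::os_version().unwrap_or_else(na));
    println!("kernel: {}", System::kernel_version().unwrap_or_else(na));
    // total_memory() reports bytes in sysinfo 0.31, hence the three divisions.
    println!("memory: {:.2} GB", s.total_memory() as f64 / 1024.0 / 1024.0 / 1024.0);
    for cpu in s.cpus() {
        println!("cpu: {} {}@{}MHz", cpu.brand(), cpu.name(), cpu.frequency());
    }
}

Refreshing only these two subsystems keeps startup cheaper than a full `System::new_all()`.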
3 changes: 3 additions & 0 deletions src/executors.rs
@@ -11,9 +11,12 @@ use tokio::task::JoinHandle;
 
 use crate::requests::{TextGenerationAggregatedResponse, TextGenerationBackend, TextGenerationRequest, TextRequestGenerator};
 
+#[serde_with::serde_as]
 #[derive(Clone, Serialize)]
 pub struct ExecutorConfig {
     pub max_vus: u64,
+    #[serde(rename = "duration_secs")]
+    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
     pub duration: Duration,
     pub rate: Option<f64>,
 }
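
The `serde_as`/`DurationSeconds` pattern applied to `ExecutorConfig` here (and to `BenchmarkConfig` above) is what turns a `std::time::Duration` into a plain integer in the report JSON. A minimal sketch of the behavior, assuming only serde, serde_with, and serde_json as dependencies:

use std::time::Duration;
use serde::Serialize;

// `serde_as` swaps in the `DurationSeconds<u64>` serializer, which emits
// the duration as a whole number of seconds; `rename` exposes the field
// as `duration_secs` in the output.
#[serde_with::serde_as]
#[derive(Serialize)]
struct Config {
    #[serde(rename = "duration_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    duration: Duration,
}

fn main() {
    let config = Config { duration: Duration::from_secs(120) };
    // prints: {"duration_secs":120}
    println!("{}", serde_json::to_string(&config).unwrap());
}

Integer seconds keep the serialized report stable and easy to parse downstream, at the cost of sub-second precision.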
19 changes: 14 additions & 5 deletions src/lib.rs
@@ -24,6 +24,9 @@ mod flux;
 
 pub async fn run(url: String,
                  tokenizer_name: String,
+                 prompt_length: u64,
+                 prompt_variance: u64,
+                 decode_length: u64,
                  max_vus: u64,
                  duration: std::time::Duration,
                  rate: Option<f64>,
@@ -34,6 +37,8 @@ pub async fn run(url: String,
                  stop_sender: Sender<()>,
 ) -> anyhow::Result<()> {
     info!("Starting benchmark");
+    // set process system limits
+    sysinfo::set_open_files_limit(0);
     // let backend = OpenAITextGenerationBackend::new("".to_string(), "http://10.90.11.68:8000".to_string());
     let backend = OpenAITextGenerationBackend::new("".to_string(), url, tokenizer_name.clone());
 
@@ -49,6 +54,9 @@ pub async fn run(url: String,
         warmup_duration: prewarm_duration,
         rate,
         num_rates,
+        prompt_length,
+        prompt_variance,
+        decode_length,
     };
     config.validate()?;
     let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
@@ -94,24 +102,25 @@ pub async fn run(url: String,
 
     // download prompts dataset
     info!("Downloading dataset");
-    let _ = tx.send(Event::Message(MessageEvent{
+    let _ = tx.send(Event::Message(MessageEvent {
         message: "Downloading dataset".to_string(),
         timestamp: chrono::Utc::now(),
         level: Level::Info,
     }));
     let filepath = requests::ShareGPTTextRequestGenerator::download_dataset("hlarcher/share_gpt_small".to_string(), "share_gpt_filtered_small.json".to_string()).expect("Can't download dataset");
-    let requests = requests::ShareGPTTextRequestGenerator::new(filepath, tokenizer_name, 50, 10, 10, 10);
+    let requests = requests::ShareGPTTextRequestGenerator::new(filepath, tokenizer_name, prompt_length, 1, prompt_length * 2, prompt_variance);
 
-    let mut benchmark = benchmark::Benchmark::new(config, Box::new(backend), Arc::from(Mutex::from(requests)), tx.clone(), stop_sender.clone());
+    let mut benchmark = benchmark::Benchmark::new(config.clone(), Box::new(backend), Arc::from(Mutex::from(requests)), tx.clone(), stop_sender.clone());
     let mut stop_receiver = stop_sender.subscribe();
     tokio::select! {
         report = benchmark.run() => {
             match report {
                 Ok(results) => {
                     info!("Throughput is {requests_throughput} req/s",requests_throughput = results.get_results()[0].successful_request_rate().unwrap());
                     let report = benchmark.get_report();
-                    let path = "results/results.json".to_string();
-                    BenchmarkReportWriter::json(report, &path).await.unwrap();
+                    let path = "results/".to_string();
+                    let writer = BenchmarkReportWriter::new(config.clone(), report)?;
+                    writer.json(&path).await?;
                 },
                 Err(e) => {
                     error!("Error running benchmark: {:?}", e.to_string());
13 changes: 13 additions & 0 deletions src/main.rs
@@ -37,8 +37,18 @@ struct Args {
     #[clap(default_value = "http://localhost:8000", short, long, env)]
     #[arg(value_parser = parse_url)]
     url: String,
     /// Disable console UI
     #[clap(short, long, env)]
     no_console: bool,
+    /// Prompt token length
+    #[clap(default_value = "50", long, env)]
+    prompt_length: u64,
+    /// Variance of prompt token length following a normal distribution
+    #[clap(default_value = "10", long, env)]
+    prompt_variance: u64,
+    /// Decode token length (number of tokens to generate)
+    #[clap(default_value = "10", long, env)]
+    decode_length: u64,
 }
 
 fn parse_duration(s: &str) -> Result<Duration, Error> {
@@ -69,6 +79,9 @@ async fn main() {
     let main_thread = tokio::spawn(async move {
         match run(args.url,
                   args.tokenizer_name,
+                  args.prompt_length,
+                  args.prompt_variance,
+                  args.decode_length,
                   args.max_vus,
                   args.duration,
                   args.rate,
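
With `#[clap(long, env)]` and no explicit names, clap derives the flag from the field name in kebab-case and the environment variable in upper snake case, so the new options surface as `--prompt-length`/`PROMPT_LENGTH`, `--prompt-variance`/`PROMPT_VARIANCE`, and `--decode-length`/`DECODE_LENGTH`. A reduced sketch of that mapping, assuming clap's derive and env features (which the existing flags already require):

use clap::Parser;

#[derive(Parser, Debug)]
struct Args {
    /// Prompt token length
    #[clap(default_value = "50", long, env)]
    prompt_length: u64,
}

fn main() {
    // `--prompt-length 128` on the command line or PROMPT_LENGTH=128 in
    // the environment both land here; otherwise the default of 50 applies.
    let args = Args::parse();
    println!("prompt_length = {}", args.prompt_length);
}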
23 changes: 14 additions & 9 deletions src/requests.rs
@@ -17,7 +17,8 @@ use serde::{Deserialize, Serialize};
 #[derive(Debug, Clone)]
 pub struct TextGenerationRequest {
     pub prompt: String,
-    pub max_tokens: u32,
+    pub num_tokens: u64,
+    pub max_tokens: u64,
 }
 
 #[async_trait]
@@ -106,7 +107,7 @@ impl TextGenerationBackend for OpenAITextGenerationBackend {
             "stream": true,
         }));
         // start timer
-        aggregated_response.start();
+        aggregated_response.start(request.num_tokens);
         let mut es = EventSource::new(req).unwrap();
         let mut final_response = "".to_string();
         while let Some(event) = es.next().await {
@@ -180,7 +181,7 @@ pub struct ShareGPTEntry {
 }
 
 impl ShareGPTTextRequestGenerator {
-    pub fn new(filepath: PathBuf, tokenizer: String, prompt_tokens: u32, min_tokens: u32, max_tokens: u32, variance: u32) -> Self {
+    pub fn new(filepath: PathBuf, tokenizer: String, prompt_tokens: u64, min_tokens: u64, max_tokens: u64, variance: u64) -> Self {
         let tokenizer = Arc::new(Tokenizer::from_pretrained(tokenizer, None).expect("Unable to load tokenizer"));
         // load json file
         let input = std::fs::read_to_string(&filepath).expect("Unable to read input file");
@@ -200,7 +201,7 @@ impl ShareGPTTextRequestGenerator {
             let prompt = entry.conversations[0].value.clone();
             // compute number of tokens to generate using a Gaussian distribution
             let normal = rand_distr::Normal::new(prompt_tokens as f64, variance as f64).unwrap();
-            let mut num_tokens = normal.sample(&mut rand::thread_rng()) as u32;
+            let mut num_tokens = normal.sample(&mut rand::thread_rng()) as u64;
             if num_tokens < min_tokens {
                 num_tokens = min_tokens;
             }
@@ -216,6 +217,7 @@ impl ShareGPTTextRequestGenerator {
             };
             requests.lock().unwrap().push(TextGenerationRequest {
                 prompt: sampled_prompt,
+                num_tokens,
                 max_tokens,
             });
             // TODO: check that we have enough requests
@@ -259,14 +261,14 @@ impl TextRequestGenerator for ShareGPTTextRequestGenerator {
 }
 
 
-fn tokenize_prompt(prompt: String, tokenizer: Arc<Tokenizer>, num_tokens: u32) -> anyhow::Result<String> {
+fn tokenize_prompt(prompt: String, tokenizer: Arc<Tokenizer>, num_tokens: u64) -> anyhow::Result<String> {
     let prompt_tokens = tokenizer.encode(prompt.clone(), false).map_err(|_| anyhow::anyhow!("Error tokenizing prompt"))?;
     if prompt_tokens.len() < num_tokens as usize {
         return Err(anyhow::anyhow!("Prompt is too short to tokenize"));
     }
     // let's do a binary search to find the right number of tokens
     let mut low = 1;
-    let mut high = prompt.len() as u32;
+    let mut high = prompt.len() as u64;
     let mut prompt_sub = String::new();
     while low < high {
         let mid = (low + high) / 2;
@@ -293,7 +295,8 @@ fn tokenize_prompt(prompt: String, tokenizer: Arc<Tokenizer>, num_tokens: u32) -
 pub struct TextGenerationAggregatedResponse {
     pub start_time: Option<std::time::Instant>,
     pub end_time: Option<std::time::Instant>,
-    pub num_generated_tokens: u32,
+    pub num_generated_tokens: u64,
+    pub num_prompt_tokens: u64,
     pub times_to_tokens: Vec<std::time::Duration>,
     last_received_token_time: std::time::Instant,
     pub failed: bool,
@@ -305,14 +308,16 @@ impl TextGenerationAggregatedResponse {
             start_time: None,
             end_time: None,
             num_generated_tokens: 0,
+            num_prompt_tokens: 0,
             times_to_tokens: Vec::new(),
             last_received_token_time: std::time::Instant::now(),
             failed: false,
         }
     }
-    fn start(&mut self) {
+    fn start(&mut self, num_prompt_tokens: u64) {
         self.start_time = Some(std::time::Instant::now());
         self.last_received_token_time = std::time::Instant::now();
+        self.num_prompt_tokens = num_prompt_tokens;
     }
 
     fn stop(&mut self) {
@@ -324,7 +329,7 @@ impl TextGenerationAggregatedResponse {
         self.failed = true;
     }
 
-    fn add_tokens(&mut self, num_tokens: u32) {
+    fn add_tokens(&mut self, num_tokens: u64) {
         self.num_generated_tokens += num_tokens;
         let time_to_generate = self.last_received_token_time.elapsed();
         self.last_received_token_time = std::time::Instant::now();
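
One subtlety in the sampling logic above: `rand_distr::Normal::new(mean, std_dev)` takes a standard deviation, so the `variance` parameter is effectively used as one. A condensed sketch of the sample-and-clamp step, assuming rand and rand_distr as dependencies:

use rand_distr::{Distribution, Normal};

// Draw a target token count around `prompt_tokens` and clamp it to
// [min_tokens, max_tokens], mirroring the generator logic above.
fn sample_num_tokens(prompt_tokens: u64, variance: u64, min_tokens: u64, max_tokens: u64) -> u64 {
    let normal = Normal::new(prompt_tokens as f64, variance as f64).unwrap();
    let sampled = normal.sample(&mut rand::thread_rng()) as u64;
    sampled.clamp(min_tokens, max_tokens)
}

fn main() {
    // With the values now passed from lib.rs: prompt_length = 50,
    // min_tokens = 1, max_tokens = prompt_length * 2 = 100.
    println!("{}", sample_num_tokens(50, 10, 1, 100));
}

A sample that comes out negative saturates to 0 in the `as u64` cast and is then clamped up to `min_tokens`.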