From 42bee1be0ebbe76299d0c02e45303a3143fbe140 Mon Sep 17 00:00:00 2001 From: "Jiaxiao (mossaka) Zhou" Date: Fri, 31 Jan 2025 03:05:51 +0000 Subject: [PATCH] feat(stress-test): add a new stress-test that runs container workload with Containerd Signed-off-by: Jiaxiao (mossaka) Zhou --- Cargo.lock | 16 +++ Cargo.toml | 3 +- crates/stress-test-c8d/Cargo.toml | 19 +++ crates/stress-test-c8d/README.md | 19 +++ crates/stress-test-c8d/src/main.rs | 214 +++++++++++++++++++++++++++++ 5 files changed, 270 insertions(+), 1 deletion(-) create mode 100644 crates/stress-test-c8d/Cargo.toml create mode 100644 crates/stress-test-c8d/README.md create mode 100644 crates/stress-test-c8d/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 6dc66cb18..e3fdb9153 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5223,6 +5223,21 @@ dependencies = [ "trapeze", ] +[[package]] +name = "stress-test-c8d" +version = "0.4.0" +dependencies = [ + "anyhow", + "clap", + "containerd-client", + "env_logger", + "oci-spec", + "prost-types 0.13.4", + "serde_json", + "tokio", + "tonic 0.12.3", +] + [[package]] name = "strsim" version = "0.10.0" @@ -5518,6 +5533,7 @@ dependencies = [ "bytes", "libc", "mio", + "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", diff --git a/Cargo.toml b/Cargo.toml index 8fa496743..20e730319 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,8 @@ members = [ "crates/containerd-shim-wasmer", "crates/containerd-shim-wamr", "crates/stress-test", - "benches/containerd-shim-benchmarks", + "crates/stress-test-c8d", + "benches/containerd-shim-benchmarks", ] resolver = "2" diff --git a/crates/stress-test-c8d/Cargo.toml b/crates/stress-test-c8d/Cargo.toml new file mode 100644 index 000000000..21624a3db --- /dev/null +++ b/crates/stress-test-c8d/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "stress-test-c8d" +edition.workspace = true +version.workspace = true +license.workspace = true +readme.workspace = true +repository.workspace = true +homepage.workspace = true + +[dependencies] +anyhow = { workspace = true } +prost-types = "0.13" +tokio = { version = "1.0", features = ["full"] } +tonic = "0.12" +oci-spec.workspace = true +clap = { version = "4", features = ["derive"] } +env_logger = { workspace = true } +containerd-client = "0.6.0" +serde_json.workspace = true diff --git a/crates/stress-test-c8d/README.md b/crates/stress-test-c8d/README.md new file mode 100644 index 000000000..22365dd39 --- /dev/null +++ b/crates/stress-test-c8d/README.md @@ -0,0 +1,19 @@ +# Shim stress test with containerd + +This crate provides a way to stress test the shim. + +## Getting started + +```bash +cargo run -p stress-test-c8d -- --help +``` + +Install wasmtime shim +```bash +make build-wasmtime & sudo make install-wasmtime +``` + +then stress test it +```bash +sudo cargo run -p stress-test-c8d +``` \ No newline at end of file diff --git a/crates/stress-test-c8d/src/main.rs b/crates/stress-test-c8d/src/main.rs new file mode 100644 index 000000000..c9d9e1af4 --- /dev/null +++ b/crates/stress-test-c8d/src/main.rs @@ -0,0 +1,214 @@ +use std::path::PathBuf; +use std::{sync::Arc, time::Instant}; +use clap::Parser; +use containerd_client as client; +use client::{ + services::v1::{ + container::Runtime, containers_client::ContainersClient, tasks_client::TasksClient, + Container, CreateContainerRequest, CreateTaskRequest, DeleteContainerRequest, + DeleteTaskRequest, StartRequest, WaitRequest, + }, + with_namespace, +}; +use oci_spec::runtime::{ProcessBuilder, SpecBuilder, UserBuilder}; +use prost_types::Any; +use tokio::sync::{mpsc, Semaphore}; +use anyhow::{Result, Context}; +use tonic::{Request, transport::Channel}; + +#[derive(Parser, Debug)] +#[command(name = "wasm-stress-test")] +#[command(about = "Stress test for WASM containers using containerd")] +struct Args { + #[arg(long, default_value_t = false)] + verbose: bool, + + #[arg(long, default_value_t = true)] + container_output: bool, + + #[arg(long, default_value_t = 32)] + parallel: usize, + + #[arg(long, default_value_t = 1000)] + count: usize, +} + +const SOCKET_PATH: &str = "/run/containerd/containerd.sock"; +const NAMESPACE: &str = "default"; +const RUNTIME_NAME: &str = "io.containerd.wasmtime.v1"; +const WASM_IMAGE: &str = "ghcr.io/containerd/runwasi/wasi-demo-oci:latest"; + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + + if !args.verbose { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn")) + .init(); + } + + let channel = client::connect(SOCKET_PATH) + .await + .context("Failed to connect to containerd")?; + + let max_parallel = if args.parallel == 0 { + args.count + } else { + args.parallel + }; + + let semaphore = Arc::new(Semaphore::new(max_parallel)); + let (error_tx, mut error_rx) = mpsc::channel(args.count); + let start_time = Instant::now(); + + let mut handles = Vec::new(); + + for i in 0..args.count { + let channel = channel.clone(); + let semaphore = semaphore.clone(); + let error_tx = error_tx.clone(); + + let handle = tokio::spawn(async move { + let _permit = semaphore.acquire().await.unwrap(); + + let container_id = format!("stress-test-{}-{}", i, std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos()); + + if let Err(e) = run_container( + channel.clone(), + &container_id, + ).await { + let _ = error_tx.send(format!( + "Failed to run container {}: {}", container_id, e + )).await; + } + }); + + handles.push(handle); + } + + for handle in handles { + let _ = handle.await; + } + drop(error_tx); + + let duration = start_time.elapsed(); + println!("\nStress test completed in {:?}", duration); + println!("Containers created: {}", args.count); + println!("Concurrent workers: {}", max_parallel); + + let mut error_count = 0; + while let Some(error) = error_rx.recv().await { + error_count += 1; + if args.verbose { + eprintln!("{}", error); + } + } + + println!("\nTotal errors: {}", error_count); + println!("Success rate: {:.2}%", + (args.count as f64 - error_count as f64) / args.count as f64 * 100.0); + + Ok(()) +} + +async fn run_container( + channel: Channel, + container_id: &str, +) -> Result<()> { + let mut containers_client = ContainersClient::new(channel.clone()); + let process = ProcessBuilder::default() + .user(UserBuilder::default().build().unwrap()) + .args(vec!["wasi-demo-oci.wasm".into(), "echo".into(), "hello".into()]) + .cwd(PathBuf::from("/")) + .build() + .unwrap(); + + let spec = SpecBuilder::default() + .version("1.1.0") + .process(process) + .build() + .unwrap(); + + let spec: Any = Any { + type_url: "types.containerd.io/opencontainers/runtime-spec/1/Spec".to_string(), + value: serde_json::to_vec(&spec).unwrap(), + }; + + let container = Container { + id: container_id.to_string(), + image: WASM_IMAGE.to_string(), + runtime: Some(Runtime { + name: RUNTIME_NAME.to_string(), + options: None, + }), + spec: Some(spec), + ..Default::default() + }; + + let req = CreateContainerRequest { + container: Some(container), + }; + let req = with_namespace!(req, NAMESPACE); + let _resp = containers_client + .create(req) + .await + .expect("Failed to create container"); + + let mut client = TasksClient::new(channel.clone()); + + let req = CreateTaskRequest { + container_id: container_id.to_string(), + ..Default::default() + }; + let req = with_namespace!(req, NAMESPACE); + + let _resp = client.create(req).await.expect("Failed to create task"); + + let req = StartRequest { + container_id: container_id.to_string(), + ..Default::default() + }; + let req = with_namespace!(req, NAMESPACE); + + match client.start(req).await { + Ok(_) => Ok(()), + Err(e) => { + let cleanup_req = Request::new(DeleteTaskRequest { + container_id: container_id.to_string(), + }); + let _ = client.delete(cleanup_req).await; + let cleanup_req = Request::new(DeleteContainerRequest { + id: container_id.to_string(), + }); + let _ = containers_client.delete(cleanup_req).await; + Err(anyhow::anyhow!("Failed to start container: {}", e)) + } + }?; + + let req = WaitRequest { + container_id: container_id.to_string(), + ..Default::default() + }; + let req = with_namespace!(req, NAMESPACE); + + client.wait(req).await?; + + let req = DeleteTaskRequest { + container_id: container_id.to_string(), + }; + let req = with_namespace!(req, NAMESPACE); + + client.delete(req).await?; + + let req = DeleteContainerRequest { + id: container_id.to_string(), + }; + let req = with_namespace!(req, NAMESPACE); + + containers_client.delete(req).await?; + + Ok(()) +} \ No newline at end of file