diff --git a/README.md b/README.md index 6de6178..1f6e5f4 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,27 @@ workloads: Poisson process. Every workload is executed via set of worker processes, that are distributed -among available system CPU cores. +among available system CPU cores to fully utilize system resources. + +# How to contribute + +* Make sure you've got a recent enough version of the Rust compiler. At the moment + the minimum required version is 1.71. + +* Build the project either directly using `cargo build`, or using the containerized + version implemented as the make `all` target. + +* Find out what you need to change. The rule of thumb: if it has something to + do with a specific workload type, it goes into the corresponding workload + module under the `worker` directory; if it's a general improvement, feel free + to modify anything outside workers. + +* Do all sorts of hacking. Use the `RUST_LOG=info` environment variable to get + verbose output when troubleshooting. + +* Make sure tests are passing, `cargo test`. + +* Run the linter, `cargo clippy`. \[1\]: https://en.wikipedia.org/wiki/Poisson_point_process diff --git a/src/lib.rs b/src/lib.rs index 551a739..7aa1bd2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,37 +1,96 @@ use serde::Deserialize; +use std::fmt::Display; +use core_affinity::CoreId; pub mod worker; +/// Main workload configuration, contains general bits for all types of +/// workloads plus workload specific data. #[derive(Debug, Copy, Clone, Deserialize)] -#[serde(tag = "distribution")] -pub enum Distribution { - #[serde(alias = "zipf")] - Zipfian { n_ports: u64, exponent: f64 }, - #[serde(alias = "uniform")] - Uniform { lower: u64, upper: u64 }, +pub struct WorkloadConfig { + + /// The amount of time for the workload payload to run before restarting. + pub restart_interval: u64, + + /// Custom workload configuration. + pub workload: Workload, } +/// Workload specific configuration, contains one enum value for each +/// workload type. 
#[derive(Debug, Copy, Clone, Deserialize)] #[serde(rename_all = "lowercase", tag = "type")] pub enum Workload { + + /// How to listen on ports. Endpoints { + + /// Governs the number of open ports. #[serde(flatten)] distribution: Distribution, }, + + /// How to spawn processes. Processes { + + /// How often a new process is spawned. arrival_rate: f64, + + /// How long processes are going to live. departure_rate: f64, + + /// Spawn a new process with random arguments. random_process: bool, }, + + /// How to invoke syscalls. Syscalls { + + /// How often to invoke a syscall. arrival_rate: f64, }, } +/// Distribution for the number of ports to listen on. #[derive(Debug, Copy, Clone, Deserialize)] -pub struct WorkloadConfig { - pub restart_interval: u64, - pub workload: Workload, +#[serde(tag = "distribution")] +pub enum Distribution { + + /// A few processes open a large number of ports, the rest open only a few. + #[serde(alias = "zipf")] + Zipfian { n_ports: u64, exponent: f64 }, + + /// Every process opens more or less the same number of ports. + #[serde(alias = "uniform")] + Uniform { lower: u64, upper: u64 }, +} + +#[derive(Debug)] +pub enum WorkerError {} + +impl Display for WorkerError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "worker error found") + } +} + +/// Generic interface for workers of any type. +pub trait Worker { + fn run_payload(&self) -> Result<(), WorkerError>; +} + +/// General information for each worker: which CPU it is running on +/// and what its process number is. +#[derive(Debug, Copy, Clone)] +struct BaseConfig { + cpu: CoreId, + process: usize, +} + +impl Display for BaseConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Process {} from {}", self.process, self.cpu.id) + } } #[cfg(test)] diff --git a/src/main.rs b/src/main.rs index 2adb93b..8fd1ec0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,18 @@ +//! Berserker workload generator. +//! +//! 
The implementation covers two parts: +//! * workload independent logic +//! * workload specific details +//! +//! Those have to be isolated as much as possible, and work together via +//! configuration data structures and the worker interface. +//! +//! The execution consists of the following steps: +//! * Consume the provided configuration +//! * For each available CPU core spawn the specified number of worker processes +//! * Invoke workload-specific logic via run_payload +//! * Wait for all the workers to finish + #[macro_use] extern crate log; extern crate core_affinity; diff --git a/src/worker/endpoints.rs b/src/worker/endpoints.rs index 78ee79d..5302d8d 100644 --- a/src/worker/endpoints.rs +++ b/src/worker/endpoints.rs @@ -3,9 +3,7 @@ use std::{fmt::Display, net::TcpListener, thread, time}; use core_affinity::CoreId; use log::info; -use crate::WorkloadConfig; - -use super::{BaseConfig, Worker, WorkerError}; +use crate::{WorkloadConfig, BaseConfig, Worker, WorkerError}; struct EndpointWorkload { restart_interval: u64, diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 9804626..0bb4e98 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -1,10 +1,8 @@ -use std::fmt::Display; - use core_affinity::CoreId; use rand::{thread_rng, Rng}; use rand_distr::{Uniform, Zipf}; -use crate::{Distribution, Workload, WorkloadConfig}; +use crate::{Distribution, Workload, WorkloadConfig, Worker}; use self::{endpoints::EndpointWorker, processes::ProcessesWorker, syscalls::SyscallsWorker}; @@ -12,31 +10,6 @@ pub mod endpoints; pub mod processes; pub mod syscalls; -#[derive(Debug)] -pub enum WorkerError {} - -impl Display for WorkerError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "worker error found") - } -} - -pub trait Worker { - fn run_payload(&self) -> Result<(), WorkerError>; -} - -#[derive(Debug, Copy, Clone)] -struct BaseConfig { - cpu: CoreId, - process: usize, -} - -impl Display for BaseConfig { - fn fmt(&self, f: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Process {} from {}", self.process, self.cpu.id) - } -} - pub fn new_worker( workload: WorkloadConfig, cpu: CoreId, diff --git a/src/worker/processes.rs b/src/worker/processes.rs index 5270c0a..fca0e40 100644 --- a/src/worker/processes.rs +++ b/src/worker/processes.rs @@ -7,9 +7,7 @@ use nix::{sys::wait::waitpid, unistd::Pid}; use rand::{distributions::Alphanumeric, thread_rng, Rng}; use rand_distr::Exp; -use crate::{Workload, WorkloadConfig}; - -use super::{BaseConfig, Worker, WorkerError}; +use crate::{Workload, WorkloadConfig, BaseConfig, Worker, WorkerError}; #[derive(Debug, Clone, Copy)] pub struct ProcessesWorker { @@ -67,7 +65,7 @@ impl ProcessesWorker { } impl Worker for ProcessesWorker { - fn run_payload(&self) -> Result<(), super::WorkerError> { + fn run_payload(&self) -> Result<(), WorkerError> { info!("{self}"); let Workload::Processes { diff --git a/src/worker/syscalls.rs b/src/worker/syscalls.rs index fbc7c7a..bd708d3 100644 --- a/src/worker/syscalls.rs +++ b/src/worker/syscalls.rs @@ -6,9 +6,7 @@ use rand::{thread_rng, Rng}; use rand_distr::Exp; use syscalls::{syscall, Sysno}; -use crate::{Workload, WorkloadConfig}; - -use super::{BaseConfig, Worker}; +use crate::{Workload, WorkloadConfig, BaseConfig, Worker, WorkerError}; #[derive(Debug, Copy, Clone)] pub struct SyscallsWorker { @@ -36,7 +34,7 @@ impl SyscallsWorker { } impl Worker for SyscallsWorker { - fn run_payload(&self) -> Result<(), super::WorkerError> { + fn run_payload(&self) -> Result<(), WorkerError> { info!("{self}"); let Workload::Syscalls { arrival_rate } = self.workload.workload else {