From 0210290226ae5c1550ad2491d3ee677f9e4b1afd Mon Sep 17 00:00:00 2001 From: Samika Kashyap Date: Wed, 17 Jul 2024 15:29:40 -0700 Subject: [PATCH 1/5] feat: operator to create a separate job for load generation --- Cargo.lock | 1 + operator/src/crdgen.rs | 6 +- operator/src/lgen/controller.rs | 200 ++++++++++++++++++++++++++ operator/src/lgen/job.rs | 183 +++++++++++++++++++++++ operator/src/lgen/mod.rs | 9 ++ operator/src/lgen/spec.rs | 39 +++++ operator/src/lib.rs | 7 + operator/src/main.rs | 5 +- operator/src/simulation/controller.rs | 2 +- 9 files changed, 447 insertions(+), 5 deletions(-) create mode 100644 operator/src/lgen/controller.rs create mode 100644 operator/src/lgen/job.rs create mode 100644 operator/src/lgen/mod.rs create mode 100644 operator/src/lgen/spec.rs diff --git a/Cargo.lock b/Cargo.lock index 66b61d59..d5b4eec6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2842,6 +2842,7 @@ dependencies = [ "clap", "did-method-key", "ed25519-dalek", + "futures", "goose", "hex", "ipld-core", diff --git a/operator/src/crdgen.rs b/operator/src/crdgen.rs index 961b640f..8e3c6e28 100644 --- a/operator/src/crdgen.rs +++ b/operator/src/crdgen.rs @@ -1,10 +1,12 @@ -use kube::CustomResourceExt; - +use keramik_operator::lgen::spec::LoadGenerator; use keramik_operator::network::Network; use keramik_operator::simulation::Simulation; +use kube::CustomResourceExt; fn main() { print!("{}", serde_yaml::to_string(&Network::crd()).unwrap()); println!("---"); print!("{}", serde_yaml::to_string(&Simulation::crd()).unwrap()); + println!("---"); + print!("{}", serde_yaml::to_string(&LoadGenerator::crd()).unwrap()); } diff --git a/operator/src/lgen/controller.rs b/operator/src/lgen/controller.rs new file mode 100644 index 00000000..bdd9a33b --- /dev/null +++ b/operator/src/lgen/controller.rs @@ -0,0 +1,200 @@ +use std::{sync::Arc, time::Duration}; + +use futures::stream::StreamExt; +use k8s_openapi::api::batch::v1::Job; +use kube::{ + api::{Patch, PatchParams}, + client::Client, + core::object::HasSpec, + runtime::Controller, + Api, +}; +use kube::{ + runtime::{ + controller::Action, + watcher::{self, Config}, + }, + Resource, ResourceExt, +}; +use opentelemetry::{global, KeyValue}; +use rand::{distributions::Alphanumeric, thread_rng, Rng, RngCore}; + +use tracing::{debug, error, info}; + +use crate::{ + labels::MANAGED_BY_LABEL_SELECTOR, + lgen::{ + job::{job_spec, JobConfig, JobImageConfig}, + spec::{LoadGenerator, LoadGeneratorStatus}, + }, + simulation::controller::monitoring_ready, + utils::Clock, +}; + +use crate::network::ipfs_rpc::{HttpRpcClient, IpfsRpcClient}; + +use crate::utils::{apply_job, Context}; + +/// The name of the load generator job. +pub const LOAD_GENERATOR_JOB_NAME: &str = "load-gen-job"; + +/// Handle errors during reconciliation. +fn on_error( + _network: Arc, + _error: &Error, + _context: Arc>, +) -> Action { + Action::requeue(Duration::from_secs(5)) +} + +/// Errors produced by the reconcile function. +#[derive(Debug, thiserror::Error)] +enum Error { + #[error("App error: {source}")] + App { + #[from] + source: anyhow::Error, + }, + #[error("Kube error: {source}")] + Kube { + #[from] + source: kube::Error, + }, +} + +/// Start a controller for the LoadGenerator CRD. +pub async fn run() { + let k_client = Client::try_default().await.unwrap(); + let context = Arc::new( + Context::new(k_client.clone(), HttpRpcClient).expect("should be able to create context"), + ); + + let load_generators: Api = Api::all(k_client.clone()); + let jobs = Api::::all(k_client.clone()); + + Controller::new(load_generators.clone(), Config::default()) + .owns( + jobs, + watcher::Config::default().labels(MANAGED_BY_LABEL_SELECTOR), + ) + .run(reconcile, on_error, context) + .for_each(|rec_res| async move { + match rec_res { + Ok((load_generator, _)) => { + info!(load_generator.name, "reconcile success"); + } + Err(err) => { + error!(?err, "reconcile error") + } + } + }) + .await; +} + +/// Perform a reconcile pass for the LoadGenerator CRD +async fn reconcile( + load_generator: Arc, + cx: Arc>, +) -> Result { + let meter = global::meter("keramik"); + let runs = meter + .u64_counter("load_generator_reconcile_count") + .with_description("Number of load generator reconciles") + .init(); + + match reconcile_(load_generator, cx).await { + Ok(action) => { + runs.add( + 1, + &[KeyValue { + key: "result".into(), + value: "ok".into(), + }], + ); + Ok(action) + } + Err(err) => { + runs.add( + 1, + &[KeyValue { + key: "result".into(), + value: "err".into(), + }], + ); + Err(err) + } + } +} + +/// Perform a reconcile pass for the LoadGenerator CRD +async fn reconcile_( + load_generator: Arc, + cx: Arc>, +) -> Result { + let spec = load_generator.spec(); + + let status = if let Some(status) = &load_generator.status { + status.clone() + } else { + // Generate new status with random name and nonce + LoadGeneratorStatus { + nonce: thread_rng().gen(), + name: "load-gen-" + .chars() + .chain( + thread_rng() + .sample_iter(&Alphanumeric) + .take(6) + .map(char::from), + ) + .collect::(), + } + }; + debug!(?spec, ?status, "reconcile"); + + let ns = load_generator.namespace().unwrap(); + + // The load generator does not deploy the monitoring resources but they must exist in order to + // collect the results of load generators. + let ready = monitoring_ready(cx.clone(), &ns).await?; + + if !ready { + return Ok(Action::requeue(Duration::from_secs(10))); + } + + let job_image_config = JobImageConfig::from(spec); + + let job_config = JobConfig { + name: status.name.clone(), + scenario: spec.scenario.to_owned(), + tasks: spec.tasks.to_owned(), + run_time: spec.run_time.to_owned(), + nonce: status.nonce, + job_image_config: job_image_config.clone(), + throttle_requests: spec.throttle_requests, + }; + let orefs = load_generator + .controller_owner_ref(&()) + .map(|oref| vec![oref]) + .unwrap_or_default(); + + apply_job( + cx.clone(), + &ns, + orefs.clone(), + LOAD_GENERATOR_JOB_NAME, + job_spec(job_config), + ) + .await?; + + let load_generators: Api = Api::namespaced(cx.k_client.clone(), &ns); + let _patched = load_generators + .patch_status( + &load_generator.name_any(), + &PatchParams::default(), + &Patch::Merge(serde_json::json!({ "status": status })), + ) + .await?; + + Ok(Action::requeue(Duration::from_secs(10))) +} diff --git a/operator/src/lgen/job.rs b/operator/src/lgen/job.rs new file mode 100644 index 00000000..8473a2ec --- /dev/null +++ b/operator/src/lgen/job.rs @@ -0,0 +1,183 @@ +use std::collections::BTreeMap; + +use crate::{lgen::spec::LoadGeneratorSpec, network::PEERS_CONFIG_MAP_NAME}; +use k8s_openapi::api::{ + batch::v1::JobSpec, + core::v1::{ + ConfigMapVolumeSource, Container, EnvVar, EnvVarSource, PodSpec, PodTemplateSpec, + SecretKeySelector, Volume, VolumeMount, + }, +}; +use kube::api::ObjectMeta; + +/// Configuration for job images. +#[derive(Clone, Debug)] +pub struct JobImageConfig { + /// Image for all jobs created by the load generator. + pub image: String, + /// Pull policy for image. + pub image_pull_policy: String, +} + +impl Default for JobImageConfig { + fn default() -> Self { + Self { + image: "public.ecr.aws/r5b3e0r5/3box/keramik-runner:latest".to_owned(), + image_pull_policy: "Always".to_owned(), + } + } +} + +impl From<&LoadGeneratorSpec> for JobImageConfig { + fn from(value: &LoadGeneratorSpec) -> Self { + let default = Self::default(); + Self { + image: value.image.to_owned().unwrap_or(default.image), + image_pull_policy: value + .image_pull_policy + .to_owned() + .unwrap_or(default.image_pull_policy), + } + } +} + +/// JobConfig defines which properties of the JobSpec can be customized. +pub struct JobConfig { + /// Name of the load generator job. + pub name: String, + /// Scenario to run. + pub scenario: String, + /// Number of tasks to run. + pub tasks: usize, + /// Run time in seconds. + pub run_time: u32, + /// Throttle requests rate. + pub throttle_requests: Option, + /// Nonce for the load generator. + pub nonce: u32, + /// Image configuration for the load generator job. + pub job_image_config: JobImageConfig, +} + +/// Create a job spec for the load generator. +pub fn job_spec(config: JobConfig) -> JobSpec { + let mut env_vars = vec![ + EnvVar { + name: "RUNNER_OTLP_ENDPOINT".to_owned(), + value: Some("http://otel:4317".to_owned()), + ..Default::default() + }, + EnvVar { + name: "RUST_LOG".to_owned(), + value: Some("info,keramik_runner=trace".to_owned()), + ..Default::default() + }, + EnvVar { + name: "GENERATOR_NAME".to_owned(), + value: Some(config.name.to_owned()), + ..Default::default() + }, + EnvVar { + name: "GENERATOR_SCENARIO".to_owned(), + value: Some(config.scenario.to_owned()), + ..Default::default() + }, + EnvVar { + name: "GENERATOR_TASKS".to_owned(), + value: Some(config.tasks.to_owned().to_string()), + ..Default::default() + }, + EnvVar { + name: "GENERATOR_RUN_TIME".to_owned(), + value: Some(config.run_time.to_owned().to_string()), + ..Default::default() + }, + EnvVar { + name: "GENERATOR_NONCE".to_owned(), + value: Some(config.nonce.to_owned().to_string()), + ..Default::default() + }, + EnvVar { + name: "GENERATOR_PEERS_PATH".to_owned(), + value: Some("/keramik-peers/peers.json".to_owned()), + ..Default::default() + }, + EnvVar { + name: "DID_KEY".to_owned(), + value: Some("did:key:z6Mkqn5jbycThHcBtakJZ8fHBQ2oVRQhXQEdQk5ZK2NDtNZA".to_owned()), + ..Default::default() + }, + EnvVar { + name: "DID_PRIVATE_KEY".to_owned(), + value: Some( + "86dce513cf0a37d4acd6d2c2e00fe4b95e0e655ca51e1a890808f5fa6f4fe65a".to_owned(), + ), + ..Default::default() + }, + EnvVar { + name: "CERAMIC_ADMIN_PRIVATE_KEY".to_owned(), + value_from: Some(EnvVarSource { + secret_key_ref: Some(SecretKeySelector { + key: "private-key".to_owned(), + name: Some("ceramic-admin".to_owned()), + ..Default::default() + }), + ..Default::default() + }), + ..Default::default() + }, + ]; + + if let Some(throttle_requests) = config.throttle_requests { + env_vars.push(EnvVar { + name: "GENERATOR_THROTTLE_REQUESTS".to_owned(), + value: Some(throttle_requests.to_string()), + ..Default::default() + }) + } + + JobSpec { + backoff_limit: Some(1), + template: PodTemplateSpec { + metadata: Some(ObjectMeta { + labels: Some(BTreeMap::from_iter(vec![( + "name".to_owned(), + "load-gen-job".to_owned(), + )])), + ..Default::default() + }), + spec: Some(PodSpec { + hostname: Some("job".to_owned()), + subdomain: Some("load-gen-job".to_owned()), + containers: vec![Container { + name: "job".to_owned(), + image: Some(config.job_image_config.image), + image_pull_policy: Some(config.job_image_config.image_pull_policy), + command: Some(vec![ + "/usr/bin/keramik-runner".to_owned(), + "generate-load".to_owned(), + ]), + env: Some(env_vars), + volume_mounts: Some(vec![VolumeMount { + mount_path: "/keramik-peers".to_owned(), + name: "keramik-peers".to_owned(), + ..Default::default() + }]), + ..Default::default() + }], + volumes: Some(vec![Volume { + config_map: Some(ConfigMapVolumeSource { + default_mode: Some(0o755), + name: Some(PEERS_CONFIG_MAP_NAME.to_owned()), + ..Default::default() + }), + name: "keramik-peers".to_owned(), + ..Default::default() + }]), + restart_policy: Some("Never".to_owned()), + ..Default::default() + }), + }, + ..Default::default() + } +} diff --git a/operator/src/lgen/mod.rs b/operator/src/lgen/mod.rs new file mode 100644 index 00000000..ffaf6409 --- /dev/null +++ b/operator/src/lgen/mod.rs @@ -0,0 +1,9 @@ +/// Lgen controller arm module for reconciling load generator resources. +#[cfg(feature = "controller")] +pub mod controller; +/// Job module for creating load generator jobs. +#[cfg(feature = "controller")] +pub mod job; +/// Spec module for creating load generator specs. +#[cfg(feature = "controller")] +pub mod spec; diff --git a/operator/src/lgen/spec.rs b/operator/src/lgen/spec.rs new file mode 100644 index 00000000..a0049237 --- /dev/null +++ b/operator/src/lgen/spec.rs @@ -0,0 +1,39 @@ +use kube::CustomResource; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +/// Primary CRD for creating and managing a Load Generator. +#[derive(CustomResource, Serialize, Deserialize, Debug, Default, PartialEq, Clone, JsonSchema)] +#[kube( + group = "keramik.3box.io", + version = "v1alpha1", + kind = "LoadGenerator", + plural = "loadgenerators", + status = "LoadGeneratorStatus", + derive = "PartialEq", + namespaced +)] +#[serde(rename_all = "camelCase")] +pub struct LoadGeneratorSpec { + /// Load generator scenario + pub scenario: String, + /// Time in minutes to run the load generator + pub run_time: u32, + /// Image for all jobs created by the load generator. + pub image: Option, + /// Pull policy for image. + pub image_pull_policy: Option, + /// Throttle requests (per second) for a load generator. Currently on a per-worker basis. + pub throttle_requests: Option, + /// Number of tasks to run in parallel + pub tasks: usize, +} + +/// Status of the load generator. +#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Clone, JsonSchema)] +pub struct LoadGeneratorStatus { + /// Name of the load generator. + pub name: String, + /// Nonce for the load generator. + pub nonce: u32, +} diff --git a/operator/src/lib.rs b/operator/src/lib.rs index 2733be77..b3612da4 100644 --- a/operator/src/lib.rs +++ b/operator/src/lib.rs @@ -1,12 +1,19 @@ //! Provides API for the operator and related tooling. #![warn(missing_docs)] +/// Labels module for managing resource labels. #[cfg(feature = "controller")] pub(crate) mod labels; +/// Lgen module for generating load. +pub mod lgen; +/// Monitoring module for monitoring resources. #[cfg(feature = "controller")] pub mod monitoring; +/// Network module for managing network resources. pub mod network; +/// Simulation module for running simulations. pub mod simulation; +/// Utils module for shared utility functions. #[cfg(feature = "controller")] pub mod utils; diff --git a/operator/src/main.rs b/operator/src/main.rs index 36308612..a69efe2c 100644 --- a/operator/src/main.rs +++ b/operator/src/main.rs @@ -1,4 +1,4 @@ -//! Operator is a long lived process that auotmates creating and managing Ceramic networks. +//! Operator is a long lived process that automates creating and managing Ceramic networks. #![deny(missing_docs)] use anyhow::Result; use clap::{command, Parser, Subcommand}; @@ -38,7 +38,8 @@ async fn main() -> Result<()> { Command::Daemon => { tokio::join!( keramik_operator::network::run(), - keramik_operator::simulation::run() + keramik_operator::simulation::run(), + keramik_operator::lgen::controller::run(), ); } }; diff --git a/operator/src/simulation/controller.rs b/operator/src/simulation/controller.rs index 23587498..a402153e 100644 --- a/operator/src/simulation/controller.rs +++ b/operator/src/simulation/controller.rs @@ -289,7 +289,7 @@ async fn redis_ready( Ok(redis_ready) } -async fn monitoring_ready( +pub async fn monitoring_ready( cx: Arc>, ns: &str, ) -> Result { From 08d6c0369027235d516002337ad33628bd3fec50 Mon Sep 17 00:00:00 2001 From: Samika Kashyap Date: Wed, 17 Jul 2024 15:32:02 -0700 Subject: [PATCH 2/5] feat: add api access for load gen --- k8s/operator/manifests/operator.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/k8s/operator/manifests/operator.yaml b/k8s/operator/manifests/operator.yaml index 1fb41be8..1446244d 100644 --- a/k8s/operator/manifests/operator.yaml +++ b/k8s/operator/manifests/operator.yaml @@ -36,7 +36,7 @@ rules: resources: ["clusterroles", "clusterrolebindings"] verbs: ["create", "get", "patch"] - apiGroups: ["keramik.3box.io"] - resources: ["networks", "networks/status", "simulations", "simulations/status"] + resources: ["networks", "networks/status", "simulations", "simulations/status", "loadgenerators", "loadgenerators/status"] verbs: ["get", "list", "watch", "patch", "delete"] - apiGroups: ["monitoring.coreos.com"] resources: ["podmonitors"] @@ -87,7 +87,7 @@ apiVersion: apps/v1 kind: Deployment metadata: name: keramik-operator - namespace: default + namespace: keramik labels: app: keramik-operator app.kubernetes.io/name: keramik-operator From 8248082f204d3f331e65aef6d30ee6d7e93be7fd Mon Sep 17 00:00:00 2001 From: Samika Kashyap Date: Wed, 17 Jul 2024 15:32:46 -0700 Subject: [PATCH 3/5] fix: command name --- runner/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runner/src/main.rs b/runner/src/main.rs index 78395ec0..7f47723b 100644 --- a/runner/src/main.rs +++ b/runner/src/main.rs @@ -50,7 +50,7 @@ impl Command { Command::Simulate(_) => "simulate", Command::Noop => "noop", // TODO : After making operator changes this command will be used to generate load - Command::GenerateLoad(_) => "generate_load", + Command::GenerateLoad(_) => "generate-load", } } } From 0bdd9018a93666a81f1d6eaf1dbd842649b28358 Mon Sep 17 00:00:00 2001 From: Samika Kashyap Date: Fri, 26 Jul 2024 13:10:12 -0700 Subject: [PATCH 4/5] fix: naming + clenaups --- k8s/operator/manifests/operator.yaml | 2 +- operator/src/lgen/controller.rs | 4 ++-- operator/src/lgen/spec.rs | 4 ++-- runner/src/main.rs | 1 - 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/k8s/operator/manifests/operator.yaml b/k8s/operator/manifests/operator.yaml index 1446244d..02662b87 100644 --- a/k8s/operator/manifests/operator.yaml +++ b/k8s/operator/manifests/operator.yaml @@ -87,7 +87,7 @@ apiVersion: apps/v1 kind: Deployment metadata: name: keramik-operator - namespace: keramik + namespace: default labels: app: keramik-operator app.kubernetes.io/name: keramik-operator diff --git a/operator/src/lgen/controller.rs b/operator/src/lgen/controller.rs index bdd9a33b..8c1abb81 100644 --- a/operator/src/lgen/controller.rs +++ b/operator/src/lgen/controller.rs @@ -25,7 +25,7 @@ use crate::{ labels::MANAGED_BY_LABEL_SELECTOR, lgen::{ job::{job_spec, JobConfig, JobImageConfig}, - spec::{LoadGenerator, LoadGeneratorStatus}, + spec::{LoadGenerator, LoadGeneratorState}, }, simulation::controller::monitoring_ready, utils::Clock, @@ -137,7 +137,7 @@ async fn reconcile_( status.clone() } else { // Generate new status with random name and nonce - LoadGeneratorStatus { + LoadGeneratorState { nonce: thread_rng().gen(), name: "load-gen-" .chars() diff --git a/operator/src/lgen/spec.rs b/operator/src/lgen/spec.rs index a0049237..cc656d41 100644 --- a/operator/src/lgen/spec.rs +++ b/operator/src/lgen/spec.rs @@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize}; version = "v1alpha1", kind = "LoadGenerator", plural = "loadgenerators", - status = "LoadGeneratorStatus", + status = "LoadGeneratorState", derive = "PartialEq", namespaced )] @@ -31,7 +31,7 @@ pub struct LoadGeneratorSpec { /// Status of the load generator. #[derive(Serialize, Deserialize, Debug, Default, PartialEq, Clone, JsonSchema)] -pub struct LoadGeneratorStatus { +pub struct LoadGeneratorState { /// Name of the load generator. pub name: String, /// Nonce for the load generator. diff --git a/runner/src/main.rs b/runner/src/main.rs index 7f47723b..30ba376b 100644 --- a/runner/src/main.rs +++ b/runner/src/main.rs @@ -49,7 +49,6 @@ impl Command { Command::Bootstrap(_) => "bootstrap", Command::Simulate(_) => "simulate", Command::Noop => "noop", - // TODO : After making operator changes this command will be used to generate load Command::GenerateLoad(_) => "generate-load", } } From ec53b9ce83c89424bf2c748256ea0fb1825bb69e Mon Sep 17 00:00:00 2001 From: Samika Kashyap Date: Tue, 30 Jul 2024 07:55:22 -0700 Subject: [PATCH 5/5] chore: review comments --- operator/src/lgen/job.rs | 5 +++-- operator/src/lgen/mod.rs | 3 +++ operator/src/main.rs | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/operator/src/lgen/job.rs b/operator/src/lgen/job.rs index 8473a2ec..40fab085 100644 --- a/operator/src/lgen/job.rs +++ b/operator/src/lgen/job.rs @@ -9,6 +9,7 @@ use k8s_openapi::api::{ }, }; use kube::api::ObjectMeta; +use crate::lgen::controller::LOAD_GENERATOR_JOB_NAME; /// Configuration for job images. #[derive(Clone, Debug)] @@ -142,13 +143,13 @@ pub fn job_spec(config: JobConfig) -> JobSpec { metadata: Some(ObjectMeta { labels: Some(BTreeMap::from_iter(vec![( "name".to_owned(), - "load-gen-job".to_owned(), + LOAD_GENERATOR_JOB_NAME.to_owned(), )])), ..Default::default() }), spec: Some(PodSpec { hostname: Some("job".to_owned()), - subdomain: Some("load-gen-job".to_owned()), + subdomain: Some(LOAD_GENERATOR_JOB_NAME.to_owned()), containers: vec![Container { name: "job".to_owned(), image: Some(config.job_image_config.image), diff --git a/operator/src/lgen/mod.rs b/operator/src/lgen/mod.rs index ffaf6409..793f1141 100644 --- a/operator/src/lgen/mod.rs +++ b/operator/src/lgen/mod.rs @@ -7,3 +7,6 @@ pub mod job; /// Spec module for creating load generator specs. #[cfg(feature = "controller")] pub mod spec; +/// Run module for running load generators. +#[cfg(feature = "controller")] +pub use controller::run; diff --git a/operator/src/main.rs b/operator/src/main.rs index a69efe2c..5dfc520d 100644 --- a/operator/src/main.rs +++ b/operator/src/main.rs @@ -39,7 +39,7 @@ async fn main() -> Result<()> { tokio::join!( keramik_operator::network::run(), keramik_operator::simulation::run(), - keramik_operator::lgen::controller::run(), + keramik_operator::lgen::run(), ); } };