From 3d0bd11306956e66118344ac6b2fe55098f1a89c Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Thu, 5 Sep 2024 11:22:04 +0530 Subject: [PATCH] create a trait for K8sClient --- api/Cargo.toml | 1 + api/src/k8s_client.rs | 68 ++++++++++++++++++++++++++++++++++--------- api/src/main.rs | 5 +++- api/src/worker.rs | 20 ++++++++----- 4 files changed, 73 insertions(+), 21 deletions(-) diff --git a/api/Cargo.toml b/api/Cargo.toml index a78645a..72ca15d 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -13,6 +13,7 @@ name = "api" [dependencies] actix-web = { workspace = true, features = ["macros", "http2"] } anyhow = { workspace = true, features = ["std"] } +async-trait = { workspace = true } bytes = { workspace = true } config = { workspace = true, features = ["yaml"] } k8s-openapi = { workspace = true, features = ["latest"] } diff --git a/api/src/k8s_client.rs b/api/src/k8s_client.rs index 96610bf..edb8487 100644 --- a/api/src/k8s_client.rs +++ b/api/src/k8s_client.rs @@ -1,3 +1,4 @@ +use async_trait::async_trait; use k8s_openapi::api::{ apps::v1::StatefulSet, core::v1::{ConfigMap, Pod, Secret}, @@ -20,7 +21,45 @@ pub enum K8sError { Kube(#[from] kube::Error), } -pub struct K8sClient { +#[async_trait] +pub trait K8sClient { + async fn create_or_update_postgres_secret( + &self, + prefix: &str, + postgres_password: &str, + ) -> Result<(), K8sError>; + + async fn create_or_update_bq_secret( + &self, + prefix: &str, + bq_service_account_key: &str, + ) -> Result<(), K8sError>; + + async fn delete_postgres_secret(&self, prefix: &str) -> Result<(), K8sError>; + + async fn delete_bq_secret(&self, prefix: &str) -> Result<(), K8sError>; + + async fn create_or_update_config_map( + &self, + prefix: &str, + base_config: &str, + prod_config: &str, + ) -> Result<(), K8sError>; + + async fn delete_config_map(&self, prefix: &str) -> Result<(), K8sError>; + + async fn create_or_update_stateful_set( + &self, + prefix: &str, + replicator_image: &str, + ) -> Result<(), K8sError>; + + async fn delete_stateful_set(&self, prefix: &str) -> Result<(), K8sError>; + + async fn delete_pod(&self, prefix: &str) -> Result<(), K8sError>; +} + +pub struct HttpK8sClient { secrets_api: Api, config_maps_api: Api, stateful_sets_api: Api, @@ -33,8 +72,8 @@ const CONFIG_MAP_NAME_SUFFIX: &str = "replicator-config"; const STATEFUL_SET_NAME_SUFFIX: &str = "replicator"; const CONTAINER_NAME_SUFFIX: &str = "replicator"; -impl K8sClient { - pub async fn new() -> Result { +impl HttpK8sClient { + pub async fn new() -> Result { let client = Client::try_default().await?; let secrets_api: Api = Api::default_namespaced(client.clone()); @@ -42,15 +81,18 @@ impl K8sClient { let stateful_sets_api: Api = Api::default_namespaced(client.clone()); let pods_api: Api = Api::default_namespaced(client); - Ok(K8sClient { + Ok(HttpK8sClient { secrets_api, config_maps_api, stateful_sets_api, pods_api, }) } +} - pub async fn create_or_update_postgres_secret( +#[async_trait] +impl K8sClient for HttpK8sClient { + async fn create_or_update_postgres_secret( &self, prefix: &str, postgres_password: &str, @@ -80,7 +122,7 @@ impl K8sClient { Ok(()) } - pub async fn create_or_update_bq_secret( + async fn create_or_update_bq_secret( &self, prefix: &str, bq_service_account_key: &str, @@ -110,7 +152,7 @@ impl K8sClient { Ok(()) } - pub async fn delete_postgres_secret(&self, prefix: &str) -> Result<(), K8sError> { + async fn delete_postgres_secret(&self, prefix: &str) -> Result<(), K8sError> { info!("deleting postgres secret"); let secret_name = format!("{prefix}-{POSTGRES_SECRET_NAME_SUFFIX}"); let dp = DeleteParams::default(); @@ -129,7 +171,7 @@ impl K8sClient { Ok(()) } - pub async fn delete_bq_secret(&self, prefix: &str) -> Result<(), K8sError> { + async fn delete_bq_secret(&self, prefix: &str) -> Result<(), K8sError> { info!("deleting bq secret"); let secret_name = format!("{prefix}-{BQ_SECRET_NAME_SUFFIX}"); let dp = DeleteParams::default(); @@ -148,7 +190,7 @@ impl K8sClient { Ok(()) } - pub async fn create_or_update_config_map( + async fn create_or_update_config_map( &self, prefix: &str, base_config: &str, @@ -178,7 +220,7 @@ impl K8sClient { Ok(()) } - pub async fn delete_config_map(&self, prefix: &str) -> Result<(), K8sError> { + async fn delete_config_map(&self, prefix: &str) -> Result<(), K8sError> { info!("deleting config map"); let config_map_name = format!("{prefix}-{CONFIG_MAP_NAME_SUFFIX}"); let dp = DeleteParams::default(); @@ -197,7 +239,7 @@ impl K8sClient { Ok(()) } - pub async fn create_or_update_stateful_set( + async fn create_or_update_stateful_set( &self, prefix: &str, replicator_image: &str, @@ -291,7 +333,7 @@ impl K8sClient { Ok(()) } - pub async fn delete_stateful_set(&self, prefix: &str) -> Result<(), K8sError> { + async fn delete_stateful_set(&self, prefix: &str) -> Result<(), K8sError> { info!("deleting stateful set"); let stateful_set_name = format!("{prefix}-{STATEFUL_SET_NAME_SUFFIX}"); let dp = DeleteParams::default(); @@ -311,7 +353,7 @@ impl K8sClient { Ok(()) } - pub async fn delete_pod(&self, prefix: &str) -> Result<(), K8sError> { + async fn delete_pod(&self, prefix: &str) -> Result<(), K8sError> { info!("deleting pod"); let pod_name = format!("{prefix}-{STATEFUL_SET_NAME_SUFFIX}-0"); let dp = DeleteParams::default(); diff --git a/api/src/main.rs b/api/src/main.rs index a026e4b..053de53 100644 --- a/api/src/main.rs +++ b/api/src/main.rs @@ -2,6 +2,7 @@ use std::fmt::{Debug, Display}; use api::{ configuration::get_configuration, + k8s_client::HttpK8sClient, startup::Application, telemetry::{get_subscriber, init_subscriber}, worker::run_worker_until_stopped, @@ -17,7 +18,9 @@ pub async fn main() -> anyhow::Result<()> { info!("{configuration}"); let application = Application::build(configuration.clone()).await?; let application_task = tokio::spawn(application.run_until_stopped()); - let worker_task = tokio::spawn(run_worker_until_stopped(configuration)); + + let k8s_client = HttpK8sClient::new().await?; + let worker_task = tokio::spawn(run_worker_until_stopped(configuration, k8s_client)); tokio::select! { o = application_task => report_exit("API", o), diff --git a/api/src/worker.rs b/api/src/worker.rs index 5c3e62c..003de5a 100644 --- a/api/src/worker.rs +++ b/api/src/worker.rs @@ -12,16 +12,22 @@ use crate::{ startup::get_connection_pool, }; -pub async fn run_worker_until_stopped(configuration: Settings) -> Result<(), anyhow::Error> { +pub async fn run_worker_until_stopped( + configuration: Settings, + k8s_client: C, +) -> Result<(), anyhow::Error> { let connection_pool = get_connection_pool(&configuration.database); let poll_duration = Duration::from_secs(configuration.worker.poll_interval_secs); - worker_loop(connection_pool, poll_duration).await + worker_loop(connection_pool, poll_duration, &k8s_client).await } -async fn worker_loop(pool: PgPool, poll_duration: Duration) -> Result<(), anyhow::Error> { - let k8s_client = K8sClient::new().await?; +async fn worker_loop( + pool: PgPool, + poll_duration: Duration, + k8s_client: &C, +) -> Result<(), anyhow::Error> { loop { - match try_execute_task(&pool, &k8s_client).await { + match try_execute_task(&pool, k8s_client).await { Ok(ExecutionOutcome::EmptyQueue) => { debug!("no task in queue"); tokio::time::sleep(poll_duration).await; @@ -79,9 +85,9 @@ fn create_prefix(tenant_id: i64, replicator_id: i64) -> String { format!("{tenant_id}-{replicator_id}") } -pub async fn try_execute_task( +pub async fn try_execute_task( pool: &PgPool, - k8s_client: &K8sClient, + k8s_client: &C, ) -> Result { let task = dequeue_task(pool).await?; let Some((transaction, task)) = task else {