Skip to content

Commit

Permalink
create a trait for K8sClient
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Sep 5, 2024
1 parent dfd9bf1 commit 3d0bd11
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 21 deletions.
1 change: 1 addition & 0 deletions api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ name = "api"
[dependencies]
actix-web = { workspace = true, features = ["macros", "http2"] }
anyhow = { workspace = true, features = ["std"] }
async-trait = { workspace = true }
bytes = { workspace = true }
config = { workspace = true, features = ["yaml"] }
k8s-openapi = { workspace = true, features = ["latest"] }
Expand Down
68 changes: 55 additions & 13 deletions api/src/k8s_client.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use async_trait::async_trait;
use k8s_openapi::api::{
apps::v1::StatefulSet,
core::v1::{ConfigMap, Pod, Secret},
Expand All @@ -20,7 +21,45 @@ pub enum K8sError {
Kube(#[from] kube::Error),
}

pub struct K8sClient {
#[async_trait]
pub trait K8sClient {
async fn create_or_update_postgres_secret(
&self,
prefix: &str,
postgres_password: &str,
) -> Result<(), K8sError>;

async fn create_or_update_bq_secret(
&self,
prefix: &str,
bq_service_account_key: &str,
) -> Result<(), K8sError>;

async fn delete_postgres_secret(&self, prefix: &str) -> Result<(), K8sError>;

async fn delete_bq_secret(&self, prefix: &str) -> Result<(), K8sError>;

async fn create_or_update_config_map(
&self,
prefix: &str,
base_config: &str,
prod_config: &str,
) -> Result<(), K8sError>;

async fn delete_config_map(&self, prefix: &str) -> Result<(), K8sError>;

async fn create_or_update_stateful_set(
&self,
prefix: &str,
replicator_image: &str,
) -> Result<(), K8sError>;

async fn delete_stateful_set(&self, prefix: &str) -> Result<(), K8sError>;

async fn delete_pod(&self, prefix: &str) -> Result<(), K8sError>;
}

pub struct HttpK8sClient {
secrets_api: Api<Secret>,
config_maps_api: Api<ConfigMap>,
stateful_sets_api: Api<StatefulSet>,
Expand All @@ -33,24 +72,27 @@ const CONFIG_MAP_NAME_SUFFIX: &str = "replicator-config";
const STATEFUL_SET_NAME_SUFFIX: &str = "replicator";
const CONTAINER_NAME_SUFFIX: &str = "replicator";

impl K8sClient {
pub async fn new() -> Result<K8sClient, K8sError> {
impl HttpK8sClient {
pub async fn new() -> Result<HttpK8sClient, K8sError> {
let client = Client::try_default().await?;

let secrets_api: Api<Secret> = Api::default_namespaced(client.clone());
let config_maps_api: Api<ConfigMap> = Api::default_namespaced(client.clone());
let stateful_sets_api: Api<StatefulSet> = Api::default_namespaced(client.clone());
let pods_api: Api<Pod> = Api::default_namespaced(client);

Ok(K8sClient {
Ok(HttpK8sClient {
secrets_api,
config_maps_api,
stateful_sets_api,
pods_api,
})
}
}

pub async fn create_or_update_postgres_secret(
#[async_trait]
impl K8sClient for HttpK8sClient {
async fn create_or_update_postgres_secret(
&self,
prefix: &str,
postgres_password: &str,
Expand Down Expand Up @@ -80,7 +122,7 @@ impl K8sClient {
Ok(())
}

pub async fn create_or_update_bq_secret(
async fn create_or_update_bq_secret(
&self,
prefix: &str,
bq_service_account_key: &str,
Expand Down Expand Up @@ -110,7 +152,7 @@ impl K8sClient {
Ok(())
}

pub async fn delete_postgres_secret(&self, prefix: &str) -> Result<(), K8sError> {
async fn delete_postgres_secret(&self, prefix: &str) -> Result<(), K8sError> {
info!("deleting postgres secret");
let secret_name = format!("{prefix}-{POSTGRES_SECRET_NAME_SUFFIX}");
let dp = DeleteParams::default();
Expand All @@ -129,7 +171,7 @@ impl K8sClient {
Ok(())
}

pub async fn delete_bq_secret(&self, prefix: &str) -> Result<(), K8sError> {
async fn delete_bq_secret(&self, prefix: &str) -> Result<(), K8sError> {
info!("deleting bq secret");
let secret_name = format!("{prefix}-{BQ_SECRET_NAME_SUFFIX}");
let dp = DeleteParams::default();
Expand All @@ -148,7 +190,7 @@ impl K8sClient {
Ok(())
}

pub async fn create_or_update_config_map(
async fn create_or_update_config_map(
&self,
prefix: &str,
base_config: &str,
Expand Down Expand Up @@ -178,7 +220,7 @@ impl K8sClient {
Ok(())
}

pub async fn delete_config_map(&self, prefix: &str) -> Result<(), K8sError> {
async fn delete_config_map(&self, prefix: &str) -> Result<(), K8sError> {
info!("deleting config map");
let config_map_name = format!("{prefix}-{CONFIG_MAP_NAME_SUFFIX}");
let dp = DeleteParams::default();
Expand All @@ -197,7 +239,7 @@ impl K8sClient {
Ok(())
}

pub async fn create_or_update_stateful_set(
async fn create_or_update_stateful_set(
&self,
prefix: &str,
replicator_image: &str,
Expand Down Expand Up @@ -291,7 +333,7 @@ impl K8sClient {
Ok(())
}

pub async fn delete_stateful_set(&self, prefix: &str) -> Result<(), K8sError> {
async fn delete_stateful_set(&self, prefix: &str) -> Result<(), K8sError> {
info!("deleting stateful set");
let stateful_set_name = format!("{prefix}-{STATEFUL_SET_NAME_SUFFIX}");
let dp = DeleteParams::default();
Expand All @@ -311,7 +353,7 @@ impl K8sClient {
Ok(())
}

pub async fn delete_pod(&self, prefix: &str) -> Result<(), K8sError> {
async fn delete_pod(&self, prefix: &str) -> Result<(), K8sError> {
info!("deleting pod");
let pod_name = format!("{prefix}-{STATEFUL_SET_NAME_SUFFIX}-0");
let dp = DeleteParams::default();
Expand Down
5 changes: 4 additions & 1 deletion api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::fmt::{Debug, Display};

use api::{
configuration::get_configuration,
k8s_client::HttpK8sClient,
startup::Application,
telemetry::{get_subscriber, init_subscriber},
worker::run_worker_until_stopped,
Expand All @@ -17,7 +18,9 @@ pub async fn main() -> anyhow::Result<()> {
info!("{configuration}");
let application = Application::build(configuration.clone()).await?;
let application_task = tokio::spawn(application.run_until_stopped());
let worker_task = tokio::spawn(run_worker_until_stopped(configuration));

let k8s_client = HttpK8sClient::new().await?;
let worker_task = tokio::spawn(run_worker_until_stopped(configuration, k8s_client));

tokio::select! {
o = application_task => report_exit("API", o),
Expand Down
20 changes: 13 additions & 7 deletions api/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,22 @@ use crate::{
startup::get_connection_pool,
};

pub async fn run_worker_until_stopped(configuration: Settings) -> Result<(), anyhow::Error> {
pub async fn run_worker_until_stopped<C: K8sClient>(
configuration: Settings,
k8s_client: C,
) -> Result<(), anyhow::Error> {
let connection_pool = get_connection_pool(&configuration.database);
let poll_duration = Duration::from_secs(configuration.worker.poll_interval_secs);
worker_loop(connection_pool, poll_duration).await
worker_loop(connection_pool, poll_duration, &k8s_client).await
}

async fn worker_loop(pool: PgPool, poll_duration: Duration) -> Result<(), anyhow::Error> {
let k8s_client = K8sClient::new().await?;
async fn worker_loop<C: K8sClient>(
pool: PgPool,
poll_duration: Duration,
k8s_client: &C,
) -> Result<(), anyhow::Error> {
loop {
match try_execute_task(&pool, &k8s_client).await {
match try_execute_task(&pool, k8s_client).await {
Ok(ExecutionOutcome::EmptyQueue) => {
debug!("no task in queue");
tokio::time::sleep(poll_duration).await;
Expand Down Expand Up @@ -79,9 +85,9 @@ fn create_prefix(tenant_id: i64, replicator_id: i64) -> String {
format!("{tenant_id}-{replicator_id}")
}

pub async fn try_execute_task(
pub async fn try_execute_task<C: K8sClient>(
pool: &PgPool,
k8s_client: &K8sClient,
k8s_client: &C,
) -> Result<ExecutionOutcome, anyhow::Error> {
let task = dequeue_task(pool).await?;
let Some((transaction, task)) = task else {
Expand Down

0 comments on commit 3d0bd11

Please sign in to comment.