From c4a09ba5f7b89b1d856fdfbb0fe8ddd5f760f78f Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Tue, 3 Sep 2024 17:04:37 +0530 Subject: [PATCH] allow configuring of replicator images --- .../20240828090309_create_images.sql | 6 ++ ... => 20240902113739_create_replicators.sql} | 1 + ...ql => 20240903105045_create_pipelines.sql} | 0 api/src/db/images.rs | 79 +++++++++++++++++++ api/src/db/mod.rs | 1 + api/src/db/pipelines.rs | 3 +- api/src/db/replicators.rs | 18 +++-- api/src/routes/pipelines.rs | 34 ++++++-- 8 files changed, 129 insertions(+), 13 deletions(-) create mode 100644 api/migrations/20240828090309_create_images.sql rename api/migrations/{20240828090309_create_replicators.sql => 20240902113739_create_replicators.sql} (82%) rename api/migrations/{20240902113739_create_pipelines.sql => 20240903105045_create_pipelines.sql} (100%) create mode 100644 api/src/db/images.rs diff --git a/api/migrations/20240828090309_create_images.sql b/api/migrations/20240828090309_create_images.sql new file mode 100644 index 0000000..f8135d7 --- /dev/null +++ b/api/migrations/20240828090309_create_images.sql @@ -0,0 +1,6 @@ +create table + public.images ( + id bigint generated always as identity primary key, + image_name text not null, + is_default boolean not null + ); diff --git a/api/migrations/20240828090309_create_replicators.sql b/api/migrations/20240902113739_create_replicators.sql similarity index 82% rename from api/migrations/20240828090309_create_replicators.sql rename to api/migrations/20240902113739_create_replicators.sql index fac0f96..2f9723e 100644 --- a/api/migrations/20240828090309_create_replicators.sql +++ b/api/migrations/20240902113739_create_replicators.sql @@ -4,5 +4,6 @@ create table public.replicators ( id bigint generated always as identity primary key, tenant_id bigint references public.tenants(id) not null, + image_id bigint references public.images(id) not null, status replicator_status not null ); diff --git a/api/migrations/20240902113739_create_pipelines.sql b/api/migrations/20240903105045_create_pipelines.sql similarity index 100% rename from api/migrations/20240902113739_create_pipelines.sql rename to api/migrations/20240903105045_create_pipelines.sql diff --git a/api/src/db/images.rs b/api/src/db/images.rs new file mode 100644 index 0000000..abc70da --- /dev/null +++ b/api/src/db/images.rs @@ -0,0 +1,79 @@ +use sqlx::{PgPool, Postgres, Transaction}; + +pub struct Image { + pub id: i64, + pub image_name: String, + pub is_default: bool, +} + +pub async fn create_image( + pool: &PgPool, + image_name: &str, + is_default: bool, +) -> Result { + let mut txn = pool.begin().await?; + let res = create_image_txn(&mut txn, image_name, is_default).await; + txn.commit().await?; + res +} + +pub async fn create_image_txn( + txn: &mut Transaction<'_, Postgres>, + image_name: &str, + is_default: bool, +) -> Result { + let record = sqlx::query!( + r#" + insert into images (image_name, is_default) + values ($1, $2) + returning id + "#, + image_name, + is_default + ) + .fetch_one(&mut **txn) + .await?; + + Ok(record.id) +} + +pub async fn read_default_image(pool: &PgPool) -> Result, sqlx::Error> { + let record = sqlx::query!( + r#" + select id, image_name, is_default + from images + where is_default = true + "#, + ) + .fetch_optional(pool) + .await?; + + Ok(record.map(|r| Image { + id: r.id, + image_name: r.image_name, + is_default: r.is_default, + })) +} + +pub async fn read_image_by_replicator_id( + pool: &PgPool, + replicator_id: i64, +) -> Result, sqlx::Error> { + let record = sqlx::query!( + r#" + select i.id, i.image_name, i.is_default + from images i + join replicators r on i.id = r.image_id + where r.id = $1 + "#, + replicator_id, + ) + .fetch_optional(pool) + .await?; + + Ok(record.map(|r| Image { + id: r.id, + image_name: r.image_name, + is_default: r.is_default, + })) +} diff --git a/api/src/db/mod.rs b/api/src/db/mod.rs index 9167981..1d7ed00 100644 --- a/api/src/db/mod.rs +++ b/api/src/db/mod.rs @@ -5,3 +5,4 @@ pub mod sinks; pub mod sources; pub mod tables; pub mod tenants; +pub mod images; diff --git a/api/src/db/pipelines.rs b/api/src/db/pipelines.rs index 164af1b..811591f 100644 --- a/api/src/db/pipelines.rs +++ b/api/src/db/pipelines.rs @@ -31,12 +31,13 @@ pub async fn create_pipeline( tenant_id: i64, source_id: i64, sink_id: i64, + image_id: i64, publication_name: String, config: &PipelineConfig, ) -> Result { let config = serde_json::to_value(config).expect("failed to serialize config"); let mut txn = pool.begin().await?; - let replicator_id = create_replicator_txn(&mut txn, tenant_id).await?; + let replicator_id = create_replicator_txn(&mut txn, tenant_id, image_id).await?; let record = sqlx::query!( r#" insert into pipelines (tenant_id, source_id, sink_id, replicator_id, publication_name, config) diff --git a/api/src/db/replicators.rs b/api/src/db/replicators.rs index 782558b..66a4a0f 100644 --- a/api/src/db/replicators.rs +++ b/api/src/db/replicators.rs @@ -12,12 +12,17 @@ pub enum ReplicatorStatus { pub struct Replicator { pub id: i64, pub tenant_id: i64, + pub image_id: i64, pub status: ReplicatorStatus, } -pub async fn create_replicator(pool: &PgPool, tenant_id: i64) -> Result { +pub async fn create_replicator( + pool: &PgPool, + tenant_id: i64, + image_id: i64, +) -> Result { let mut txn = pool.begin().await?; - let res = create_replicator_txn(&mut txn, tenant_id).await; + let res = create_replicator_txn(&mut txn, tenant_id, image_id).await; txn.commit().await?; res } @@ -25,14 +30,16 @@ pub async fn create_replicator(pool: &PgPool, tenant_id: i64) -> Result, tenant_id: i64, + image_id: i64, ) -> Result { let record = sqlx::query!( r#" - insert into replicators (tenant_id, status) - values ($1, $2::replicator_status) + insert into replicators (tenant_id, image_id, status) + values ($1, $2, $3::replicator_status) returning id "#, tenant_id, + image_id, ReplicatorStatus::Stopped as ReplicatorStatus ) .fetch_one(&mut **txn) @@ -48,7 +55,7 @@ pub async fn read_replicator_by_pipeline_id( ) -> Result, sqlx::Error> { let record = sqlx::query!( r#" - select r.id, r.tenant_id, status as "status: ReplicatorStatus" + select r.id, r.tenant_id, r.image_id, r.status as "status: ReplicatorStatus" from replicators r join pipelines p on r.id = p.replicator_id where r.tenant_id = $1 and p.tenant_id = $1 and p.id = $2 @@ -62,6 +69,7 @@ pub async fn read_replicator_by_pipeline_id( Ok(record.map(|r| Replicator { id: r.id, tenant_id: r.tenant_id, + image_id: r.image_id, status: r.status, })) } diff --git a/api/src/routes/pipelines.rs b/api/src/routes/pipelines.rs index 7939615..727d8fe 100644 --- a/api/src/routes/pipelines.rs +++ b/api/src/routes/pipelines.rs @@ -12,6 +12,7 @@ use thiserror::Error; use crate::{ db::{ self, + images::Image, pipelines::{Pipeline, PipelineConfig}, replicators::Replicator, sinks::{sink_exists, Sink, SinkConfig}, @@ -41,6 +42,12 @@ enum PipelineError { #[error("replicator with pipeline id {0} not found")] ReplicatorNotFound(i64), + #[error("image with replicator id {0} not found")] + ImageNotFound(i64), + + #[error("no default image found")] + NoDefaultImageFound, + #[error("tenant id missing in request")] TenantIdMissing, @@ -67,7 +74,9 @@ impl ResponseError for PipelineError { match self { PipelineError::DatabaseError(_) | PipelineError::InvalidConfig(_) - | PipelineError::ReplicatorNotFound(_) => StatusCode::INTERNAL_SERVER_ERROR, + | PipelineError::ReplicatorNotFound(_) + | PipelineError::ImageNotFound(_) + | PipelineError::NoDefaultImageFound => StatusCode::INTERNAL_SERVER_ERROR, PipelineError::PipelineNotFound(_) => StatusCode::NOT_FOUND, PipelineError::TenantIdMissing | PipelineError::TenantIdIllFormed @@ -145,11 +154,16 @@ pub async fn create_pipeline( return Err(PipelineError::SinkNotFound(pipeline.sink_id)); } + let image = db::images::read_default_image(&pool) + .await? + .ok_or(PipelineError::NoDefaultImageFound)?; + let id = db::pipelines::create_pipeline( &pool, tenant_id, pipeline.source_id, pipeline.sink_id, + image.id, pipeline.publication_name, &config, ) @@ -272,13 +286,15 @@ pub async fn start_pipeline( let tenant_id = extract_tenant_id(&req)?; let pipeline_id = pipeline_id.into_inner(); - let (pipeline, replicator, source, sink) = read_data(&pool, tenant_id, pipeline_id).await?; + let (pipeline, replicator, image, source, sink) = + read_data(&pool, tenant_id, pipeline_id).await?; let (secrets, config) = create_configs(source.config, sink.config, pipeline)?; enqueue_create_or_update_secrets_task(&pool, tenant_id, replicator.id, secrets).await?; enqueue_create_or_update_config_task(&pool, tenant_id, replicator.id, config).await?; - enqueue_create_or_update_replicator_task(&pool, tenant_id, replicator.id).await?; + enqueue_create_or_update_replicator_task(&pool, tenant_id, replicator.id, image.image_name) + .await?; Ok(HttpResponse::Ok().finish()) } @@ -307,13 +323,16 @@ async fn read_data( pool: &PgPool, tenant_id: i64, pipeline_id: i64, -) -> Result<(Pipeline, Replicator, Source, Sink), PipelineError> { +) -> Result<(Pipeline, Replicator, Image, Source, Sink), PipelineError> { let pipeline = db::pipelines::read_pipeline(pool, tenant_id, pipeline_id) .await? .ok_or(PipelineError::PipelineNotFound(pipeline_id))?; let replicator = db::replicators::read_replicator_by_pipeline_id(pool, tenant_id, pipeline_id) .await? .ok_or(PipelineError::ReplicatorNotFound(pipeline_id))?; + let image = db::images::read_image_by_replicator_id(pool, replicator.id) + .await? + .ok_or(PipelineError::ImageNotFound(replicator.id))?; let source_id = pipeline.source_id; let source = db::sources::read_source(pool, tenant_id, source_id) .await? @@ -322,7 +341,8 @@ async fn read_data( let sink = db::sinks::read_sink(pool, tenant_id, sink_id) .await? .ok_or(PipelineError::SinkNotFound(sink_id))?; - Ok((pipeline, replicator, source, sink)) + + Ok((pipeline, replicator, image, source, sink)) } fn create_configs( @@ -422,12 +442,12 @@ async fn enqueue_create_or_update_replicator_task( pool: &PgPool, tenant_id: i64, replicator_id: i64, + replicator_image: String, ) -> Result<(), PipelineError> { let req = Request::CreateOrUpdateReplicator { tenant_id, replicator_id, - replicator_image: "ramsup/replicator:main.96457e346eed76c0dce648dbfd5a6f645220ea9a" - .to_string(), //TODO: remove hardcode image + replicator_image, }; let task_data = serde_json::to_value(req)?;