From 8118ec368de5b30d7700171eab383074b4e269ab Mon Sep 17 00:00:00 2001
From: Raminder Singh
Date: Mon, 2 Sep 2024 20:01:10 +0530
Subject: [PATCH] add api to start a pipeline

---
 ...31fff08af722f932f5ccd4a9083b99bcaf8d8.json |  47 +++++++
 ...d5575357b71a8008344c3e6ae86b8e622bd5.json} |   5 +-
 ...1b944268727c84ae8a02cfc7c15fdf49a5d3.json} |  13 +-
 ...1020bd26a5f21a9a27b6eb580ea730a0e3dd.json} |  13 +-
 ...0de91c8ad174fa2bb7b0c217e843aa6f1cde2.json |  35 +++++
 .../20240828090309_create_replicators.sql     |   8 ++
 ...ql => 20240902113739_create_pipelines.sql} |   1 +
 api/src/db/mod.rs                             |   1 +
 api/src/db/pipelines.rs                       |  19 ++-
 api/src/db/replicators.rs                     |  67 +++++++++
 api/src/queue.rs                              |   2 +-
 api/src/routes/pipelines.rs                   | 129 +++++++++++++++++-
 api/src/worker.rs                             |  83 +++++++----
 api/tests/api/pipelines.rs                    |   1 +
 api/tests/api/test_app.rs                     |   1 +
 15 files changed, 377 insertions(+), 48 deletions(-)
 create mode 100644 api/.sqlx/query-4f38f0fd1e56661000bde2c712331fff08af722f932f5ccd4a9083b99bcaf8d8.json
 rename api/.sqlx/{query-f3355ccb2f7a4a56b1ef007ccdc1b72fc186de50dafd5a90149ee2d1b501ccfe.json => query-5fd17de4de3f18aaa439dce48a14d5575357b71a8008344c3e6ae86b8e622bd5.json} (64%)
 rename api/.sqlx/{query-46b6c43ee097d96fdfb7fb3cdec1272d8edc1414e7cb44f56d5965c93fcaba9d.json => query-6936d9fd44bca05eb6c09b4ce9141b944268727c84ae8a02cfc7c15fdf49a5d3.json} (74%)
 rename api/.sqlx/{query-9c6b897cc7e8a81b339c648fdcb6efe3340bce3a6e6a18fd84716a61e3ab20c2.json => query-78d67ff45ac9a7bfa3f1754e90671020bd26a5f21a9a27b6eb580ea730a0e3dd.json} (73%)
 create mode 100644 api/.sqlx/query-c3a6dc4b4f1e478d6b9a142328b0de91c8ad174fa2bb7b0c217e843aa6f1cde2.json
 create mode 100644 api/migrations/20240828090309_create_replicators.sql
 rename api/migrations/{20240828090309_create_pipelines.sql => 20240902113739_create_pipelines.sql} (83%)
 create mode 100644 api/src/db/replicators.rs

diff --git a/api/.sqlx/query-4f38f0fd1e56661000bde2c712331fff08af722f932f5ccd4a9083b99bcaf8d8.json b/api/.sqlx/query-4f38f0fd1e56661000bde2c712331fff08af722f932f5ccd4a9083b99bcaf8d8.json
new file mode 100644
index 0000000..e89884d
--- /dev/null
+++ b/api/.sqlx/query-4f38f0fd1e56661000bde2c712331fff08af722f932f5ccd4a9083b99bcaf8d8.json
@@ -0,0 +1,47 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n        select r.id, r.tenant_id, status as \"status: ReplicatorStatus\"\n        from replicators r\n        join pipelines p on r.id = p.replicator_id\n        where r.tenant_id = $1 and p.tenant_id = $1 and p.id = $2\n        ",
+  "describe": {
+    "columns": [
+      {
+        "ordinal": 0,
+        "name": "id",
+        "type_info": "Int8"
+      },
+      {
+        "ordinal": 1,
+        "name": "tenant_id",
+        "type_info": "Int8"
+      },
+      {
+        "ordinal": 2,
+        "name": "status: ReplicatorStatus",
+        "type_info": {
+          "Custom": {
+            "name": "replicator_status",
+            "kind": {
+              "Enum": [
+                "stopped",
+                "starting",
+                "started",
+                "stopping"
+              ]
+            }
+          }
+        }
+      }
+    ],
+    "parameters": {
+      "Left": [
+        "Int8",
+        "Int8"
+      ]
+    },
+    "nullable": [
+      false,
+      false,
+      false
+    ]
+  },
+  "hash": "4f38f0fd1e56661000bde2c712331fff08af722f932f5ccd4a9083b99bcaf8d8"
+}
diff --git a/api/.sqlx/query-f3355ccb2f7a4a56b1ef007ccdc1b72fc186de50dafd5a90149ee2d1b501ccfe.json b/api/.sqlx/query-5fd17de4de3f18aaa439dce48a14d5575357b71a8008344c3e6ae86b8e622bd5.json
similarity index 64%
rename from api/.sqlx/query-f3355ccb2f7a4a56b1ef007ccdc1b72fc186de50dafd5a90149ee2d1b501ccfe.json
rename to api/.sqlx/query-5fd17de4de3f18aaa439dce48a14d5575357b71a8008344c3e6ae86b8e622bd5.json
index 8e8ed1c..9705303 100644
--- a/api/.sqlx/query-f3355ccb2f7a4a56b1ef007ccdc1b72fc186de50dafd5a90149ee2d1b501ccfe.json
+++ b/api/.sqlx/query-5fd17de4de3f18aaa439dce48a14d5575357b71a8008344c3e6ae86b8e622bd5.json
@@ -1,6 +1,6 @@
 {
   "db_name": "PostgreSQL",
-  "query": "\n        insert into pipelines (tenant_id, source_id, sink_id, publication_name, config)\n        values ($1, $2, $3, $4, $5)\n        returning id\n        ",
+  "query": "\n        insert into pipelines (tenant_id, source_id, sink_id, replicator_id, publication_name, config)\n        values ($1, $2, $3, $4, $5, $6)\n        returning id\n        ",
   "describe": {
     "columns": [
       {
@@ -14,6 +14,7 @@
         "Int8",
         "Int8",
         "Int8",
+        "Int8",
         "Text",
         "Jsonb"
       ]
@@ -22,5 +23,5 @@
       false
     ]
   },
-  "hash": "f3355ccb2f7a4a56b1ef007ccdc1b72fc186de50dafd5a90149ee2d1b501ccfe"
+  "hash": "5fd17de4de3f18aaa439dce48a14d5575357b71a8008344c3e6ae86b8e622bd5"
 }
diff --git a/api/.sqlx/query-46b6c43ee097d96fdfb7fb3cdec1272d8edc1414e7cb44f56d5965c93fcaba9d.json b/api/.sqlx/query-6936d9fd44bca05eb6c09b4ce9141b944268727c84ae8a02cfc7c15fdf49a5d3.json
similarity index 74%
rename from api/.sqlx/query-46b6c43ee097d96fdfb7fb3cdec1272d8edc1414e7cb44f56d5965c93fcaba9d.json
rename to api/.sqlx/query-6936d9fd44bca05eb6c09b4ce9141b944268727c84ae8a02cfc7c15fdf49a5d3.json
index 3c0c7ba..8f1dcd5 100644
--- a/api/.sqlx/query-46b6c43ee097d96fdfb7fb3cdec1272d8edc1414e7cb44f56d5965c93fcaba9d.json
+++ b/api/.sqlx/query-6936d9fd44bca05eb6c09b4ce9141b944268727c84ae8a02cfc7c15fdf49a5d3.json
@@ -1,6 +1,6 @@
 {
   "db_name": "PostgreSQL",
-  "query": "\n        select id, tenant_id, source_id, sink_id, publication_name, config\n        from pipelines\n        where tenant_id = $1 and id = $2\n        ",
+  "query": "\n        select id, tenant_id, source_id, sink_id, replicator_id, publication_name, config\n        from pipelines\n        where tenant_id = $1\n        ",
   "describe": {
     "columns": [
       {
@@ -25,18 +25,22 @@
       },
       {
         "ordinal": 4,
+        "name": "replicator_id",
+        "type_info": "Int8"
+      },
+      {
+        "ordinal": 5,
         "name": "publication_name",
         "type_info": "Text"
       },
       {
-        "ordinal": 5,
+        "ordinal": 6,
         "name": "config",
         "type_info": "Jsonb"
       }
     ],
     "parameters": {
       "Left": [
-        "Int8",
         "Int8"
       ]
     },
@@ -46,8 +50,9 @@
       false,
       false,
       false,
+      false,
       false
     ]
   },
-  "hash": "46b6c43ee097d96fdfb7fb3cdec1272d8edc1414e7cb44f56d5965c93fcaba9d"
+  "hash": "6936d9fd44bca05eb6c09b4ce9141b944268727c84ae8a02cfc7c15fdf49a5d3"
 }
diff --git a/api/.sqlx/query-9c6b897cc7e8a81b339c648fdcb6efe3340bce3a6e6a18fd84716a61e3ab20c2.json b/api/.sqlx/query-78d67ff45ac9a7bfa3f1754e90671020bd26a5f21a9a27b6eb580ea730a0e3dd.json
similarity index 73%
rename from api/.sqlx/query-9c6b897cc7e8a81b339c648fdcb6efe3340bce3a6e6a18fd84716a61e3ab20c2.json
rename to api/.sqlx/query-78d67ff45ac9a7bfa3f1754e90671020bd26a5f21a9a27b6eb580ea730a0e3dd.json
index 437f76b..80af8b2 100644
--- a/api/.sqlx/query-9c6b897cc7e8a81b339c648fdcb6efe3340bce3a6e6a18fd84716a61e3ab20c2.json
+++ b/api/.sqlx/query-78d67ff45ac9a7bfa3f1754e90671020bd26a5f21a9a27b6eb580ea730a0e3dd.json
@@ -1,6 +1,6 @@
 {
   "db_name": "PostgreSQL",
-  "query": "\n        select id, tenant_id, source_id, sink_id, publication_name, config\n        from pipelines\n        where tenant_id = $1\n        ",
+  "query": "\n        select id, tenant_id, source_id, sink_id, replicator_id, publication_name, config\n        from pipelines\n        where tenant_id = $1 and id = $2\n        ",
   "describe": {
     "columns": [
       {
@@ -25,17 +25,23 @@
       },
       {
         "ordinal": 4,
+        "name": "replicator_id",
+        "type_info": "Int8"
+      },
+      {
+        "ordinal": 5,
         "name": "publication_name",
         "type_info": "Text"
       },
       {
-        "ordinal": 5,
+        "ordinal": 6,
         "name": "config",
         "type_info": "Jsonb"
       }
     ],
     "parameters": {
       "Left": [
+        "Int8",
         "Int8"
       ]
     },
@@ -45,8 +51,9 @@
       false,
       false,
       false,
+      false,
       false
     ]
   },
-  "hash": "9c6b897cc7e8a81b339c648fdcb6efe3340bce3a6e6a18fd84716a61e3ab20c2"
+  "hash": "78d67ff45ac9a7bfa3f1754e90671020bd26a5f21a9a27b6eb580ea730a0e3dd"
 }
diff --git a/api/.sqlx/query-c3a6dc4b4f1e478d6b9a142328b0de91c8ad174fa2bb7b0c217e843aa6f1cde2.json b/api/.sqlx/query-c3a6dc4b4f1e478d6b9a142328b0de91c8ad174fa2bb7b0c217e843aa6f1cde2.json
new file mode 100644
index 0000000..2a77f20
--- /dev/null
+++ b/api/.sqlx/query-c3a6dc4b4f1e478d6b9a142328b0de91c8ad174fa2bb7b0c217e843aa6f1cde2.json
@@ -0,0 +1,35 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n        insert into replicators (tenant_id, status)\n        values ($1, $2::replicator_status)\n        returning id\n        ",
+  "describe": {
+    "columns": [
+      {
+        "ordinal": 0,
+        "name": "id",
+        "type_info": "Int8"
+      }
+    ],
+    "parameters": {
+      "Left": [
+        "Int8",
+        {
+          "Custom": {
+            "name": "replicator_status",
+            "kind": {
+              "Enum": [
+                "stopped",
+                "starting",
+                "started",
+                "stopping"
+              ]
+            }
+          }
+        }
+      ]
+    },
+    "nullable": [
+      false
+    ]
+  },
+  "hash": "c3a6dc4b4f1e478d6b9a142328b0de91c8ad174fa2bb7b0c217e843aa6f1cde2"
+}
diff --git a/api/migrations/20240828090309_create_replicators.sql b/api/migrations/20240828090309_create_replicators.sql
new file mode 100644
index 0000000..fac0f96
--- /dev/null
+++ b/api/migrations/20240828090309_create_replicators.sql
@@ -0,0 +1,8 @@
+create type replicator_status as enum ('stopped', 'starting', 'started', 'stopping');
+
+create table
+    public.replicators (
+        id bigint generated always as identity primary key,
+        tenant_id bigint references public.tenants(id) not null,
+        status replicator_status not null
+    );
diff --git a/api/migrations/20240828090309_create_pipelines.sql b/api/migrations/20240902113739_create_pipelines.sql
similarity index 83%
rename from api/migrations/20240828090309_create_pipelines.sql
rename to api/migrations/20240902113739_create_pipelines.sql
index fea545e..d0021cc 100644
--- a/api/migrations/20240828090309_create_pipelines.sql
+++ b/api/migrations/20240902113739_create_pipelines.sql
@@ -4,6 +4,7 @@ create table
         tenant_id bigint references public.tenants(id) not null,
         source_id bigint references public.sources(id) not null,
         sink_id bigint references public.sinks(id) not null,
+        replicator_id bigint references public.replicators(id) not null,
         publication_name text not null,
         config jsonb not null
     );
diff --git a/api/src/db/mod.rs b/api/src/db/mod.rs
index 0d7203c..9167981 100644
--- a/api/src/db/mod.rs
+++ b/api/src/db/mod.rs
@@ -1,5 +1,6 @@
 pub mod pipelines;
 pub mod publications;
+pub mod replicators;
 pub mod sinks;
 pub mod sources;
 pub mod tables;
diff --git a/api/src/db/pipelines.rs b/api/src/db/pipelines.rs
index d3dfef4..164af1b 100644
--- a/api/src/db/pipelines.rs
+++ b/api/src/db/pipelines.rs
@@ -1,5 +1,7 @@
 use sqlx::PgPool;
 
+use super::replicators::create_replicator_txn;
+
 #[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
 pub struct PipelineConfig {
     pub config: BatchConfig,
@@ -19,6 +21,7 @@ pub struct Pipeline {
     pub tenant_id: i64,
     pub source_id: i64,
     pub sink_id: i64,
+    pub replicator_id: i64,
     pub publication_name: String,
     pub config: serde_json::Value,
 }
@@ -32,20 +35,24 @@ pub async fn create_pipeline(
     config: &PipelineConfig,
 ) -> Result<i64, sqlx::Error> {
     let config = serde_json::to_value(config).expect("failed to serialize config");
+    let mut txn = pool.begin().await?;
+    let replicator_id = create_replicator_txn(&mut txn, tenant_id).await?;
     let record = sqlx::query!(
         r#"
-        insert into pipelines (tenant_id, source_id, sink_id, publication_name, config)
-        values ($1, $2, $3, $4, $5)
+        insert into pipelines (tenant_id, source_id, sink_id, replicator_id, publication_name, config)
+        values ($1, $2, $3, $4, $5, $6)
         returning id
         "#,
        tenant_id,
        source_id,
        sink_id,
+       replicator_id,
        publication_name,
        config
    )
-    .fetch_one(pool)
+    .fetch_one(&mut *txn)
    .await?;
+    txn.commit().await?;
    Ok(record.id)
 }
 
@@ -57,7 +64,7 @@ pub async fn read_pipeline(
 ) -> Result<Option<Pipeline>, sqlx::Error> {
     let record = sqlx::query!(
         r#"
-        select id, tenant_id, source_id, sink_id, publication_name, config
+        select id, tenant_id, source_id, sink_id, replicator_id, publication_name, config
         from pipelines
         where tenant_id = $1 and id = $2
         "#,
@@ -72,6 +79,7 @@ pub async fn read_pipeline(
         tenant_id: r.tenant_id,
         source_id: r.source_id,
         sink_id: r.sink_id,
+        replicator_id: r.replicator_id,
         publication_name: r.publication_name,
         config: r.config,
     }))
@@ -133,7 +141,7 @@ pub async fn read_all_pipelines(
 ) -> Result<Vec<Pipeline>, sqlx::Error> {
     let mut record = sqlx::query!(
         r#"
-        select id, tenant_id, source_id, sink_id, publication_name, config
+        select id, tenant_id, source_id, sink_id, replicator_id, publication_name, config
         from pipelines
         where tenant_id = $1
         "#,
@@ -149,6 +157,7 @@ pub async fn read_all_pipelines(
             tenant_id: r.tenant_id,
             source_id: r.source_id,
             sink_id: r.sink_id,
+            replicator_id: r.replicator_id,
             publication_name: r.publication_name,
             config: r.config,
         })
diff --git a/api/src/db/replicators.rs b/api/src/db/replicators.rs
new file mode 100644
index 0000000..782558b
--- /dev/null
+++ b/api/src/db/replicators.rs
@@ -0,0 +1,67 @@
+use sqlx::{PgPool, Postgres, Transaction};
+
+#[derive(Clone, Debug, PartialEq, PartialOrd, sqlx::Type)]
+#[sqlx(type_name = "replicator_status", rename_all = "lowercase")]
+pub enum ReplicatorStatus {
+    Stopped,
+    Starting,
+    Started,
+    Stopping,
+}
+
+pub struct Replicator {
+    pub id: i64,
+    pub tenant_id: i64,
+    pub status: ReplicatorStatus,
+}
+
+pub async fn create_replicator(pool: &PgPool, tenant_id: i64) -> Result<i64, sqlx::Error> {
+    let mut txn = pool.begin().await?;
+    let res = create_replicator_txn(&mut txn, tenant_id).await;
+    txn.commit().await?;
+    res
+}
+
+pub async fn create_replicator_txn(
+    txn: &mut Transaction<'_, Postgres>,
+    tenant_id: i64,
+) -> Result<i64, sqlx::Error> {
+    let record = sqlx::query!(
+        r#"
+        insert into replicators (tenant_id, status)
+        values ($1, $2::replicator_status)
+        returning id
+        "#,
+        tenant_id,
+        ReplicatorStatus::Stopped as ReplicatorStatus
+    )
+    .fetch_one(&mut **txn)
+    .await?;
+
+    Ok(record.id)
+}
+
+pub async fn read_replicator_by_pipeline_id(
+    pool: &PgPool,
+    tenant_id: i64,
+    pipeline_id: i64,
+) -> Result<Option<Replicator>, sqlx::Error> {
+    let record = sqlx::query!(
+        r#"
+        select r.id, r.tenant_id, status as "status: ReplicatorStatus"
+        from replicators r
+        join pipelines p on r.id = p.replicator_id
+        where r.tenant_id = $1 and p.tenant_id = $1 and p.id = $2
+        "#,
+        tenant_id,
+        pipeline_id,
+    )
+    .fetch_optional(pool)
+    .await?;
+
+    Ok(record.map(|r| Replicator {
+        id: r.id,
+        tenant_id: r.tenant_id,
+        status: r.status,
+    }))
+}
diff --git a/api/src/queue.rs b/api/src/queue.rs
index 0cad1a7..b29a9e9 100644
--- a/api/src/queue.rs
+++ b/api/src/queue.rs
@@ -4,7 +4,7 @@ pub async fn enqueue_task(
     pool: &PgPool,
     task_name: &str,
     task_data: serde_json::Value,
-) -> Result<i64, anyhow::Error> {
+) -> Result<i64, sqlx::Error> {
     let task = sqlx::query!(
         r#"
         insert into queue.task_queue (name, data)
diff --git a/api/src/routes/pipelines.rs b/api/src/routes/pipelines.rs
index 2450d4f..b6a1fe0 100644
--- a/api/src/routes/pipelines.rs
+++ b/api/src/routes/pipelines.rs
@@ -9,7 +9,17 @@
 use serde::{Deserialize, Serialize};
 use sqlx::PgPool;
 use thiserror::Error;
 
-use crate::db::{self, pipelines::PipelineConfig, sinks::sink_exists, sources::source_exists};
+use crate::{
+    db::{
+        self,
+        pipelines::PipelineConfig,
+        sinks::{sink_exists, SinkConfig},
+        sources::{source_exists, SourceConfig},
+    },
+    queue::enqueue_task,
+    replicator_config,
+    worker::{Request, Secrets},
+};
 
 use super::ErrorMessage;
@@ -27,6 +37,9 @@ enum PipelineError {
     #[error("sink with id {0} not found")]
     SinkNotFound(i64),
 
+    #[error("replicator with pipeline id {0} not found")]
+    ReplicatorNotFound(i64),
+
     #[error("tenant id missing in request")]
     TenantIdMissing,
 
@@ -51,9 +64,9 @@ impl PipelineError {
 impl ResponseError for PipelineError {
     fn status_code(&self) -> StatusCode {
         match self {
-            PipelineError::DatabaseError(_) | PipelineError::InvalidConfig(_) => {
-                StatusCode::INTERNAL_SERVER_ERROR
-            }
+            PipelineError::DatabaseError(_)
+            | PipelineError::InvalidConfig(_)
+            | PipelineError::ReplicatorNotFound(_) => StatusCode::INTERNAL_SERVER_ERROR,
             PipelineError::PipelineNotFound(_) => StatusCode::NOT_FOUND,
             PipelineError::TenantIdMissing
             | PipelineError::TenantIdIllFormed
@@ -93,6 +106,7 @@ struct GetPipelineResponse {
     tenant_id: i64,
     source_id: i64,
     sink_id: i64,
+    replicator_id: i64,
     publication_name: String,
     config: PipelineConfig,
 }
@@ -162,6 +176,7 @@ pub async fn read_pipeline(
             tenant_id: s.tenant_id,
             source_id: s.source_id,
             sink_id: s.sink_id,
+            replicator_id: s.replicator_id,
             publication_name: s.publication_name,
             config,
         })
@@ -238,6 +253,7 @@ pub async fn read_all_pipelines(
             tenant_id: pipeline.tenant_id,
             source_id: pipeline.source_id,
             sink_id: pipeline.sink_id,
+            replicator_id: pipeline.replicator_id,
             publication_name: pipeline.publication_name,
             config,
         };
@@ -245,3 +261,108 @@ pub async fn read_all_pipelines(
     }
     Ok(Json(pipelines))
 }
+
+#[get("/pipelines/{pipeline_id}/start")]
+pub async fn start_pipeline(
+    req: HttpRequest,
+    pool: Data<PgPool>,
+    pipeline_id: Path<i64>,
+) -> Result<impl Responder, PipelineError> {
+    let tenant_id = extract_tenant_id(&req)?;
+    let pipeline_id = pipeline_id.into_inner();
+
+    let pipeline = db::pipelines::read_pipeline(&pool, tenant_id, pipeline_id)
+        .await?
+        .ok_or(PipelineError::PipelineNotFound(pipeline_id))?;
+    let pipeline_config: PipelineConfig = serde_json::from_value(pipeline.config)?;
+    let batch_config = pipeline_config.config;
+
+    let replicator = db::replicators::read_replicator_by_pipeline_id(&pool, tenant_id, pipeline_id)
+        .await?
+        .ok_or(PipelineError::ReplicatorNotFound(pipeline_id))?;
+
+    let source_id = pipeline.source_id;
+    let source = db::sources::read_source(&pool, tenant_id, source_id)
+        .await?
+        .ok_or(PipelineError::SourceNotFound(source_id))?;
+    let source_config: SourceConfig = serde_json::from_value(source.config)?;
+    let SourceConfig::Postgres {
+        host,
+        port,
+        name,
+        username,
+        password: postgres_password,
+        slot_name,
+    } = source_config;
+
+    let sink_id = pipeline.sink_id;
+    let sink = db::sinks::read_sink(&pool, tenant_id, sink_id)
+        .await?
+        .ok_or(PipelineError::SinkNotFound(sink_id))?;
+    let sink_config: SinkConfig = serde_json::from_value(sink.config)?;
+    let SinkConfig::BigQuery {
+        project_id,
+        dataset_id,
+        service_account_key: bigquery_service_account_key,
+    } = sink_config;
+
+    let secrets = Secrets {
+        postgres_password: postgres_password.unwrap_or_default(),
+        bigquery_service_account_key,
+    };
+
+    let req = Request::CreateOrUpdateSecrets {
+        tenant_id,
+        replicator_id: replicator.id,
+        secrets,
+    };
+
+    let task_data = serde_json::to_value(req)?;
+    enqueue_task(&pool, "create_or_update_secrets", task_data).await?;
+
+    let source_config = replicator_config::SourceConfig::Postgres {
+        host,
+        port,
+        name,
+        username,
+        slot_name,
+        publication: pipeline.publication_name,
+    };
+
+    let sink_config = replicator_config::SinkConfig::BigQuery {
+        project_id,
+        dataset_id,
+    };
+
+    let batch_config = replicator_config::BatchConfig {
+        max_size: batch_config.max_size,
+        max_fill_secs: batch_config.max_fill_secs,
+    };
+
+    let replicator_config = replicator_config::Config {
+        source: source_config,
+        sink: sink_config,
+        batch: batch_config,
+    };
+
+    let req = Request::CreateOrUpdateConfig {
+        tenant_id,
+        replicator_id: replicator.id,
+        config: replicator_config,
+    };
+
+    let task_data = serde_json::to_value(req)?;
+    enqueue_task(&pool, "create_or_update_config", task_data).await?;
+
+    let req = Request::CreateOrUpdateReplicator {
+        tenant_id,
+        replicator_id: replicator.id,
+        replicator_image: "ramsup/replicator:main.96457e346eed76c0dce648dbfd5a6f645220ea9a"
+            .to_string(), //TODO: remove hardcode image
+    };
+
+    let task_data = serde_json::to_value(req)?;
+    enqueue_task(&pool, "create_or_update_replicator", task_data).await?;
+
+    Ok(HttpResponse::Ok().finish())
+}
diff --git a/api/src/worker.rs b/api/src/worker.rs
index 5fa9f39..1be77c6 100644
--- a/api/src/worker.rs
+++ b/api/src/worker.rs
@@ -39,33 +39,39 @@ async fn worker_loop(pool: PgPool, poll_duration: Duration) -> Result<(), anyhow
 
 #[derive(serde::Serialize, serde::Deserialize)]
 pub struct Secrets {
-    postgres_password: String,
-    bigquery_service_account_key: String,
+    pub postgres_password: String,
+    pub bigquery_service_account_key: String,
 }
 
 #[allow(clippy::large_enum_variant)]
 #[derive(serde::Serialize, serde::Deserialize)]
 pub enum Request {
     CreateOrUpdateSecrets {
-        project_ref: String,
+        tenant_id: i64,
+        replicator_id: i64,
         secrets: Secrets,
     },
     CreateOrUpdateConfig {
-        project_ref: String,
+        tenant_id: i64,
+        replicator_id: i64,
         config: replicator_config::Config,
     },
     CreateOrUpdateReplicator {
-        project_ref: String,
+        tenant_id: i64,
+        replicator_id: i64,
         replicator_image: String,
     },
     DeleteSecrets {
-        project_ref: String,
+        tenant_id: i64,
+        replicator_id: i64,
     },
     DeleteConfig {
-        project_ref: String,
+        tenant_id: i64,
+        replicator_id: i64,
     },
     DeleteReplicator {
-        project_ref: String,
+        tenant_id: i64,
+        replicator_id: i64,
     },
 }
 
@@ -82,57 +88,76 @@ pub async fn try_execute_task(
     match request {
         Request::CreateOrUpdateSecrets {
-            project_ref,
+            tenant_id,
+            replicator_id,
             secrets,
         } => {
-            info!("creating secrets for project ref: {}", project_ref);
+            info!("creating secrets for tenant_id: {tenant_id}, replicator_id: {replicator_id}");
             let Secrets {
                 postgres_password,
                 bigquery_service_account_key,
             } = secrets;
+            let prefix = format!("{tenant_id}_{replicator_id}");
             k8s_client
-                .create_or_update_postgres_secret(&project_ref, &postgres_password)
+                .create_or_update_postgres_secret(&prefix, &postgres_password)
                 .await?;
             k8s_client
-                .create_or_update_bq_secret(&project_ref, &bigquery_service_account_key)
+                .create_or_update_bq_secret(&prefix, &bigquery_service_account_key)
                 .await?;
         }
         Request::CreateOrUpdateConfig {
-            project_ref,
+            tenant_id,
+            replicator_id,
             config,
         } => {
-            info!("creating config map for project ref: {}", project_ref);
+            info!("creating config map for tenant_id: {tenant_id}, replicator_id: {replicator_id}");
             let base_config = "";
             let prod_config = serde_json::to_string(&config)?;
+            let prefix = format!("{tenant_id}_{replicator_id}");
             k8s_client
-                .create_or_update_config_map(&project_ref, base_config, &prod_config)
+                .create_or_update_config_map(&prefix, base_config, &prod_config)
                 .await?;
         }
         Request::CreateOrUpdateReplicator {
-            project_ref,
+            tenant_id,
+            replicator_id,
             replicator_image,
         } => {
             info!(
-                "creating or updating stateful set for project ref: {}",
-                project_ref
+                "creating or updating stateful set for tenant_id: {tenant_id}, replicator_id: {replicator_id}"
             );
+            let prefix = format!("{tenant_id}_{replicator_id}");
             k8s_client
-                .create_or_update_stateful_set(&project_ref, &replicator_image)
+                .create_or_update_stateful_set(&prefix, &replicator_image)
                 .await?;
         }
-        Request::DeleteSecrets { project_ref } => {
-            info!("deleting secrets for project ref: {}", project_ref);
-            k8s_client.delete_postgres_secret(&project_ref).await?;
-            k8s_client.delete_bq_secret(&project_ref).await?;
+        Request::DeleteSecrets {
+            tenant_id,
+            replicator_id,
+        } => {
+            info!("deleting secrets for tenant_id: {tenant_id}, replicator_id: {replicator_id}");
+            let prefix = format!("{tenant_id}_{replicator_id}");
+            k8s_client.delete_postgres_secret(&prefix).await?;
+            k8s_client.delete_bq_secret(&prefix).await?;
         }
-        Request::DeleteConfig { project_ref } => {
-            info!("deleting config map for project ref: {}", project_ref);
-            k8s_client.delete_config_map(&project_ref).await?;
+        Request::DeleteConfig {
+            tenant_id,
+            replicator_id,
+        } => {
+            info!("deleting config map for tenant_id: {tenant_id}, replicator_id: {replicator_id}");
+            let prefix = format!("{tenant_id}_{replicator_id}");
+            k8s_client.delete_config_map(&prefix).await?;
        }
-        Request::DeleteReplicator { project_ref } => {
-            info!("deleting stateful set for project ref: {}", project_ref);
-            k8s_client.delete_stateful_set(&project_ref).await?;
+        Request::DeleteReplicator {
+            tenant_id,
+            replicator_id,
+        } => {
+            info!(
+                "deleting stateful set for tenant_id: {tenant_id}, replicator_id: {replicator_id}"
+            );
+            let prefix = format!("{tenant_id}_{replicator_id}");
+            k8s_client.delete_stateful_set(&prefix).await?;
         }
     }
diff --git a/api/tests/api/pipelines.rs b/api/tests/api/pipelines.rs
index 2134e50..1e8241d 100644
--- a/api/tests/api/pipelines.rs
+++ b/api/tests/api/pipelines.rs
@@ -154,6 +154,7 @@ async fn an_existing_pipeline_can_be_read() {
     assert_eq!(response.tenant_id, tenant_id);
     assert_eq!(response.source_id, source_id);
     assert_eq!(response.sink_id, sink_id);
+    assert!(response.replicator_id != 0);
     assert_eq!(response.config, pipeline.config);
 }
 
diff --git a/api/tests/api/test_app.rs b/api/tests/api/test_app.rs
index 63a9706..363e179 100644
--- a/api/tests/api/test_app.rs
+++ b/api/tests/api/test_app.rs
@@ -99,6 +99,7 @@ pub struct PipelineResponse {
     pub tenant_id: i64,
     pub source_id: i64,
     pub sink_id: i64,
+    pub replicator_id: i64,
     pub publication_name: String,
     pub config: PipelineConfig,
 }
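
Usage sketch: GET /pipelines/{pipeline_id}/start does not start the replicator synchronously; it enqueues the create_or_update_secrets, create_or_update_config and create_or_update_replicator tasks, which the worker loop later applies to Kubernetes. A minimal client illustrating the call follows. The localhost:8000 address, the "tenant_id" header name and the reqwest/tokio dependencies are assumptions for illustration only, not something this patch defines:

    // Hypothetical caller of the new endpoint. Assumes the api binary is
    // reachable at localhost:8000 and that extract_tenant_id reads the
    // tenant from a "tenant_id" header (both assumptions, not part of this
    // patch). Requires reqwest and tokio (with the "macros" feature).
    use reqwest::Client;

    #[tokio::main]
    async fn main() -> Result<(), reqwest::Error> {
        let client = Client::new();
        let response = client
            .get("http://localhost:8000/pipelines/1/start")
            .header("tenant_id", "1")
            .send()
            .await?;

        // 200 OK only means the three tasks were enqueued, not that the
        // stateful set is already running; 404 means the pipeline id does
        // not exist for this tenant.
        println!("status: {}", response.status());
        Ok(())
    }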