diff --git a/api/.sqlx/query-226d69b8afa5b25eca7b31190d01599f169c2237ebdaefdce33a64d880c2d4c9.json b/api/.sqlx/query-226d69b8afa5b25eca7b31190d01599f169c2237ebdaefdce33a64d880c2d4c9.json deleted file mode 100644 index fb476a8..0000000 --- a/api/.sqlx/query-226d69b8afa5b25eca7b31190d01599f169c2237ebdaefdce33a64d880c2d4c9.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n update publications\n set source_id = $1, name = $2, config = $3\n where tenant_id = $4 and id = $5\n returning id\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "id", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [ - "Int8", - "Text", - "Jsonb", - "Int8", - "Int8" - ] - }, - "nullable": [ - false - ] - }, - "hash": "226d69b8afa5b25eca7b31190d01599f169c2237ebdaefdce33a64d880c2d4c9" -} diff --git a/api/.sqlx/query-741be149d54a691c726098bef90072e23f1315be11cd01a0a84ae327223d63b1.json b/api/.sqlx/query-3918489540d26371880ff919e3eacb8f26f1d2b387ebe34b6378b11deee7d33d.json similarity index 64% rename from api/.sqlx/query-741be149d54a691c726098bef90072e23f1315be11cd01a0a84ae327223d63b1.json rename to api/.sqlx/query-3918489540d26371880ff919e3eacb8f26f1d2b387ebe34b6378b11deee7d33d.json index 5eb93f5..1434d6c 100644 --- a/api/.sqlx/query-741be149d54a691c726098bef90072e23f1315be11cd01a0a84ae327223d63b1.json +++ b/api/.sqlx/query-3918489540d26371880ff919e3eacb8f26f1d2b387ebe34b6378b11deee7d33d.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n update pipelines\n set source_id = $1, sink_id = $2, publication_id = $3, config = $4\n where tenant_id = $5 and id = $6\n returning id\n ", + "query": "\n update pipelines\n set source_id = $1, sink_id = $2, publication_name = $3, config = $4\n where tenant_id = $5 and id = $6\n returning id\n ", "describe": { "columns": [ { @@ -13,7 +13,7 @@ "Left": [ "Int8", "Int8", - "Int8", + "Text", "Jsonb", "Int8", "Int8" @@ -23,5 +23,5 @@ false ] }, - "hash": 
"741be149d54a691c726098bef90072e23f1315be11cd01a0a84ae327223d63b1" + "hash": "3918489540d26371880ff919e3eacb8f26f1d2b387ebe34b6378b11deee7d33d" } diff --git a/api/.sqlx/query-396da13e6037f477f1d0bc5758fa59ed8611ca9b1ee22265395b6c85b675216c.json b/api/.sqlx/query-396da13e6037f477f1d0bc5758fa59ed8611ca9b1ee22265395b6c85b675216c.json deleted file mode 100644 index 0dd10ad..0000000 --- a/api/.sqlx/query-396da13e6037f477f1d0bc5758fa59ed8611ca9b1ee22265395b6c85b675216c.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n insert into publications (tenant_id, source_id, name, config)\n values ($1, $2, $3, $4)\n returning id\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "id", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [ - "Int8", - "Int8", - "Text", - "Jsonb" - ] - }, - "nullable": [ - false - ] - }, - "hash": "396da13e6037f477f1d0bc5758fa59ed8611ca9b1ee22265395b6c85b675216c" -} diff --git a/api/.sqlx/query-4fba9b9d90a8bd697befe9ac79fde8de0098154c541dd59b0c3180facc5e60a6.json b/api/.sqlx/query-46b6c43ee097d96fdfb7fb3cdec1272d8edc1414e7cb44f56d5965c93fcaba9d.json similarity index 77% rename from api/.sqlx/query-4fba9b9d90a8bd697befe9ac79fde8de0098154c541dd59b0c3180facc5e60a6.json rename to api/.sqlx/query-46b6c43ee097d96fdfb7fb3cdec1272d8edc1414e7cb44f56d5965c93fcaba9d.json index c624a67..3c0c7ba 100644 --- a/api/.sqlx/query-4fba9b9d90a8bd697befe9ac79fde8de0098154c541dd59b0c3180facc5e60a6.json +++ b/api/.sqlx/query-46b6c43ee097d96fdfb7fb3cdec1272d8edc1414e7cb44f56d5965c93fcaba9d.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n select id, tenant_id, source_id, sink_id, publication_id, config\n from pipelines\n where tenant_id = $1 and id = $2\n ", + "query": "\n select id, tenant_id, source_id, sink_id, publication_name, config\n from pipelines\n where tenant_id = $1 and id = $2\n ", "describe": { "columns": [ { @@ -25,8 +25,8 @@ }, { "ordinal": 4, - "name": "publication_id", - "type_info": "Int8" + 
"name": "publication_name", + "type_info": "Text" }, { "ordinal": 5, @@ -49,5 +49,5 @@ false ] }, - "hash": "4fba9b9d90a8bd697befe9ac79fde8de0098154c541dd59b0c3180facc5e60a6" + "hash": "46b6c43ee097d96fdfb7fb3cdec1272d8edc1414e7cb44f56d5965c93fcaba9d" } diff --git a/api/.sqlx/query-609d265edce356f5f0109c99dba5a15b1733c92fae99dcd3558679d1753d4636.json b/api/.sqlx/query-609d265edce356f5f0109c99dba5a15b1733c92fae99dcd3558679d1753d4636.json deleted file mode 100644 index 6f88ebb..0000000 --- a/api/.sqlx/query-609d265edce356f5f0109c99dba5a15b1733c92fae99dcd3558679d1753d4636.json +++ /dev/null @@ -1,46 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n select id, tenant_id, source_id, name, config\n from publications\n where tenant_id = $1\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "id", - "type_info": "Int8" - }, - { - "ordinal": 1, - "name": "tenant_id", - "type_info": "Int8" - }, - { - "ordinal": 2, - "name": "source_id", - "type_info": "Int8" - }, - { - "ordinal": 3, - "name": "name", - "type_info": "Text" - }, - { - "ordinal": 4, - "name": "config", - "type_info": "Jsonb" - } - ], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [ - false, - false, - false, - false, - false - ] - }, - "hash": "609d265edce356f5f0109c99dba5a15b1733c92fae99dcd3558679d1753d4636" -} diff --git a/api/.sqlx/query-62440fdc593cbae237b82d23bd1a5de2063d9ddb58bcdae295ac88e02d765712.json b/api/.sqlx/query-62440fdc593cbae237b82d23bd1a5de2063d9ddb58bcdae295ac88e02d765712.json deleted file mode 100644 index 327b710..0000000 --- a/api/.sqlx/query-62440fdc593cbae237b82d23bd1a5de2063d9ddb58bcdae295ac88e02d765712.json +++ /dev/null @@ -1,47 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n select id, tenant_id, source_id, name, config\n from publications\n where tenant_id = $1 and id = $2\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "id", - "type_info": "Int8" - }, - { - "ordinal": 1, - "name": "tenant_id", - "type_info": "Int8" - }, - { - 
"ordinal": 2, - "name": "source_id", - "type_info": "Int8" - }, - { - "ordinal": 3, - "name": "name", - "type_info": "Text" - }, - { - "ordinal": 4, - "name": "config", - "type_info": "Jsonb" - } - ], - "parameters": { - "Left": [ - "Int8", - "Int8" - ] - }, - "nullable": [ - false, - false, - false, - false, - false - ] - }, - "hash": "62440fdc593cbae237b82d23bd1a5de2063d9ddb58bcdae295ac88e02d765712" -} diff --git a/api/.sqlx/query-819b07177509ce42cc3f682d97648363a60a97785e72e04cd2bb6326924cd66c.json b/api/.sqlx/query-819b07177509ce42cc3f682d97648363a60a97785e72e04cd2bb6326924cd66c.json deleted file mode 100644 index bf764f3..0000000 --- a/api/.sqlx/query-819b07177509ce42cc3f682d97648363a60a97785e72e04cd2bb6326924cd66c.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n select exists (select id\n from publications\n where tenant_id = $1 and id = $2)\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "exists", - "type_info": "Bool" - } - ], - "parameters": { - "Left": [ - "Int8", - "Int8" - ] - }, - "nullable": [ - null - ] - }, - "hash": "819b07177509ce42cc3f682d97648363a60a97785e72e04cd2bb6326924cd66c" -} diff --git a/api/.sqlx/query-3d286537eeae52fd9c7118790dc2f8004c5440da61595a1481eb93f3f3076a7c.json b/api/.sqlx/query-9c6b897cc7e8a81b339c648fdcb6efe3340bce3a6e6a18fd84716a61e3ab20c2.json similarity index 77% rename from api/.sqlx/query-3d286537eeae52fd9c7118790dc2f8004c5440da61595a1481eb93f3f3076a7c.json rename to api/.sqlx/query-9c6b897cc7e8a81b339c648fdcb6efe3340bce3a6e6a18fd84716a61e3ab20c2.json index bfcc539..437f76b 100644 --- a/api/.sqlx/query-3d286537eeae52fd9c7118790dc2f8004c5440da61595a1481eb93f3f3076a7c.json +++ b/api/.sqlx/query-9c6b897cc7e8a81b339c648fdcb6efe3340bce3a6e6a18fd84716a61e3ab20c2.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n select id, tenant_id, source_id, sink_id, publication_id, config\n from pipelines\n where tenant_id = $1\n ", + "query": "\n select id, tenant_id, 
source_id, sink_id, publication_name, config\n from pipelines\n where tenant_id = $1\n ", "describe": { "columns": [ { @@ -25,8 +25,8 @@ }, { "ordinal": 4, - "name": "publication_id", - "type_info": "Int8" + "name": "publication_name", + "type_info": "Text" }, { "ordinal": 5, @@ -48,5 +48,5 @@ false ] }, - "hash": "3d286537eeae52fd9c7118790dc2f8004c5440da61595a1481eb93f3f3076a7c" + "hash": "9c6b897cc7e8a81b339c648fdcb6efe3340bce3a6e6a18fd84716a61e3ab20c2" } diff --git a/api/.sqlx/query-9fa6ac068eb842d1f7912c8e0e209c48777dd04c569a636368f04925e7181506.json b/api/.sqlx/query-9fa6ac068eb842d1f7912c8e0e209c48777dd04c569a636368f04925e7181506.json deleted file mode 100644 index 7b4ab93..0000000 --- a/api/.sqlx/query-9fa6ac068eb842d1f7912c8e0e209c48777dd04c569a636368f04925e7181506.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n delete from publications\n where tenant_id = $1 and id = $2\n returning id\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "id", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [ - "Int8", - "Int8" - ] - }, - "nullable": [ - false - ] - }, - "hash": "9fa6ac068eb842d1f7912c8e0e209c48777dd04c569a636368f04925e7181506" -} diff --git a/api/.sqlx/query-d3c81b85d9a40ad36bd6c5642119a321afeff6d4aaa053c5b0a281d38dbfebeb.json b/api/.sqlx/query-f3355ccb2f7a4a56b1ef007ccdc1b72fc186de50dafd5a90149ee2d1b501ccfe.json similarity index 65% rename from api/.sqlx/query-d3c81b85d9a40ad36bd6c5642119a321afeff6d4aaa053c5b0a281d38dbfebeb.json rename to api/.sqlx/query-f3355ccb2f7a4a56b1ef007ccdc1b72fc186de50dafd5a90149ee2d1b501ccfe.json index e4095b2..8e8ed1c 100644 --- a/api/.sqlx/query-d3c81b85d9a40ad36bd6c5642119a321afeff6d4aaa053c5b0a281d38dbfebeb.json +++ b/api/.sqlx/query-f3355ccb2f7a4a56b1ef007ccdc1b72fc186de50dafd5a90149ee2d1b501ccfe.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n insert into pipelines (tenant_id, source_id, sink_id, publication_id, config)\n values ($1, $2, $3, $4, $5)\n 
returning id\n ", + "query": "\n insert into pipelines (tenant_id, source_id, sink_id, publication_name, config)\n values ($1, $2, $3, $4, $5)\n returning id\n ", "describe": { "columns": [ { @@ -14,7 +14,7 @@ "Int8", "Int8", "Int8", - "Int8", + "Text", "Jsonb" ] }, @@ -22,5 +22,5 @@ false ] }, - "hash": "d3c81b85d9a40ad36bd6c5642119a321afeff6d4aaa053c5b0a281d38dbfebeb" + "hash": "f3355ccb2f7a4a56b1ef007ccdc1b72fc186de50dafd5a90149ee2d1b501ccfe" } diff --git a/api/migrations/20240822082905_create_publications.sql b/api/migrations/20240822082905_create_publications.sql deleted file mode 100644 index 3fe0b89..0000000 --- a/api/migrations/20240822082905_create_publications.sql +++ /dev/null @@ -1,8 +0,0 @@ -create table - public.publications ( - id bigint generated always as identity primary key, - tenant_id bigint references public.tenants(id) not null, - source_id bigint references public.sources(id) not null, - name text not null, - config jsonb not null - ); diff --git a/api/migrations/20240828090309_create_pipelines.sql b/api/migrations/20240828090309_create_pipelines.sql index 143ebe5..fea545e 100644 --- a/api/migrations/20240828090309_create_pipelines.sql +++ b/api/migrations/20240828090309_create_pipelines.sql @@ -4,6 +4,6 @@ create table tenant_id bigint references public.tenants(id) not null, source_id bigint references public.sources(id) not null, sink_id bigint references public.sinks(id) not null, - publication_id bigint references public.publications(id) not null, + publication_name text not null, config jsonb not null ); diff --git a/api/src/db/pipelines.rs b/api/src/db/pipelines.rs index 9f3fcba..d3dfef4 100644 --- a/api/src/db/pipelines.rs +++ b/api/src/db/pipelines.rs @@ -19,7 +19,7 @@ pub struct Pipeline { pub tenant_id: i64, pub source_id: i64, pub sink_id: i64, - pub publication_id: i64, + pub publication_name: String, pub config: serde_json::Value, } @@ -28,20 +28,20 @@ pub async fn create_pipeline( tenant_id: i64, source_id: i64, sink_id: i64, - 
publication_id: i64, + publication_name: String, config: &PipelineConfig, ) -> Result { let config = serde_json::to_value(config).expect("failed to serialize config"); let record = sqlx::query!( r#" - insert into pipelines (tenant_id, source_id, sink_id, publication_id, config) + insert into pipelines (tenant_id, source_id, sink_id, publication_name, config) values ($1, $2, $3, $4, $5) returning id "#, tenant_id, source_id, sink_id, - publication_id, + publication_name, config ) .fetch_one(pool) @@ -57,7 +57,7 @@ pub async fn read_pipeline( ) -> Result, sqlx::Error> { let record = sqlx::query!( r#" - select id, tenant_id, source_id, sink_id, publication_id, config + select id, tenant_id, source_id, sink_id, publication_name, config from pipelines where tenant_id = $1 and id = $2 "#, @@ -72,7 +72,7 @@ pub async fn read_pipeline( tenant_id: r.tenant_id, source_id: r.source_id, sink_id: r.sink_id, - publication_id: r.publication_id, + publication_name: r.publication_name, config: r.config, })) } @@ -83,20 +83,20 @@ pub async fn update_pipeline( pipeline_id: i64, source_id: i64, sink_id: i64, - publication_id: i64, + publication_name: String, config: &PipelineConfig, ) -> Result, sqlx::Error> { let config = serde_json::to_value(config).expect("failed to serialize config"); let record = sqlx::query!( r#" update pipelines - set source_id = $1, sink_id = $2, publication_id = $3, config = $4 + set source_id = $1, sink_id = $2, publication_name = $3, config = $4 where tenant_id = $5 and id = $6 returning id "#, source_id, sink_id, - publication_id, + publication_name, config, tenant_id, pipeline_id @@ -133,7 +133,7 @@ pub async fn read_all_pipelines( ) -> Result, sqlx::Error> { let mut record = sqlx::query!( r#" - select id, tenant_id, source_id, sink_id, publication_id, config + select id, tenant_id, source_id, sink_id, publication_name, config from pipelines where tenant_id = $1 "#, @@ -149,7 +149,7 @@ pub async fn read_all_pipelines( tenant_id: r.tenant_id, source_id: 
r.source_id, sink_id: r.sink_id, - publication_id: r.publication_id, + publication_name: r.publication_name, config: r.config, }) .collect()) diff --git a/api/src/db/publications.rs b/api/src/db/publications.rs index 46de0c3..d92acb9 100644 --- a/api/src/db/publications.rs +++ b/api/src/db/publications.rs @@ -1,162 +1,53 @@ -use sqlx::PgPool; - -#[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)] -pub struct PublicationConfig { - pub table_names: Vec, -} - -pub struct Publication { - pub id: i64, - pub tenant_id: i64, - pub source_id: i64, - pub name: String, - pub config: serde_json::Value, -} - -pub async fn create_publication( - pool: &PgPool, - tenant_id: i64, - source_id: i64, - name: String, - config: &PublicationConfig, -) -> Result { - let config = serde_json::to_value(config).expect("failed to serialize config"); - let record = sqlx::query!( - r#" - insert into publications (tenant_id, source_id, name, config) - values ($1, $2, $3, $4) - returning id - "#, - tenant_id, - source_id, - name, - config - ) - .fetch_one(pool) - .await?; - - Ok(record.id) -} - -pub async fn read_publication( - pool: &PgPool, - tenant_id: i64, - publication_id: i64, -) -> Result, sqlx::Error> { - let record = sqlx::query!( - r#" - select id, tenant_id, source_id, name, config - from publications - where tenant_id = $1 and id = $2 - "#, - tenant_id, - publication_id, - ) - .fetch_optional(pool) - .await?; - - Ok(record.map(|r| Publication { - id: r.id, - tenant_id: r.tenant_id, - source_id: r.source_id, - name: r.name, - config: r.config, - })) -} - -pub async fn update_publication( - pool: &PgPool, - tenant_id: i64, - publication_id: i64, - source_id: i64, - name: String, - config: &PublicationConfig, -) -> Result, sqlx::Error> { - let config = serde_json::to_value(config).expect("failed to serialize config"); - let record = sqlx::query!( - r#" - update publications - set source_id = $1, name = $2, config = $3 - where tenant_id = $4 and id = $5 - returning id - 
"#, - source_id, - name, - config, - tenant_id, - publication_id - ) - .fetch_optional(pool) - .await?; - - Ok(record.map(|r| r.id)) -} - -pub async fn delete_publication( - pool: &PgPool, - tenant_id: i64, - publication_id: i64, -) -> Result, sqlx::Error> { - let record = sqlx::query!( - r#" - delete from publications - where tenant_id = $1 and id = $2 - returning id - "#, - tenant_id, - publication_id - ) - .fetch_optional(pool) - .await?; - - Ok(record.map(|r| r.id)) -} - -pub async fn read_all_publications( - pool: &PgPool, - tenant_id: i64, -) -> Result, sqlx::Error> { - let mut record = sqlx::query!( - r#" - select id, tenant_id, source_id, name, config - from publications - where tenant_id = $1 - "#, - tenant_id, - ) - .fetch_all(pool) - .await?; - - Ok(record - .drain(..) - .map(|r| Publication { - id: r.id, - tenant_id: r.tenant_id, - source_id: r.source_id, - name: r.name, - config: r.config, - }) - .collect()) -} - -pub async fn publication_exists( - pool: &PgPool, - tenant_id: i64, - publication_id: i64, -) -> Result { - let record = sqlx::query!( - r#" - select exists (select id - from publications - where tenant_id = $1 and id = $2) - "#, - tenant_id, - publication_id, - ) - .fetch_one(pool) - .await?; - - Ok(record - .exists - .expect("select exists always returns a non-null value")) +use std::borrow::Cow; + +use sqlx::{postgres::PgConnectOptions, Connection, Executor, PgConnection}; + +use super::tables::Table; + +pub fn quote_identifier(identifier: &str) -> Cow<'_, str> { + if identifier.find('"').is_some() { + Cow::Owned(quote_identifier_alloc(identifier)) + } else { + Cow::Borrowed(identifier) + } +} + +fn quote_identifier_alloc(identifier: &str) -> String { + let mut quoted_identifier = String::with_capacity(identifier.len()); + for char in identifier.chars() { + if char == '"' { + quoted_identifier.push('"'); + } + quoted_identifier.push(char); + } + quoted_identifier +} + +pub async fn create_publication_on_source( + publication_name: &str, + 
tables: &[Table], +    options: &PgConnectOptions, +) -> Result<(), sqlx::Error> { +    let mut query = String::new(); +    let quoted_publication_name = quote_identifier(publication_name); +    query.push_str("create publication "); +    query.push_str(&quoted_publication_name); +    query.push_str(" table only "); + +    for (i, table) in tables.iter().enumerate() { +        let quoted_schema = quote_identifier(&table.schema); +        let quoted_name = quote_identifier(&table.name); +        query.push_str(&quoted_schema); +        query.push('.'); +        query.push_str(&quoted_name); + +        if i < tables.len() - 1 { +            query.push(',') +        } +    } + +    let mut connection = PgConnection::connect_with(options).await?; +    connection.execute(query.as_str()).await?; + +    Ok(()) +} diff --git a/api/src/routes/mod.rs b/api/src/routes/mod.rs index c55aa6c..36246cd 100644 --- a/api/src/routes/mod.rs +++ b/api/src/routes/mod.rs @@ -2,7 +2,6 @@ use serde::Serialize; pub mod health_check; pub mod pipelines; -pub mod publications; pub mod sinks; pub mod sources; pub mod tenants; diff --git a/api/src/routes/pipelines.rs b/api/src/routes/pipelines.rs index 8051843..2450d4f 100644 --- a/api/src/routes/pipelines.rs +++ b/api/src/routes/pipelines.rs @@ -9,10 +9,7 @@ use serde::{Deserialize, Serialize}; use sqlx::PgPool; use thiserror::Error; -use crate::db::{ -    self, pipelines::PipelineConfig, publications::publication_exists, sinks::sink_exists, -    sources::source_exists, -}; +use crate::db::{self, pipelines::PipelineConfig, sinks::sink_exists, sources::source_exists}; use super::ErrorMessage; @@ -30,9 +27,6 @@ enum PipelineError { #[error("sink with id {0} not found")] SinkNotFound(i64), -    #[error("publication with id {0} not found")] -    PublicationNotFound(i64), - #[error("tenant id missing in request")] TenantIdMissing, @@ -64,8 +58,7 @@ impl ResponseError for PipelineError { PipelineError::TenantIdMissing | PipelineError::TenantIdIllFormed | PipelineError::SourceNotFound(_) -                | PipelineError::SinkNotFound(_) -                | PipelineError::PublicationNotFound(_) => 
StatusCode::BAD_REQUEST, + | PipelineError::SinkNotFound(_) => StatusCode::BAD_REQUEST, } } @@ -85,7 +78,7 @@ impl ResponseError for PipelineError { struct PostPipelineRequest { pub source_id: i64, pub sink_id: i64, - pub publication_id: i64, + pub publication_name: String, pub config: PipelineConfig, } @@ -100,7 +93,7 @@ struct GetPipelineResponse { tenant_id: i64, source_id: i64, sink_id: i64, - publication_id: i64, + publication_name: String, config: PipelineConfig, } @@ -137,16 +130,12 @@ pub async fn create_pipeline( return Err(PipelineError::SinkNotFound(pipeline.sink_id)); } - if !publication_exists(&pool, tenant_id, pipeline.publication_id).await? { - return Err(PipelineError::PublicationNotFound(pipeline.publication_id)); - } - let id = db::pipelines::create_pipeline( &pool, tenant_id, pipeline.source_id, pipeline.sink_id, - pipeline.publication_id, + pipeline.publication_name, &config, ) .await?; @@ -173,7 +162,7 @@ pub async fn read_pipeline( tenant_id: s.tenant_id, source_id: s.source_id, sink_id: s.sink_id, - publication_id: s.publication_id, + publication_name: s.publication_name, config, }) }) @@ -190,29 +179,29 @@ pub async fn update_pipeline( pipeline_id: Path, pipeline: Json, ) -> Result { + let pipeline = pipeline.0; let tenant_id = extract_tenant_id(&req)?; let pipeline_id = pipeline_id.into_inner(); let config = &pipeline.config; + let source_id = pipeline.source_id; + let sink_id = pipeline.sink_id; + let publication_name = pipeline.publication_name; - if !source_exists(&pool, tenant_id, pipeline.source_id).await? { - return Err(PipelineError::SourceNotFound(pipeline.source_id)); + if !source_exists(&pool, tenant_id, source_id).await? { + return Err(PipelineError::SourceNotFound(source_id)); } - if !sink_exists(&pool, tenant_id, pipeline.sink_id).await? { - return Err(PipelineError::SinkNotFound(pipeline.sink_id)); - } - - if !publication_exists(&pool, tenant_id, pipeline.publication_id).await? 
{ - return Err(PipelineError::PublicationNotFound(pipeline.publication_id)); + if !sink_exists(&pool, tenant_id, sink_id).await? { + return Err(PipelineError::SinkNotFound(sink_id)); } db::pipelines::update_pipeline( &pool, tenant_id, pipeline_id, - pipeline.source_id, - pipeline.sink_id, - pipeline.publication_id, + source_id, + sink_id, + publication_name, config, ) .await? @@ -249,7 +238,7 @@ pub async fn read_all_pipelines( tenant_id: pipeline.tenant_id, source_id: pipeline.source_id, sink_id: pipeline.sink_id, - publication_id: pipeline.publication_id, + publication_name: pipeline.publication_name, config, }; pipelines.push(sink); diff --git a/api/src/routes/publications.rs b/api/src/routes/publications.rs deleted file mode 100644 index afb20ea..0000000 --- a/api/src/routes/publications.rs +++ /dev/null @@ -1,221 +0,0 @@ -use actix_web::{ - delete, get, - http::{header::ContentType, StatusCode}, - post, - web::{Data, Json, Path}, - HttpRequest, HttpResponse, Responder, ResponseError, -}; -use serde::{Deserialize, Serialize}; -use sqlx::PgPool; -use thiserror::Error; - -use crate::db::{self, publications::PublicationConfig, sources::source_exists}; - -use super::ErrorMessage; - -#[derive(Debug, Error)] -enum PublicationError { - #[error("database error: {0}")] - DatabaseError(#[from] sqlx::Error), - - #[error("publication with id {0} not found")] - PublicationNotFound(i64), - - #[error("source with id {0} not found")] - SourceNotFound(i64), - - #[error("tenant id missing in request")] - TenantIdMissing, - - #[error("tenant id ill formed in request")] - TenantIdIllFormed, - - #[error("invalid sink config")] - InvalidConfig(#[from] serde_json::Error), -} - -impl PublicationError { - fn to_message(&self) -> String { - match self { - // Do not expose internal database details in error messages - PublicationError::DatabaseError(_) => "internal server error".to_string(), - // Every other message is ok, as they do not divulge sensitive information - e => 
e.to_string(), - } - } -} - -impl ResponseError for PublicationError { - fn status_code(&self) -> StatusCode { - match self { - PublicationError::DatabaseError(_) | PublicationError::InvalidConfig(_) => { - StatusCode::INTERNAL_SERVER_ERROR - } - PublicationError::PublicationNotFound(_) => StatusCode::NOT_FOUND, - PublicationError::TenantIdMissing - | PublicationError::TenantIdIllFormed - | PublicationError::SourceNotFound(_) => StatusCode::BAD_REQUEST, - } - } - - fn error_response(&self) -> HttpResponse { - let error_message = ErrorMessage { - error: self.to_message(), - }; - let body = - serde_json::to_string(&error_message).expect("failed to serialize error message"); - HttpResponse::build(self.status_code()) - .insert_header(ContentType::json()) - .body(body) - } -} - -#[derive(Deserialize)] -struct PostPublicationRequest { - pub source_id: i64, - pub name: String, - pub config: PublicationConfig, -} - -#[derive(Serialize)] -struct PostPublicationResponse { - id: i64, -} - -#[derive(Serialize)] -struct GetPublicationResponse { - id: i64, - tenant_id: i64, - source_id: i64, - name: String, - config: PublicationConfig, -} - -// TODO: read tenant_id from a jwt -fn extract_tenant_id(req: &HttpRequest) -> Result { - let headers = req.headers(); - let tenant_id = headers - .get("tenant_id") - .ok_or(PublicationError::TenantIdMissing)?; - let tenant_id = tenant_id - .to_str() - .map_err(|_| PublicationError::TenantIdIllFormed)?; - let tenant_id: i64 = tenant_id - .parse() - .map_err(|_| PublicationError::TenantIdIllFormed)?; - Ok(tenant_id) -} - -#[post("/publications")] -pub async fn create_publication( - req: HttpRequest, - pool: Data, - publication: Json, -) -> Result { - let publication = publication.0; - let tenant_id = extract_tenant_id(&req)?; - let source_id = publication.source_id; - let name = publication.name; - let config = publication.config; - - if !source_exists(&pool, tenant_id, source_id).await? 
{ - return Err(PublicationError::SourceNotFound(source_id)); - } - - let id = - db::publications::create_publication(&pool, tenant_id, source_id, name, &config).await?; - let response = PostPublicationResponse { id }; - - Ok(Json(response)) -} - -#[get("/publications/{publication_id}")] -pub async fn read_publication( - req: HttpRequest, - pool: Data, - publication_id: Path, -) -> Result { - let tenant_id = extract_tenant_id(&req)?; - let publication_id = publication_id.into_inner(); - - let response = db::publications::read_publication(&pool, tenant_id, publication_id) - .await? - .map(|s| { - let config: PublicationConfig = serde_json::from_value(s.config)?; - Ok::(GetPublicationResponse { - id: s.id, - tenant_id: s.tenant_id, - source_id: s.source_id, - name: s.name, - config, - }) - }) - .transpose()? - .ok_or(PublicationError::PublicationNotFound(publication_id))?; - - Ok(Json(response)) -} - -#[post("/publications/{publication_id}")] -pub async fn update_publication( - req: HttpRequest, - pool: Data, - publication_id: Path, - publication: Json, -) -> Result { - let publication = publication.0; - let tenant_id = extract_tenant_id(&req)?; - let publication_id = publication_id.into_inner(); - let source_id = publication.source_id; - let name = publication.name; - let config = &publication.config; - - if !source_exists(&pool, tenant_id, source_id).await? { - return Err(PublicationError::SourceNotFound(source_id)); - } - - db::publications::update_publication(&pool, tenant_id, publication_id, source_id, name, config) - .await? - .ok_or(PublicationError::PublicationNotFound(publication_id))?; - - Ok(HttpResponse::Ok().finish()) -} - -#[delete("/publications/{publication_id}")] -pub async fn delete_publication( - req: HttpRequest, - pool: Data, - publication_id: Path, -) -> Result { - let tenant_id = extract_tenant_id(&req)?; - let publication_id = publication_id.into_inner(); - - db::publications::delete_publication(&pool, tenant_id, publication_id) - .await? 
- .ok_or(PublicationError::PublicationNotFound(tenant_id))?; - - Ok(HttpResponse::Ok().finish()) -} - -#[get("/publications")] -pub async fn read_all_publications( - req: HttpRequest, - pool: Data, -) -> Result { - let tenant_id = extract_tenant_id(&req)?; - let mut publications = vec![]; - - for publication in db::publications::read_all_publications(&pool, tenant_id).await? { - let config: PublicationConfig = serde_json::from_value(publication.config)?; - let sink = GetPublicationResponse { - id: publication.id, - tenant_id: publication.tenant_id, - source_id: publication.source_id, - name: publication.name, - config, - }; - publications.push(sink); - } - - Ok(Json(publications)) -} diff --git a/api/src/routes/sources.rs b/api/src/routes/sources.rs index 778429a..e2e3dac 100644 --- a/api/src/routes/sources.rs +++ b/api/src/routes/sources.rs @@ -12,6 +12,7 @@ use thiserror::Error; use super::ErrorMessage; use crate::db::{self, sources::SourceConfig}; +pub mod publications; pub mod tables; #[derive(Debug, Error)] diff --git a/api/src/routes/sources/publications.rs b/api/src/routes/sources/publications.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/api/src/routes/sources/publications.rs @@ -0,0 +1 @@ + diff --git a/api/src/startup.rs b/api/src/startup.rs index f926e4d..19c1ee9 100644 --- a/api/src/startup.rs +++ b/api/src/startup.rs @@ -11,10 +11,6 @@ use crate::{ pipelines::{ create_pipeline, delete_pipeline, read_all_pipelines, read_pipeline, update_pipeline, }, - publications::{ - create_publication, delete_publication, read_all_publications, read_publication, - update_publication, - }, sinks::{create_sink, delete_sink, read_all_sinks, read_sink, update_sink}, sources::{ create_source, delete_source, read_all_sources, read_source, tables::read_table_names, @@ -89,12 +85,6 @@ pub async fn run(listener: TcpListener, connection_pool: PgPool) -> Result i64 { let pipeline = CreatePipelineRequest { source_id, sink_id, - publication_id, + 
publication_name: "publication".to_string(), config, }; let response = app.create_pipeline(tenant_id, &pipeline).await; @@ -59,13 +57,12 @@ async fn pipeline_can_be_created() { let tenant_id = create_tenant(&app).await; let source_id = create_source(&app, tenant_id).await; let sink_id = create_sink(&app, tenant_id).await; - let publication_id = create_publication(&app, tenant_id, source_id).await; // Act let pipeline = CreatePipelineRequest { source_id, sink_id, - publication_id, + publication_name: "publication".to_string(), config: new_pipeline_config(), }; let response = app.create_pipeline(tenant_id, &pipeline).await; @@ -85,16 +82,14 @@ async fn pipeline_with_another_tenants_source_cant_be_created() { let app = spawn_app().await; let tenant1_id = create_tenant(&app).await; let tenant2_id = create_tenant(&app).await; - let source1_id = create_source(&app, tenant1_id).await; let source2_id = create_source(&app, tenant2_id).await; let sink1_id = create_sink(&app, tenant1_id).await; - let publication1_id = create_publication(&app, tenant1_id, source1_id).await; // Act let pipeline = CreatePipelineRequest { source_id: source2_id, sink_id: sink1_id, - publication_id: publication1_id, + publication_name: "publication".to_string(), config: new_pipeline_config(), }; let response = app.create_pipeline(tenant1_id, &pipeline).await; @@ -111,37 +106,12 @@ async fn pipeline_with_another_tenants_sink_cant_be_created() { let tenant2_id = create_tenant(&app).await; let source1_id = create_source(&app, tenant1_id).await; let sink2_id = create_sink(&app, tenant2_id).await; - let publication1_id = create_publication(&app, tenant1_id, source1_id).await; // Act let pipeline = CreatePipelineRequest { source_id: source1_id, sink_id: sink2_id, - publication_id: publication1_id, - config: new_pipeline_config(), - }; - let response = app.create_pipeline(tenant1_id, &pipeline).await; - - // Assert - assert_eq!(response.status(), StatusCode::BAD_REQUEST); -} - -#[tokio::test] -async fn 
pipeline_with_another_tenants_publication_cant_be_created() { - // Arrange - let app = spawn_app().await; - let tenant1_id = create_tenant(&app).await; - let tenant2_id = create_tenant(&app).await; - let source1_id = create_source(&app, tenant1_id).await; - let source2_id = create_source(&app, tenant2_id).await; - let sink1_id = create_sink(&app, tenant1_id).await; - let publication2_id = create_publication(&app, tenant2_id, source2_id).await; - - // Act - let pipeline = CreatePipelineRequest { - source_id: source1_id, - sink_id: sink1_id, - publication_id: publication2_id, + publication_name: "publication".to_string(), config: new_pipeline_config(), }; let response = app.create_pipeline(tenant1_id, &pipeline).await; @@ -157,12 +127,11 @@ async fn an_existing_pipeline_can_be_read() { let tenant_id = create_tenant(&app).await; let source_id = create_source(&app, tenant_id).await; let sink_id = create_sink(&app, tenant_id).await; - let publication_id = create_publication(&app, tenant_id, source_id).await; let pipeline = CreatePipelineRequest { source_id, sink_id, - publication_id, + publication_name: "publication".to_string(), config: new_pipeline_config(), }; let response = app.create_pipeline(tenant_id, &pipeline).await; @@ -208,12 +177,11 @@ async fn an_existing_pipeline_can_be_updated() { let tenant_id = create_tenant(&app).await; let source_id = create_source(&app, tenant_id).await; let sink_id = create_sink(&app, tenant_id).await; - let publication_id = create_publication(&app, tenant_id, source_id).await; let pipeline = CreatePipelineRequest { source_id, sink_id, - publication_id, + publication_name: "publication".to_string(), config: new_pipeline_config(), }; let response = app.create_pipeline(tenant_id, &pipeline).await; @@ -226,11 +194,10 @@ async fn an_existing_pipeline_can_be_updated() { // Act let source_id = create_source(&app, tenant_id).await; let sink_id = create_sink(&app, tenant_id).await; - let publication_id = create_publication(&app, tenant_id, 
source_id).await; let updated_config = UpdatePipelineRequest { source_id, sink_id, - publication_id, + publication_name: "updated_publication".to_string(), config: updated_pipeline_config(), }; let response = app @@ -248,6 +215,7 @@ async fn an_existing_pipeline_can_be_updated() { assert_eq!(response.tenant_id, tenant_id); assert_eq!(response.source_id, source_id); assert_eq!(response.sink_id, sink_id); + assert_eq!(response.publication_name, "updated_publication".to_string()); assert_eq!(response.config, updated_config.config); } @@ -259,12 +227,11 @@ async fn pipeline_with_another_tenants_source_cant_be_updated() { let tenant2_id = create_tenant(&app).await; let source1_id = create_source(&app, tenant1_id).await; let sink1_id = create_sink(&app, tenant1_id).await; - let publication1_id = create_publication(&app, tenant1_id, source1_id).await; let pipeline = CreatePipelineRequest { source_id: source1_id, sink_id: sink1_id, - publication_id: publication1_id, + publication_name: "publication".to_string(), config: new_pipeline_config(), }; let response = app.create_pipeline(tenant1_id, &pipeline).await; @@ -279,7 +246,7 @@ async fn pipeline_with_another_tenants_source_cant_be_updated() { let updated_config = UpdatePipelineRequest { source_id: source2_id, sink_id: sink1_id, - publication_id: publication1_id, + publication_name: "updated_publication".to_string(), config: updated_pipeline_config(), }; let response = app @@ -298,12 +265,11 @@ async fn pipeline_with_another_tenants_sink_cant_be_updated() { let tenant2_id = create_tenant(&app).await; let source1_id = create_source(&app, tenant1_id).await; let sink1_id = create_sink(&app, tenant1_id).await; - let publication1_id = create_publication(&app, tenant1_id, source1_id).await; let pipeline = CreatePipelineRequest { source_id: source1_id, sink_id: sink1_id, - publication_id: publication1_id, + publication_name: "publication".to_string(), config: new_pipeline_config(), }; let response = 
app.create_pipeline(tenant1_id, &pipeline).await; @@ -318,47 +284,7 @@ async fn pipeline_with_another_tenants_sink_cant_be_updated() { let updated_config = UpdatePipelineRequest { source_id: source1_id, sink_id: sink2_id, - publication_id: publication1_id, - config: updated_pipeline_config(), - }; - let response = app - .update_pipeline(tenant1_id, pipeline_id, &updated_config) - .await; - - // Assert - assert_eq!(response.status(), StatusCode::BAD_REQUEST); -} - -#[tokio::test] -async fn pipeline_with_another_tenants_publication_cant_be_updated() { - // Arrange - let app = spawn_app().await; - let tenant1_id = create_tenant(&app).await; - let tenant2_id = create_tenant(&app).await; - let source1_id = create_source(&app, tenant1_id).await; - let source2_id = create_source(&app, tenant2_id).await; - let sink1_id = create_sink(&app, tenant1_id).await; - let publication1_id = create_publication(&app, tenant1_id, source1_id).await; - - let pipeline = CreatePipelineRequest { - source_id: source1_id, - sink_id: sink1_id, - publication_id: publication1_id, - config: new_pipeline_config(), - }; - let response = app.create_pipeline(tenant1_id, &pipeline).await; - let response: CreatePipelineResponse = response - .json() - .await - .expect("failed to deserialize response"); - let pipeline_id = response.id; - - // Act - let publication2_id = create_publication(&app, tenant2_id, source2_id).await; - let updated_config = UpdatePipelineRequest { - source_id: source1_id, - sink_id: sink1_id, - publication_id: publication2_id, + publication_name: "updated_publication".to_string(), config: updated_pipeline_config(), }; let response = app @@ -376,13 +302,12 @@ async fn an_non_existing_pipeline_cant_be_updated() { let tenant_id = create_tenant(&app).await; let source_id = create_source(&app, tenant_id).await; let sink_id = create_sink(&app, tenant_id).await; - let publication_id = create_publication(&app, tenant_id, source_id).await; // Act let updated_config = UpdatePipelineRequest 
{ source_id, sink_id, - publication_id, + publication_name: "publication".to_string(), config: updated_pipeline_config(), }; let response = app.update_pipeline(tenant_id, 42, &updated_config).await; @@ -398,12 +323,11 @@ async fn an_existing_pipeline_can_be_deleted() { let tenant_id = create_tenant(&app).await; let source_id = create_source(&app, tenant_id).await; let sink_id = create_sink(&app, tenant_id).await; - let publication_id = create_publication(&app, tenant_id, source_id).await; let pipeline = CreatePipelineRequest { source_id, sink_id, - publication_id, + publication_name: "publication".to_string(), config: new_pipeline_config(), }; let response = app.create_pipeline(tenant_id, &pipeline).await; @@ -444,24 +368,15 @@ async fn all_pipelines_can_be_read() { let source2_id = create_source(&app, tenant_id).await; let sink1_id = create_sink(&app, tenant_id).await; let sink2_id = create_sink(&app, tenant_id).await; - let publication1_id = create_publication(&app, tenant_id, source1_id).await; - let publication2_id = create_publication(&app, tenant_id, source2_id).await; - let pipeline1_id = create_pipeline_with_config( - &app, - tenant_id, - source1_id, - sink1_id, - publication1_id, - new_pipeline_config(), - ) - .await; + let pipeline1_id = + create_pipeline_with_config(&app, tenant_id, source1_id, sink1_id, new_pipeline_config()) + .await; let pipeline2_id = create_pipeline_with_config( &app, tenant_id, source2_id, sink2_id, - publication2_id, updated_pipeline_config(), ) .await; @@ -481,14 +396,12 @@ async fn all_pipelines_can_be_read() { assert_eq!(pipeline.tenant_id, tenant_id); assert_eq!(pipeline.source_id, source1_id); assert_eq!(pipeline.sink_id, sink1_id); - assert_eq!(pipeline.publication_id, publication1_id); assert_eq!(pipeline.config, config); } else if pipeline.id == pipeline2_id { let config = updated_pipeline_config(); assert_eq!(pipeline.tenant_id, tenant_id); assert_eq!(pipeline.source_id, source2_id); assert_eq!(pipeline.sink_id, 
sink2_id); - assert_eq!(pipeline.publication_id, publication2_id); assert_eq!(pipeline.config, config); } } diff --git a/api/tests/api/publications.rs b/api/tests/api/publications.rs deleted file mode 100644 index 9fc4dd9..0000000 --- a/api/tests/api/publications.rs +++ /dev/null @@ -1,332 +0,0 @@ -use api::db::publications::PublicationConfig; -use reqwest::StatusCode; - -use crate::{ - sinks::create_sink, - sources::create_source, - tenants::create_tenant, - test_app::{ - spawn_app, CreatePublicationRequest, CreatePublicationResponse, PublicationResponse, - TestApp, UpdatePublicationRequest, - }, -}; - -fn new_publication_config() -> PublicationConfig { - PublicationConfig { - table_names: vec!["table1".to_string()], - } -} - -fn updated_publication_config() -> PublicationConfig { - PublicationConfig { - table_names: vec!["table1".to_string(), "table2".to_string()], - } -} - -pub async fn create_publication(app: &TestApp, tenant_id: i64, source_id: i64) -> i64 { - create_publication_with_config( - app, - tenant_id, - source_id, - "new_publication".to_string(), - new_publication_config(), - ) - .await -} - -pub async fn create_publication_with_config( - app: &TestApp, - tenant_id: i64, - source_id: i64, - name: String, - config: PublicationConfig, -) -> i64 { - let publication = CreatePublicationRequest { - source_id, - name, - config, - }; - let response = app.create_publication(tenant_id, &publication).await; - let response: CreatePublicationResponse = response - .json() - .await - .expect("failed to deserialize response"); - response.id -} - -#[tokio::test] -async fn publication_can_be_created() { - // Arrange - let app = spawn_app().await; - let tenant_id = create_tenant(&app).await; - let source_id = create_source(&app, tenant_id).await; - - // Act - let publication = CreatePublicationRequest { - source_id, - name: "new_publication".to_string(), - config: new_publication_config(), - }; - let response = app.create_publication(tenant_id, &publication).await; - - 
// Assert - assert!(response.status().is_success()); - let response: CreatePublicationResponse = response - .json() - .await - .expect("failed to deserialize response"); - assert_eq!(response.id, 1); -} - -#[tokio::test] -async fn publication_with_another_tenants_source_cant_be_created() { - // Arrange - let app = spawn_app().await; - let tenant1_id = create_tenant(&app).await; - let source1_id = create_source(&app, tenant1_id).await; - let tenant2_id = create_tenant(&app).await; - - // Act - let publication = CreatePublicationRequest { - source_id: source1_id, - name: "new_publication".to_string(), - config: new_publication_config(), - }; - let response = app.create_publication(tenant2_id, &publication).await; - - // Assert - assert_eq!(response.status(), StatusCode::BAD_REQUEST); -} - -#[tokio::test] -async fn an_existing_publication_can_be_read() { - // Arrange - let app = spawn_app().await; - let tenant_id = create_tenant(&app).await; - let source_id = create_source(&app, tenant_id).await; - let sink_id = create_sink(&app, tenant_id).await; - let publication = CreatePublicationRequest { - source_id, - name: "new_publication".to_string(), - config: new_publication_config(), - }; - let response = app.create_publication(tenant_id, &publication).await; - let response: CreatePublicationResponse = response - .json() - .await - .expect("failed to deserialize response"); - let publication_id = response.id; - - // Act - let response = app.read_publication(tenant_id, publication_id).await; - - // Assert - assert!(response.status().is_success()); - let response: PublicationResponse = response - .json() - .await - .expect("failed to deserialize response"); - assert_eq!(response.id, sink_id); - assert_eq!(response.tenant_id, tenant_id); - assert_eq!(response.source_id, source_id); - assert_eq!(response.name, "new_publication".to_string()); - assert_eq!(response.config, publication.config); -} - -#[tokio::test] -async fn an_non_existing_publication_cant_be_read() { - // 
Arrange - let app = spawn_app().await; - let tenant_id = create_tenant(&app).await; - - // Act - let response = app.read_publication(tenant_id, 42).await; - - // Assert - assert_eq!(response.status(), StatusCode::NOT_FOUND); -} - -#[tokio::test] -async fn an_existing_publication_can_be_updated() { - // Arrange - let app = spawn_app().await; - let tenant_id = create_tenant(&app).await; - let source_id = create_source(&app, tenant_id).await; - - let publication = CreatePublicationRequest { - source_id, - name: "new_publication".to_string(), - config: new_publication_config(), - }; - let response = app.create_publication(tenant_id, &publication).await; - let response: CreatePublicationResponse = response - .json() - .await - .expect("failed to deserialize response"); - let publication_id = response.id; - - // Act - let updated_source_id = create_source(&app, tenant_id).await; - let updated_config = UpdatePublicationRequest { - source_id: updated_source_id, - name: "updated_publication".to_string(), - config: updated_publication_config(), - }; - let response = app - .update_publication(tenant_id, publication_id, &updated_config) - .await; - - // Assert - assert!(response.status().is_success()); - let response = app.read_publication(tenant_id, publication_id).await; - let response: PublicationResponse = response - .json() - .await - .expect("failed to deserialize response"); - assert_eq!(response.id, publication_id); - assert_eq!(response.tenant_id, tenant_id); - assert_eq!(response.source_id, updated_source_id); - assert_eq!(response.name, "updated_publication".to_string()); - assert_eq!(response.config, updated_config.config); -} - -#[tokio::test] -async fn publication_with_another_tenants_source_cant_be_updated() { - // Arrange - let app = spawn_app().await; - let tenant1_id = create_tenant(&app).await; - let source1_id = create_source(&app, tenant1_id).await; - let tenant2_id = create_tenant(&app).await; - let source2_id = create_source(&app, tenant2_id).await; - - 
// Act - let publication = CreatePublicationRequest { - source_id: source1_id, - name: "new_publication".to_string(), - config: new_publication_config(), - }; - let response = app.create_publication(tenant1_id, &publication).await; - let response: CreatePublicationResponse = response - .json() - .await - .expect("failed to deserialize response"); - let publication_id = response.id; - let updated_config = UpdatePublicationRequest { - source_id: source2_id, - name: "updated_publication".to_string(), - config: updated_publication_config(), - }; - let response = app - .update_publication(tenant1_id, publication_id, &updated_config) - .await; - - // Assert - assert_eq!(response.status(), StatusCode::BAD_REQUEST); -} - -#[tokio::test] -async fn an_non_existing_publication_cant_be_updated() { - // Arrange - let app = spawn_app().await; - let tenant_id = create_tenant(&app).await; - let source_id = create_source(&app, tenant_id).await; - - // Act - let updated_config = UpdatePublicationRequest { - source_id, - name: "updated_publication".to_string(), - config: updated_publication_config(), - }; - let response = app.update_publication(tenant_id, 42, &updated_config).await; - - // Assert - assert_eq!(response.status(), StatusCode::NOT_FOUND); -} - -#[tokio::test] -async fn an_existing_publication_can_be_deleted() { - // Arrange - let app = spawn_app().await; - let tenant_id = create_tenant(&app).await; - let source_id = create_source(&app, tenant_id).await; - - let publication = CreatePublicationRequest { - source_id, - name: "new_publication".to_string(), - config: new_publication_config(), - }; - let response = app.create_publication(tenant_id, &publication).await; - let response: CreatePublicationResponse = response - .json() - .await - .expect("failed to deserialize response"); - let publication_id = response.id; - - // Act - let response = app.delete_publication(tenant_id, publication_id).await; - - // Assert - assert!(response.status().is_success()); - let response = 
app.read_publication(tenant_id, publication_id).await; - assert_eq!(response.status(), StatusCode::NOT_FOUND); -} - -#[tokio::test] -async fn an_non_existing_publication_cant_be_deleted() { - // Arrange - let app = spawn_app().await; - let tenant_id = create_tenant(&app).await; - - // Act - let response = app.delete_publication(tenant_id, 42).await; - - // Assert - assert_eq!(response.status(), StatusCode::NOT_FOUND); -} - -#[tokio::test] -async fn all_publications_can_be_read() { - // Arrange - let app = spawn_app().await; - let tenant_id = create_tenant(&app).await; - let source_id = create_source(&app, tenant_id).await; - let publication1_id = create_publication_with_config( - &app, - tenant_id, - source_id, - "new_publication".to_string(), - new_publication_config(), - ) - .await; - let publication2_id = create_publication_with_config( - &app, - tenant_id, - source_id, - "updated_publication".to_string(), - updated_publication_config(), - ) - .await; - - // Act - let response = app.read_all_publications(tenant_id).await; - - // Assert - assert!(response.status().is_success()); - let response: Vec = response - .json() - .await - .expect("failed to deserialize response"); - for publication in response { - if publication.id == publication1_id { - let config = new_publication_config(); - assert_eq!(publication.tenant_id, tenant_id); - assert_eq!(publication.name, "new_publication".to_string()); - assert_eq!(publication.config, config); - } else if publication.id == publication2_id { - let config = updated_publication_config(); - assert_eq!(publication.tenant_id, tenant_id); - assert_eq!(publication.name, "updated_publication".to_string()); - assert_eq!(publication.config, config); - } - } -} diff --git a/api/tests/api/test_app.rs b/api/tests/api/test_app.rs index ebb1613..63a9706 100644 --- a/api/tests/api/test_app.rs +++ b/api/tests/api/test_app.rs @@ -2,10 +2,7 @@ use std::net::TcpListener; use api::{ configuration::get_configuration, - db::{ - 
pipelines::PipelineConfig, publications::PublicationConfig, sinks::SinkConfig, - sources::SourceConfig, - }, + db::{pipelines::PipelineConfig, sinks::SinkConfig, sources::SourceConfig}, startup::{get_connection_pool, run}, }; use serde::{Deserialize, Serialize}; @@ -87,7 +84,7 @@ pub struct SinkResponse { pub struct CreatePipelineRequest { pub source_id: i64, pub sink_id: i64, - pub publication_id: i64, + pub publication_name: String, pub config: PipelineConfig, } @@ -102,7 +99,7 @@ pub struct PipelineResponse { pub tenant_id: i64, pub source_id: i64, pub sink_id: i64, - pub publication_id: i64, + pub publication_name: String, pub config: PipelineConfig, } @@ -110,38 +107,10 @@ pub struct PipelineResponse { pub struct UpdatePipelineRequest { pub source_id: i64, pub sink_id: i64, - pub publication_id: i64, + pub publication_name: String, pub config: PipelineConfig, } -#[derive(Serialize)] -pub struct CreatePublicationRequest { - pub source_id: i64, - pub name: String, - pub config: PublicationConfig, -} - -#[derive(Deserialize)] -pub struct CreatePublicationResponse { - pub id: i64, -} - -#[derive(Serialize)] -pub struct UpdatePublicationRequest { - pub source_id: i64, - pub name: String, - pub config: PublicationConfig, -} - -#[derive(Deserialize)] -pub struct PublicationResponse { - pub id: i64, - pub tenant_id: i64, - pub source_id: i64, - pub name: String, - pub config: PublicationConfig, -} - impl TestApp { pub async fn create_tenant(&self, tenant: &CreateTenantRequest) -> reqwest::Response { self.api_client @@ -352,75 +321,6 @@ impl TestApp { .await .expect("failed to execute request") } - - pub async fn create_publication( - &self, - tenant_id: i64, - publication: &CreatePublicationRequest, - ) -> reqwest::Response { - self.api_client - .post(&format!("{}/v1/publications", &self.address)) - .header("tenant_id", tenant_id) - .json(publication) - .send() - .await - .expect("Failed to execute request.") - } - - pub async fn read_publication(&self, tenant_id: 
i64, publication_id: i64) -> reqwest::Response { - self.api_client - .get(&format!( - "{}/v1/publications/{publication_id}", - &self.address - )) - .header("tenant_id", tenant_id) - .send() - .await - .expect("failed to execute request") - } - - pub async fn update_publication( - &self, - tenant_id: i64, - publication_id: i64, - publication: &UpdatePublicationRequest, - ) -> reqwest::Response { - self.api_client - .post(&format!( - "{}/v1/publications/{publication_id}", - &self.address - )) - .header("tenant_id", tenant_id) - .json(publication) - .send() - .await - .expect("failed to execute request") - } - - pub async fn delete_publication( - &self, - tenant_id: i64, - publication_id: i64, - ) -> reqwest::Response { - self.api_client - .delete(&format!( - "{}/v1/publications/{publication_id}", - &self.address - )) - .header("tenant_id", tenant_id) - .send() - .await - .expect("Failed to execute request.") - } - - pub async fn read_all_publications(&self, tenant_id: i64) -> reqwest::Response { - self.api_client - .get(&format!("{}/v1/publications", &self.address)) - .header("tenant_id", tenant_id) - .send() - .await - .expect("failed to execute request") - } } pub async fn spawn_app() -> TestApp { diff --git a/cli/src/api_client.rs b/cli/src/api_client.rs index fd06c7b..a1f5a13 100644 --- a/cli/src/api_client.rs +++ b/cli/src/api_client.rs @@ -227,7 +227,7 @@ impl Display for SinkResponse { pub struct CreatePipelineRequest { pub source_id: i64, pub sink_id: i64, - pub publication_id: i64, + pub publication_name: String, pub config: PipelineConfig, } @@ -248,7 +248,7 @@ pub struct PipelineResponse { pub tenant_id: i64, pub source_id: i64, pub sink_id: i64, - pub publication_id: i64, + pub publication_name: String, pub config: PipelineConfig, } @@ -256,8 +256,13 @@ impl Display for PipelineResponse { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "tenant_id: {}, id: {}, source_id: {}, sink_id: {}, publication_id: {}, config: {}", - 
self.tenant_id, self.id, self.source_id, self.sink_id, self.publication_id, self.config + "tenant_id: {}, id: {}, source_id: {}, sink_id: {}, publication_name: {}, config: {}", + self.tenant_id, + self.id, + self.source_id, + self.sink_id, + self.publication_name, + self.config ) } } @@ -266,7 +271,7 @@ impl Display for PipelineResponse { pub struct UpdatePipelineRequest { pub source_id: i64, pub sink_id: i64, - pub publication_id: i64, + pub publication_name: String, pub config: PipelineConfig, } @@ -719,115 +724,4 @@ impl ApiClient { Err(ApiClientError::ApiError(message.error)) } } - - pub async fn create_publication( - &self, - tenant_id: i64, - publication: &CreatePublicationRequest, - ) -> Result { - let response = self - .client - .post(&format!("{}/v1/publications", &self.address)) - .header("tenant_id", tenant_id) - .json(publication) - .send() - .await?; - - if response.status().is_success() { - Ok(response.json().await?) - } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } - - pub async fn read_publication( - &self, - tenant_id: i64, - publication_id: i64, - ) -> Result { - let response = self - .client - .get(&format!( - "{}/v1/publications/{publication_id}", - &self.address - )) - .header("tenant_id", tenant_id) - .send() - .await?; - - if response.status().is_success() { - Ok(response.json().await?) 
- } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } - - pub async fn update_publication( - &self, - tenant_id: i64, - publication_id: i64, - publication: &UpdatePublicationRequest, - ) -> Result<(), ApiClientError> { - let response = self - .client - .post(&format!( - "{}/v1/publications/{publication_id}", - &self.address - )) - .header("tenant_id", tenant_id) - .json(publication) - .send() - .await?; - - if response.status().is_success() { - Ok(()) - } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } - - pub async fn delete_publication( - &self, - tenant_id: i64, - publication_id: i64, - ) -> Result<(), ApiClientError> { - let response = self - .client - .delete(&format!( - "{}/v1/publications/{publication_id}", - &self.address - )) - .header("tenant_id", tenant_id) - .send() - .await?; - - if response.status().is_success() { - Ok(()) - } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } - - pub async fn read_all_publications( - &self, - tenant_id: i64, - ) -> Result, ApiClientError> { - let response = self - .client - .get(&format!("{}/v1/publications", &self.address)) - .header("tenant_id", tenant_id) - .send() - .await?; - - if response.status().is_success() { - Ok(response.json().await?) 
- } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } } diff --git a/cli/src/commands.rs b/cli/src/commands.rs index 2b35208..7067364 100644 --- a/cli/src/commands.rs +++ b/cli/src/commands.rs @@ -4,10 +4,6 @@ use thiserror::Error; use crate::{ api_client::ApiClient, pipelines::{create_pipeline, delete_pipeline, list_pipelines, show_pipeline, update_pipeline}, - publications::{ - create_publication, delete_publication, list_publications, show_publication, - update_publication, - }, sinks::{create_sink, delete_sink, list_sinks, show_sink, update_sink}, sources::{create_source, delete_source, list_sources, show_source, update_source}, tenants::{create_tenant, delete_tenant, list_tenants, show_tenant, update_tenant}, @@ -53,7 +49,6 @@ pub enum SubCommand { Sources, Sinks, Pipelines, - Publications, } impl TryFrom<&str> for SubCommand { @@ -67,8 +62,6 @@ impl TryFrom<&str> for SubCommand { "pi" | "pip" | "pipe" | "pipel" | "pipeli" | "pipelin" | "pipeline" | "pipelines" => { SubCommand::Pipelines } - "pu" | "pub" | "publ" | "public" | "publica" | "publicat" | "publicati" - | "publicatio" | "publication" | "publications" => SubCommand::Publications, subcommand => { return Err(SubCommandParseError::InvalidSubCommand( subcommand.to_string(), @@ -117,16 +110,6 @@ pub async fn execute_commands( println!("error creating pipeline: {e:?}"); } }, - (Command::Add, SubCommand::Publications) => { - match create_publication(api_client, editor).await { - Ok(publication) => { - println!("publication created: {publication}"); - } - Err(e) => { - println!("error creating publication: {e:?}"); - } - } - } (Command::Update, SubCommand::Tenants) => match update_tenant(api_client, editor).await { Ok(()) => println!("tenant updated"), Err(e) => { @@ -152,14 +135,6 @@ pub async fn execute_commands( println!("error updating pipeline: {e:?}"); } }, - (Command::Update, SubCommand::Publications) => { - match 
update_publication(api_client, editor).await { - Ok(()) => println!("publication updated"), - Err(e) => { - println!("error updating publication: {e:?}"); - } - } - } (Command::Delete, SubCommand::Tenants) => match delete_tenant(api_client, editor).await { Ok(()) => println!("tenant deleted"), Err(e) => { @@ -185,14 +160,6 @@ pub async fn execute_commands( println!("error deleting pipeline: {e:?}"); } }, - (Command::Delete, SubCommand::Publications) => { - match delete_publication(api_client, editor).await { - Ok(()) => println!("publication deleted"), - Err(e) => { - println!("error deleting publication: {e:?}"); - } - } - } (Command::Show, SubCommand::Tenants) => match show_tenant(api_client, editor).await { Ok(tenant) => { println!("tenant: {tenant}") @@ -225,16 +192,6 @@ pub async fn execute_commands( println!("error reading pipeline: {e:?}"); } }, - (Command::Show, SubCommand::Publications) => { - match show_publication(api_client, editor).await { - Ok(publication) => { - println!("publication: {publication}") - } - Err(e) => { - println!("error reading publication: {e:?}"); - } - } - } (Command::List, SubCommand::Tenants) => match list_tenants(api_client).await { Ok(tenants) => { println!("tenants: "); @@ -279,18 +236,5 @@ pub async fn execute_commands( println!("error reading pipelines: {e:?}"); } }, - (Command::List, SubCommand::Publications) => { - match list_publications(api_client, editor).await { - Ok(publications) => { - println!("publications: "); - for publication in publications { - println!(" {publication}") - } - } - Err(e) => { - println!("error reading publications: {e:?}"); - } - } - } } } diff --git a/cli/src/pipelines.rs b/cli/src/pipelines.rs index 3dac706..570d3f9 100644 --- a/cli/src/pipelines.rs +++ b/cli/src/pipelines.rs @@ -6,7 +6,7 @@ use crate::{ PipelineResponse, UpdatePipelineRequest, }, get_id, get_u64, get_usize, - publications::get_publication_id, + publications::get_publication_name, sinks::get_sink_id, sources::get_source_id, 
tenants::get_tenant_id, @@ -20,7 +20,7 @@ pub async fn create_pipeline( let tenant_id = get_tenant_id(editor)?; let source_id = get_source_id(editor)?; let sink_id = get_sink_id(editor)?; - let publication_id = get_publication_id(editor)?; + let publication_name = get_publication_name(editor)?; let config = get_pipeline_config(editor)?; let pipeline = api_client .create_pipeline( @@ -28,7 +28,7 @@ pub async fn create_pipeline( &CreatePipelineRequest { source_id, sink_id, - publication_id, + publication_name, config, }, ) @@ -56,14 +56,14 @@ pub async fn update_pipeline( let tenant_id = get_tenant_id(editor)?; let source_id = get_source_id(editor)?; let sink_id = get_sink_id(editor)?; - let publication_id = get_publication_id(editor)?; + let publication_name = get_publication_name(editor)?; let pipeline_id = get_pipeline_id(editor)?; let config = get_pipeline_config(editor)?; let pipeline = UpdatePipelineRequest { source_id, sink_id, - publication_id, + publication_name, config, }; api_client diff --git a/cli/src/publications.rs b/cli/src/publications.rs index 0fe56ed..d7eac94 100644 --- a/cli/src/publications.rs +++ b/cli/src/publications.rs @@ -1,108 +1,7 @@ use rustyline::DefaultEditor; -use crate::{ - api_client::{ - ApiClient, CreatePublicationRequest, CreatePublicationResponse, PublicationConfig, - PublicationResponse, UpdatePublicationRequest, - }, - get_id, get_string, - sources::get_source_id, - tenants::get_tenant_id, - CliError, -}; +use crate::{get_string, CliError}; -pub async fn create_publication( - api_client: &ApiClient, - editor: &mut DefaultEditor, -) -> Result { - let tenant_id = get_tenant_id(editor)?; - let source_id = get_source_id(editor)?; - let config = get_publication_config(editor)?; - let name = get_string(editor, "enter publication name: ")?; - - let publication = api_client - .create_publication( - tenant_id, - &CreatePublicationRequest { - source_id, - name, - config, - }, - ) - .await?; - - Ok(publication) -} - -pub async fn 
show_publication( - api_client: &ApiClient, - editor: &mut DefaultEditor, -) -> Result { - let tenant_id = get_tenant_id(editor)?; - let publication_id = get_publication_id(editor)?; - - let publication = api_client - .read_publication(tenant_id, publication_id) - .await?; - - Ok(publication) -} - -pub async fn update_publication( - api_client: &ApiClient, - editor: &mut DefaultEditor, -) -> Result<(), CliError> { - let tenant_id = get_tenant_id(editor)?; - let source_id = get_source_id(editor)?; - let publication_id = get_publication_id(editor)?; - let config = get_publication_config(editor)?; - let name = get_string(editor, "enter publication name: ")?; - - let publication = UpdatePublicationRequest { - source_id, - name, - config, - }; - api_client - .update_publication(tenant_id, publication_id, &publication) - .await?; - - Ok(()) -} - -pub async fn delete_publication( - api_client: &ApiClient, - editor: &mut DefaultEditor, -) -> Result<(), CliError> { - let tenant_id = get_tenant_id(editor)?; - let publication_id = get_publication_id(editor)?; - - api_client - .delete_publication(tenant_id, publication_id) - .await?; - - Ok(()) -} - -pub async fn list_publications( - api_client: &ApiClient, - editor: &mut DefaultEditor, -) -> Result, CliError> { - let tenant_id = get_tenant_id(editor)?; - let tenants = api_client.read_all_publications(tenant_id).await?; - - Ok(tenants) -} - -fn get_publication_config(editor: &mut DefaultEditor) -> Result { - let table_names = get_string(editor, "enter comma separated table names: ")?; - let table_names: Vec = table_names - .split(',') - .map(|table_name| table_name.trim().to_string()) - .collect(); - Ok(PublicationConfig { table_names }) -} - -pub fn get_publication_id(editor: &mut DefaultEditor) -> Result { - get_id(editor, "enter publication id: ") +pub fn get_publication_name(editor: &mut DefaultEditor) -> Result { + get_string(editor, "enter publication anme: ") }