diff --git a/api/.sqlx/query-698ea90979614d4db8e3df5c775876cc53e53d613311927f43fddeb855910da3.json b/api/.sqlx/query-2e45a8db40adcc2731acb62d1d2c237597fc90d2d3c273ac6e54484d7d8fd225.json similarity index 60% rename from api/.sqlx/query-698ea90979614d4db8e3df5c775876cc53e53d613311927f43fddeb855910da3.json rename to api/.sqlx/query-2e45a8db40adcc2731acb62d1d2c237597fc90d2d3c273ac6e54484d7d8fd225.json index 8aee0b3..7e29f02 100644 --- a/api/.sqlx/query-698ea90979614d4db8e3df5c775876cc53e53d613311927f43fddeb855910da3.json +++ b/api/.sqlx/query-2e45a8db40adcc2731acb62d1d2c237597fc90d2d3c273ac6e54484d7d8fd225.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n select id, tenant_id, config\n from publications\n where tenant_id = $1 and id = $2\n ", + "query": "\n select id, tenant_id, source_id, config\n from publications\n where tenant_id = $1\n ", "describe": { "columns": [ { @@ -15,21 +15,26 @@ }, { "ordinal": 2, + "name": "source_id", + "type_info": "Int8" + }, + { + "ordinal": 3, "name": "config", "type_info": "Jsonb" } ], "parameters": { "Left": [ - "Int8", "Int8" ] }, "nullable": [ + false, false, false, false ] }, - "hash": "698ea90979614d4db8e3df5c775876cc53e53d613311927f43fddeb855910da3" + "hash": "2e45a8db40adcc2731acb62d1d2c237597fc90d2d3c273ac6e54484d7d8fd225" } diff --git a/api/.sqlx/query-e348942ce2017d9f3ce73acf65a6d7894875d89d8d43f7d8bf2225485d77c5a1.json b/api/.sqlx/query-41b4a42897a8c8870a3ec16237fb04357cd6213f93fbe8f0eff1588dabfea4da.json similarity index 58% rename from api/.sqlx/query-e348942ce2017d9f3ce73acf65a6d7894875d89d8d43f7d8bf2225485d77c5a1.json rename to api/.sqlx/query-41b4a42897a8c8870a3ec16237fb04357cd6213f93fbe8f0eff1588dabfea4da.json index 3b634b7..fbfd01c 100644 --- a/api/.sqlx/query-e348942ce2017d9f3ce73acf65a6d7894875d89d8d43f7d8bf2225485d77c5a1.json +++ b/api/.sqlx/query-41b4a42897a8c8870a3ec16237fb04357cd6213f93fbe8f0eff1588dabfea4da.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n select id, tenant_id, config\n from publications\n where tenant_id = $1\n ", + "query": "\n select id, tenant_id, source_id, config\n from publications\n where tenant_id = $1 and id = $2\n ", "describe": { "columns": [ { @@ -15,20 +15,27 @@ }, { "ordinal": 2, + "name": "source_id", + "type_info": "Int8" + }, + { + "ordinal": 3, "name": "config", "type_info": "Jsonb" } ], "parameters": { "Left": [ + "Int8", "Int8" ] }, "nullable": [ + false, false, false, false ] }, - "hash": "e348942ce2017d9f3ce73acf65a6d7894875d89d8d43f7d8bf2225485d77c5a1" + "hash": "41b4a42897a8c8870a3ec16237fb04357cd6213f93fbe8f0eff1588dabfea4da" } diff --git a/api/.sqlx/query-b76dc3a04df7f21374ad4194d7dfddd1ffe2835db207588f0adc73c606456e7e.json b/api/.sqlx/query-512032b289c4570099e20b23b2785bf71a6cfac2e68b8e36ef9f3ebd1d57c27f.json similarity index 54% rename from api/.sqlx/query-b76dc3a04df7f21374ad4194d7dfddd1ffe2835db207588f0adc73c606456e7e.json rename to api/.sqlx/query-512032b289c4570099e20b23b2785bf71a6cfac2e68b8e36ef9f3ebd1d57c27f.json index 39a5862..ade4c0f 100644 --- a/api/.sqlx/query-b76dc3a04df7f21374ad4194d7dfddd1ffe2835db207588f0adc73c606456e7e.json +++ b/api/.sqlx/query-512032b289c4570099e20b23b2785bf71a6cfac2e68b8e36ef9f3ebd1d57c27f.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n insert into publications (tenant_id, config)\n values ($1, $2)\n returning id\n ", + "query": "\n insert into publications (tenant_id, source_id, config)\n values ($1, $2, $3)\n returning id\n ", "describe": { "columns": [ { @@ -11,6 +11,7 @@ ], "parameters": { "Left": [ + "Int8", "Int8", "Jsonb" ] @@ -19,5 +20,5 @@ false ] }, - "hash": "b76dc3a04df7f21374ad4194d7dfddd1ffe2835db207588f0adc73c606456e7e" + "hash": "512032b289c4570099e20b23b2785bf71a6cfac2e68b8e36ef9f3ebd1d57c27f" } diff --git a/api/.sqlx/query-8077fb955105dcaf7cd28905fb9d7993768cdf4e93c18e05a214d60e9f9af2bc.json b/api/.sqlx/query-8d3aecc97652cfaa725de04395734fb1ead572468ceaf1155864dbb62b2dd704.json similarity index 54% rename from api/.sqlx/query-8077fb955105dcaf7cd28905fb9d7993768cdf4e93c18e05a214d60e9f9af2bc.json rename to api/.sqlx/query-8d3aecc97652cfaa725de04395734fb1ead572468ceaf1155864dbb62b2dd704.json index 24e50f1..e1fd9d3 100644 --- a/api/.sqlx/query-8077fb955105dcaf7cd28905fb9d7993768cdf4e93c18e05a214d60e9f9af2bc.json +++ b/api/.sqlx/query-8d3aecc97652cfaa725de04395734fb1ead572468ceaf1155864dbb62b2dd704.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n update publications\n set config = $1\n where tenant_id = $2 and id = $3\n returning id\n ", + "query": "\n update publications\n set source_id = $1, config = $2\n where tenant_id = $3 and id = $4\n returning id\n ", "describe": { "columns": [ { @@ -11,6 +11,7 @@ ], "parameters": { "Left": [ + "Int8", "Jsonb", "Int8", "Int8" @@ -20,5 +21,5 @@ false ] }, - "hash": "8077fb955105dcaf7cd28905fb9d7993768cdf4e93c18e05a214d60e9f9af2bc" + "hash": "8d3aecc97652cfaa725de04395734fb1ead572468ceaf1155864dbb62b2dd704" } diff --git a/api/migrations/20240828090309_create_publication.sql b/api/migrations/20240828090309_create_publication.sql index cc91aa6..7e5be21 100644 --- a/api/migrations/20240828090309_create_publication.sql +++ b/api/migrations/20240828090309_create_publication.sql @@ -2,5 +2,6 @@ create table public.publications ( id bigint generated always as identity primary key, tenant_id bigint references public.tenants(id) not null, + source_id bigint references public.sources(id) not null, config jsonb not null ); diff --git a/api/src/db/publications.rs b/api/src/db/publications.rs index 74b9f81..7510dca 100644 --- a/api/src/db/publications.rs +++ b/api/src/db/publications.rs @@ -8,22 +8,25 @@ pub struct PublicationConfig { pub struct Publication { pub id: i64, pub tenant_id: i64, + pub source_id: i64, pub config: serde_json::Value, } pub async fn create_publication( pool: &PgPool, tenant_id: i64, + source_id: i64, config: &PublicationConfig, ) -> Result { let config = serde_json::to_value(config).expect("failed to serialize config"); let record = sqlx::query!( r#" - insert into publications (tenant_id, config) - values ($1, $2) + insert into publications (tenant_id, source_id, config) + values ($1, $2, $3) returning id "#, tenant_id, + source_id, config ) .fetch_one(pool) @@ -39,7 +42,7 @@ pub async fn read_publication( ) -> Result, sqlx::Error> { let record = sqlx::query!( r#" - select id, tenant_id, config + select id, tenant_id, source_id, config from publications where tenant_id = $1 and id = $2 "#, @@ -52,6 +55,7 @@ pub async fn read_publication( Ok(record.map(|r| Publication { id: r.id, tenant_id: r.tenant_id, + source_id: r.source_id, config: r.config, })) } @@ -60,16 +64,18 @@ pub async fn update_publication( pool: &PgPool, tenant_id: i64, publication_id: i64, + source_id: i64, config: &PublicationConfig, ) -> Result, sqlx::Error> { let config = serde_json::to_value(config).expect("failed to serialize config"); let record = sqlx::query!( r#" update publications - set config = $1 - where tenant_id = $2 and id = $3 + set source_id = $1, config = $2 + where tenant_id = $3 and id = $4 returning id "#, + source_id, config, tenant_id, publication_id @@ -106,7 +112,7 @@ pub async fn read_all_publications( ) -> Result, sqlx::Error> { let mut record = sqlx::query!( r#" - select id, tenant_id, config + select id, tenant_id, source_id, config from publications where tenant_id = $1 "#, @@ -120,6 +126,7 @@ pub async fn read_all_publications( .map(|r| Publication { id: r.id, tenant_id: r.tenant_id, + source_id: r.source_id, config: r.config, }) .collect()) diff --git a/api/src/routes/publications.rs b/api/src/routes/publications.rs index fd470b8..0dbf756 100644 --- a/api/src/routes/publications.rs +++ b/api/src/routes/publications.rs @@ -45,6 +45,7 @@ impl ResponseError for PublicationError { #[derive(Deserialize)] struct PostPublicationRequest { + pub source_id: i64, pub config: PublicationConfig, } @@ -57,6 +58,7 @@ struct PostPublicationResponse { struct GetPublicationResponse { id: i64, tenant_id: i64, + source_id: i64, config: PublicationConfig, } @@ -83,8 +85,9 @@ pub async fn create_publication( ) -> Result { let publication = publication.0; let tenant_id = extract_tenant_id(&req)?; + let source_id = publication.source_id; let config = publication.config; - let id = db::publications::create_publication(&pool, tenant_id, &config).await?; + let id = db::publications::create_publication(&pool, tenant_id, source_id, &config).await?; let response = PostPublicationResponse { id }; Ok(Json(response)) } @@ -104,6 +107,7 @@ pub async fn read_publication( Ok::(GetPublicationResponse { id: s.id, tenant_id: s.tenant_id, + source_id: s.source_id, config, }) }) @@ -121,8 +125,9 @@ pub async fn update_publication( ) -> Result { let tenant_id = extract_tenant_id(&req)?; let publication_id = publication_id.into_inner(); + let source_id = publication.source_id; let config = &publication.config; - db::publications::update_publication(&pool, tenant_id, publication_id, config) + db::publications::update_publication(&pool, tenant_id, publication_id, source_id, config) .await? .ok_or(PublicationError::NotFound(publication_id))?; Ok(HttpResponse::Ok().finish()) @@ -154,6 +159,7 @@ pub async fn read_all_publications( let sink = GetPublicationResponse { id: publication.id, tenant_id: publication.tenant_id, + source_id: publication.source_id, config, }; publications.push(sink); diff --git a/api/tests/api/publications.rs b/api/tests/api/publications.rs index d9caccb..da24888 100644 --- a/api/tests/api/publications.rs +++ b/api/tests/api/publications.rs @@ -3,6 +3,7 @@ use reqwest::StatusCode; use crate::{ sinks::create_sink, + sources::create_source, tenants::create_tenant, test_app::{ spawn_app, CreatePublicationRequest, CreatePublicationResponse, PublicationResponse, @@ -25,9 +26,10 @@ fn updated_publication_config() -> PublicationConfig { pub async fn create_publication_with_config( app: &TestApp, tenant_id: i64, + source_id: i64, config: PublicationConfig, ) -> i64 { - let publication = CreatePublicationRequest { config }; + let publication = CreatePublicationRequest { source_id, config }; let response = app.create_publication(tenant_id, &publication).await; let response: CreatePublicationResponse = response .json() @@ -41,9 +43,11 @@ async fn publication_can_be_created() { // Arrange let app = spawn_app().await; let tenant_id = create_tenant(&app).await; + let source_id = create_source(&app, tenant_id).await; // Act let publication = CreatePublicationRequest { + source_id, config: new_publication_config(), }; let response = app.create_publication(tenant_id, &publication).await; @@ -62,8 +66,10 @@ async fn an_existing_publication_can_be_read() { // Arrange let app = spawn_app().await; let tenant_id = create_tenant(&app).await; + let source_id = create_source(&app, tenant_id).await; let sink_id = create_sink(&app, tenant_id).await; let publication = CreatePublicationRequest { + source_id, config: new_publication_config(), }; let response = app.create_publication(tenant_id, &publication).await; @@ -84,6 +90,7 @@ async fn an_existing_publication_can_be_read() { .expect("failed to deserialize response"); assert_eq!(response.id, sink_id); assert_eq!(response.tenant_id, tenant_id); + assert_eq!(response.source_id, source_id); assert_eq!(response.config, publication.config); } @@ -105,8 +112,10 @@ async fn an_existing_publication_can_be_updated() { // Arrange let app = spawn_app().await; let tenant_id = create_tenant(&app).await; + let source_id = create_source(&app, tenant_id).await; let publication = CreatePublicationRequest { + source_id, config: new_publication_config(), }; let response = app.create_publication(tenant_id, &publication).await; @@ -117,7 +126,9 @@ async fn an_existing_publication_can_be_updated() { let publication_id = response.id; // Act + let updated_source_id = create_source(&app, tenant_id).await; let updated_config = UpdatePublicationRequest { + source_id: updated_source_id, config: updated_publication_config(), }; let response = app @@ -133,6 +144,7 @@ async fn an_existing_publication_can_be_updated() { .expect("failed to deserialize response"); assert_eq!(response.id, publication_id); assert_eq!(response.tenant_id, tenant_id); + assert_eq!(response.source_id, updated_source_id); assert_eq!(response.config, updated_config.config); } @@ -141,9 +153,11 @@ async fn an_non_existing_publication_cant_be_updated() { // Arrange let app = spawn_app().await; let tenant_id = create_tenant(&app).await; + let source_id = create_source(&app, tenant_id).await; // Act let updated_config = UpdatePublicationRequest { + source_id, config: updated_publication_config(), }; let response = app.update_publication(tenant_id, 42, &updated_config).await; @@ -157,8 +171,10 @@ async fn an_existing_publication_can_be_deleted() { // Arrange let app = spawn_app().await; let tenant_id = create_tenant(&app).await; + let source_id = create_source(&app, tenant_id).await; let publication = CreatePublicationRequest { + source_id, config: new_publication_config(), }; let response = app.create_publication(tenant_id, &publication).await; @@ -195,10 +211,12 @@ async fn all_publications_can_be_read() { // Arrange let app = spawn_app().await; let tenant_id = create_tenant(&app).await; + let source_id = create_source(&app, tenant_id).await; let publication1_id = - create_publication_with_config(&app, tenant_id, new_publication_config()).await; + create_publication_with_config(&app, tenant_id, source_id, new_publication_config()).await; let publication2_id = - create_publication_with_config(&app, tenant_id, updated_publication_config()).await; + create_publication_with_config(&app, tenant_id, source_id, updated_publication_config()) + .await; // Act let response = app.read_all_publications(tenant_id).await; diff --git a/api/tests/api/test_app.rs b/api/tests/api/test_app.rs index 7e24fe3..bffe9db 100644 --- a/api/tests/api/test_app.rs +++ b/api/tests/api/test_app.rs @@ -116,6 +116,7 @@ pub struct UpdatePipelineRequest { #[derive(Serialize)] pub struct CreatePublicationRequest { + pub source_id: i64, pub config: PublicationConfig, } @@ -126,6 +127,7 @@ pub struct CreatePublicationResponse { #[derive(Serialize)] pub struct UpdatePublicationRequest { + pub source_id: i64, pub config: PublicationConfig, } @@ -133,6 +135,7 @@ pub struct UpdatePublicationRequest { pub struct PublicationResponse { pub id: i64, pub tenant_id: i64, + pub source_id: i64, pub config: PublicationConfig, }