Skip to content

Commit

Permalink
associate a publication with a source
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Aug 29, 2024
1 parent 3f57f4d commit 8f792ca
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 20 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions api/migrations/20240828090309_create_publication.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ create table
public.publications (
id bigint generated always as identity primary key,
tenant_id bigint references public.tenants(id) not null,
source_id bigint references public.sources(id) not null,
config jsonb not null
);
19 changes: 13 additions & 6 deletions api/src/db/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,25 @@ pub struct PublicationConfig {
pub struct Publication {
pub id: i64,
pub tenant_id: i64,
pub source_id: i64,
pub config: serde_json::Value,
}

pub async fn create_publication(
pool: &PgPool,
tenant_id: i64,
source_id: i64,
config: &PublicationConfig,
) -> Result<i64, sqlx::Error> {
let config = serde_json::to_value(config).expect("failed to serialize config");
let record = sqlx::query!(
r#"
insert into publications (tenant_id, config)
values ($1, $2)
insert into publications (tenant_id, source_id, config)
values ($1, $2, $3)
returning id
"#,
tenant_id,
source_id,
config
)
.fetch_one(pool)
Expand All @@ -39,7 +42,7 @@ pub async fn read_publication(
) -> Result<Option<Publication>, sqlx::Error> {
let record = sqlx::query!(
r#"
select id, tenant_id, config
select id, tenant_id, source_id, config
from publications
where tenant_id = $1 and id = $2
"#,
Expand All @@ -52,6 +55,7 @@ pub async fn read_publication(
Ok(record.map(|r| Publication {
id: r.id,
tenant_id: r.tenant_id,
source_id: r.source_id,
config: r.config,
}))
}
Expand All @@ -60,16 +64,18 @@ pub async fn update_publication(
pool: &PgPool,
tenant_id: i64,
publication_id: i64,
source_id: i64,
config: &PublicationConfig,
) -> Result<Option<i64>, sqlx::Error> {
let config = serde_json::to_value(config).expect("failed to serialize config");
let record = sqlx::query!(
r#"
update publications
set config = $1
where tenant_id = $2 and id = $3
set source_id = $1, config = $2
where tenant_id = $3 and id = $4
returning id
"#,
source_id,
config,
tenant_id,
publication_id
Expand Down Expand Up @@ -106,7 +112,7 @@ pub async fn read_all_publications(
) -> Result<Vec<Publication>, sqlx::Error> {
let mut record = sqlx::query!(
r#"
select id, tenant_id, config
select id, tenant_id, source_id, config
from publications
where tenant_id = $1
"#,
Expand All @@ -120,6 +126,7 @@ pub async fn read_all_publications(
.map(|r| Publication {
id: r.id,
tenant_id: r.tenant_id,
source_id: r.source_id,
config: r.config,
})
.collect())
Expand Down
10 changes: 8 additions & 2 deletions api/src/routes/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ impl ResponseError for PublicationError {

#[derive(Deserialize)]
struct PostPublicationRequest {
pub source_id: i64,
pub config: PublicationConfig,
}

Expand All @@ -57,6 +58,7 @@ struct PostPublicationResponse {
struct GetPublicationResponse {
id: i64,
tenant_id: i64,
source_id: i64,
config: PublicationConfig,
}

Expand All @@ -83,8 +85,9 @@ pub async fn create_publication(
) -> Result<impl Responder, PublicationError> {
let publication = publication.0;
let tenant_id = extract_tenant_id(&req)?;
let source_id = publication.source_id;
let config = publication.config;
let id = db::publications::create_publication(&pool, tenant_id, &config).await?;
let id = db::publications::create_publication(&pool, tenant_id, source_id, &config).await?;
let response = PostPublicationResponse { id };
Ok(Json(response))
}
Expand All @@ -104,6 +107,7 @@ pub async fn read_publication(
Ok::<GetPublicationResponse, serde_json::Error>(GetPublicationResponse {
id: s.id,
tenant_id: s.tenant_id,
source_id: s.source_id,
config,
})
})
Expand All @@ -121,8 +125,9 @@ pub async fn update_publication(
) -> Result<impl Responder, PublicationError> {
let tenant_id = extract_tenant_id(&req)?;
let publication_id = publication_id.into_inner();
let source_id = publication.source_id;
let config = &publication.config;
db::publications::update_publication(&pool, tenant_id, publication_id, config)
db::publications::update_publication(&pool, tenant_id, publication_id, source_id, config)
.await?
.ok_or(PublicationError::NotFound(publication_id))?;
Ok(HttpResponse::Ok().finish())
Expand Down Expand Up @@ -154,6 +159,7 @@ pub async fn read_all_publications(
let sink = GetPublicationResponse {
id: publication.id,
tenant_id: publication.tenant_id,
source_id: publication.source_id,
config,
};
publications.push(sink);
Expand Down
24 changes: 21 additions & 3 deletions api/tests/api/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use reqwest::StatusCode;

use crate::{
sinks::create_sink,
sources::create_source,
tenants::create_tenant,
test_app::{
spawn_app, CreatePublicationRequest, CreatePublicationResponse, PublicationResponse,
Expand All @@ -25,9 +26,10 @@ fn updated_publication_config() -> PublicationConfig {
pub async fn create_publication_with_config(
app: &TestApp,
tenant_id: i64,
source_id: i64,
config: PublicationConfig,
) -> i64 {
let publication = CreatePublicationRequest { config };
let publication = CreatePublicationRequest { source_id, config };
let response = app.create_publication(tenant_id, &publication).await;
let response: CreatePublicationResponse = response
.json()
Expand All @@ -41,9 +43,11 @@ async fn publication_can_be_created() {
// Arrange
let app = spawn_app().await;
let tenant_id = create_tenant(&app).await;
let source_id = create_source(&app, tenant_id).await;

// Act
let publication = CreatePublicationRequest {
source_id,
config: new_publication_config(),
};
let response = app.create_publication(tenant_id, &publication).await;
Expand All @@ -62,8 +66,10 @@ async fn an_existing_publication_can_be_read() {
// Arrange
let app = spawn_app().await;
let tenant_id = create_tenant(&app).await;
let source_id = create_source(&app, tenant_id).await;
let sink_id = create_sink(&app, tenant_id).await;
let publication = CreatePublicationRequest {
source_id,
config: new_publication_config(),
};
let response = app.create_publication(tenant_id, &publication).await;
Expand All @@ -84,6 +90,7 @@ async fn an_existing_publication_can_be_read() {
.expect("failed to deserialize response");
assert_eq!(response.id, sink_id);
assert_eq!(response.tenant_id, tenant_id);
assert_eq!(response.source_id, source_id);
assert_eq!(response.config, publication.config);
}

Expand All @@ -105,8 +112,10 @@ async fn an_existing_publication_can_be_updated() {
// Arrange
let app = spawn_app().await;
let tenant_id = create_tenant(&app).await;
let source_id = create_source(&app, tenant_id).await;

let publication = CreatePublicationRequest {
source_id,
config: new_publication_config(),
};
let response = app.create_publication(tenant_id, &publication).await;
Expand All @@ -117,7 +126,9 @@ async fn an_existing_publication_can_be_updated() {
let publication_id = response.id;

// Act
let updated_source_id = create_source(&app, tenant_id).await;
let updated_config = UpdatePublicationRequest {
source_id: updated_source_id,
config: updated_publication_config(),
};
let response = app
Expand All @@ -133,6 +144,7 @@ async fn an_existing_publication_can_be_updated() {
.expect("failed to deserialize response");
assert_eq!(response.id, publication_id);
assert_eq!(response.tenant_id, tenant_id);
assert_eq!(response.source_id, updated_source_id);
assert_eq!(response.config, updated_config.config);
}

Expand All @@ -141,9 +153,11 @@ async fn an_non_existing_publication_cant_be_updated() {
// Arrange
let app = spawn_app().await;
let tenant_id = create_tenant(&app).await;
let source_id = create_source(&app, tenant_id).await;

// Act
let updated_config = UpdatePublicationRequest {
source_id,
config: updated_publication_config(),
};
let response = app.update_publication(tenant_id, 42, &updated_config).await;
Expand All @@ -157,8 +171,10 @@ async fn an_existing_publication_can_be_deleted() {
// Arrange
let app = spawn_app().await;
let tenant_id = create_tenant(&app).await;
let source_id = create_source(&app, tenant_id).await;

let publication = CreatePublicationRequest {
source_id,
config: new_publication_config(),
};
let response = app.create_publication(tenant_id, &publication).await;
Expand Down Expand Up @@ -195,10 +211,12 @@ async fn all_publications_can_be_read() {
// Arrange
let app = spawn_app().await;
let tenant_id = create_tenant(&app).await;
let source_id = create_source(&app, tenant_id).await;
let publication1_id =
create_publication_with_config(&app, tenant_id, new_publication_config()).await;
create_publication_with_config(&app, tenant_id, source_id, new_publication_config()).await;
let publication2_id =
create_publication_with_config(&app, tenant_id, updated_publication_config()).await;
create_publication_with_config(&app, tenant_id, source_id, updated_publication_config())
.await;

// Act
let response = app.read_all_publications(tenant_id).await;
Expand Down
Loading

0 comments on commit 8f792ca

Please sign in to comment.