Skip to content

Commit

Permalink
add publication id to pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Aug 29, 2024
1 parent 63767c8 commit a8126e9
Show file tree
Hide file tree
Showing 13 changed files with 101 additions and 28 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ create table
tenant_id bigint references public.tenants(id) not null,
source_id bigint references public.sources(id) not null,
sink_id bigint references public.sinks(id) not null,
publication_id bigint references public.publications(id) not null,
config jsonb not null
);
19 changes: 13 additions & 6 deletions api/src/db/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub struct Pipeline {
pub tenant_id: i64,
pub source_id: i64,
pub sink_id: i64,
pub publication_id: i64,
pub config: serde_json::Value,
}

Expand All @@ -27,18 +28,20 @@ pub async fn create_pipeline(
tenant_id: i64,
source_id: i64,
sink_id: i64,
publication_id: i64,
config: &PipelineConfig,
) -> Result<i64, sqlx::Error> {
let config = serde_json::to_value(config).expect("failed to serialize config");
let record = sqlx::query!(
r#"
insert into pipelines (tenant_id, source_id, sink_id, config)
values ($1, $2, $3, $4)
insert into pipelines (tenant_id, source_id, sink_id, publication_id, config)
values ($1, $2, $3, $4, $5)
returning id
"#,
tenant_id,
source_id,
sink_id,
publication_id,
config
)
.fetch_one(pool)
Expand All @@ -54,7 +57,7 @@ pub async fn read_pipeline(
) -> Result<Option<Pipeline>, sqlx::Error> {
let record = sqlx::query!(
r#"
select id, tenant_id, source_id, sink_id, config
select id, tenant_id, source_id, sink_id, publication_id, config
from pipelines
where tenant_id = $1 and id = $2
"#,
Expand All @@ -69,6 +72,7 @@ pub async fn read_pipeline(
tenant_id: r.tenant_id,
source_id: r.source_id,
sink_id: r.sink_id,
publication_id: r.publication_id,
config: r.config,
}))
}
Expand All @@ -79,18 +83,20 @@ pub async fn update_pipeline(
pipeline_id: i64,
source_id: i64,
sink_id: i64,
publication_id: i64,
config: &PipelineConfig,
) -> Result<Option<i64>, sqlx::Error> {
let config = serde_json::to_value(config).expect("failed to serialize config");
let record = sqlx::query!(
r#"
update pipelines
set source_id = $1, sink_id = $2, config = $3
where tenant_id = $4 and id = $5
set source_id = $1, sink_id = $2, publication_id = $3, config = $4
where tenant_id = $5 and id = $6
returning id
"#,
source_id,
sink_id,
publication_id,
config,
tenant_id,
pipeline_id
Expand Down Expand Up @@ -127,7 +133,7 @@ pub async fn read_all_pipelines(
) -> Result<Vec<Pipeline>, sqlx::Error> {
let mut record = sqlx::query!(
r#"
select id, tenant_id, source_id, sink_id, config
select id, tenant_id, source_id, sink_id, publication_id, config
from pipelines
where tenant_id = $1
"#,
Expand All @@ -143,6 +149,7 @@ pub async fn read_all_pipelines(
tenant_id: r.tenant_id,
source_id: r.source_id,
sink_id: r.sink_id,
publication_id: r.publication_id,
config: r.config,
})
.collect())
Expand Down
6 changes: 6 additions & 0 deletions api/src/routes/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ impl ResponseError for PipelineError {
struct PostPipelineRequest {
pub source_id: i64,
pub sink_id: i64,
pub publication_id: i64,
pub config: PipelineConfig,
}

Expand All @@ -61,6 +62,7 @@ struct GetPipelineResponse {
tenant_id: i64,
source_id: i64,
sink_id: i64,
publication_id: i64,
config: PipelineConfig,
}

Expand Down Expand Up @@ -93,6 +95,7 @@ pub async fn create_pipeline(
tenant_id,
pipeline.source_id,
pipeline.sink_id,
pipeline.publication_id,
&config,
)
.await?;
Expand All @@ -117,6 +120,7 @@ pub async fn read_pipeline(
tenant_id: s.tenant_id,
source_id: s.source_id,
sink_id: s.sink_id,
publication_id: s.publication_id,
config,
})
})
Expand All @@ -141,6 +145,7 @@ pub async fn update_pipeline(
pipeline_id,
pipeline.source_id,
pipeline.sink_id,
pipeline.publication_id,
config,
)
.await?
Expand Down Expand Up @@ -176,6 +181,7 @@ pub async fn read_all_pipelines(
tenant_id: pipeline.tenant_id,
source_id: pipeline.source_id,
sink_id: pipeline.sink_id,
publication_id: pipeline.publication_id,
config,
};
pipelines.push(sink);
Expand Down
Loading

0 comments on commit a8126e9

Please sign in to comment.