add api to start a pipeline
imor committed Sep 2, 2024
1 parent b52f809 commit 8118ec3
Showing 15 changed files with 377 additions and 48 deletions.

8 changes: 8 additions & 0 deletions api/migrations/20240828090309_create_replicators.sql
@@ -0,0 +1,8 @@
create type replicator_status as enum ('stopped', 'starting', 'started', 'stopping');

create table
public.replicators (
id bigint generated always as identity primary key,
tenant_id bigint references public.tenants(id) not null,
status replicator_status not null
);
@@ -4,6 +4,7 @@ create table
tenant_id bigint references public.tenants(id) not null,
source_id bigint references public.sources(id) not null,
sink_id bigint references public.sinks(id) not null,
replicator_id bigint references public.replicators(id) not null,
publication_name text not null,
config jsonb not null
);
1 change: 1 addition & 0 deletions api/src/db/mod.rs
@@ -1,5 +1,6 @@
pub mod pipelines;
pub mod publications;
pub mod replicators;
pub mod sinks;
pub mod sources;
pub mod tables;
19 changes: 14 additions & 5 deletions api/src/db/pipelines.rs
@@ -1,5 +1,7 @@
use sqlx::PgPool;

use super::replicators::create_replicator_txn;

#[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct PipelineConfig {
pub config: BatchConfig,
@@ -19,6 +21,7 @@ pub struct Pipeline {
pub tenant_id: i64,
pub source_id: i64,
pub sink_id: i64,
pub replicator_id: i64,
pub publication_name: String,
pub config: serde_json::Value,
}
@@ -32,20 +35,24 @@ pub async fn create_pipeline(
config: &PipelineConfig,
) -> Result<i64, sqlx::Error> {
let config = serde_json::to_value(config).expect("failed to serialize config");
let mut txn = pool.begin().await?;
let replicator_id = create_replicator_txn(&mut txn, tenant_id).await?;
let record = sqlx::query!(
r#"
insert into pipelines (tenant_id, source_id, sink_id, publication_name, config)
values ($1, $2, $3, $4, $5)
insert into pipelines (tenant_id, source_id, sink_id, replicator_id, publication_name, config)
values ($1, $2, $3, $4, $5, $6)
returning id
"#,
tenant_id,
source_id,
sink_id,
replicator_id,
publication_name,
config
)
.fetch_one(pool)
.fetch_one(&mut *txn)
.await?;
txn.commit().await?;

Ok(record.id)
}
@@ -57,7 +64,7 @@ pub async fn read_pipeline(
) -> Result<Option<Pipeline>, sqlx::Error> {
let record = sqlx::query!(
r#"
select id, tenant_id, source_id, sink_id, publication_name, config
select id, tenant_id, source_id, sink_id, replicator_id, publication_name, config
from pipelines
where tenant_id = $1 and id = $2
"#,
@@ -72,6 +79,7 @@ pub async fn read_pipeline(
tenant_id: r.tenant_id,
source_id: r.source_id,
sink_id: r.sink_id,
replicator_id: r.replicator_id,
publication_name: r.publication_name,
config: r.config,
}))
@@ -133,7 +141,7 @@ pub async fn read_all_pipelines(
) -> Result<Vec<Pipeline>, sqlx::Error> {
let mut record = sqlx::query!(
r#"
select id, tenant_id, source_id, sink_id, publication_name, config
select id, tenant_id, source_id, sink_id, replicator_id, publication_name, config
from pipelines
where tenant_id = $1
"#,
@@ -149,6 +157,7 @@ pub async fn read_all_pipelines(
tenant_id: r.tenant_id,
source_id: r.source_id,
sink_id: r.sink_id,
replicator_id: r.replicator_id,
publication_name: r.publication_name,
config: r.config,
})
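For context on the change above: create_pipeline now begins a transaction, creates the backing replicator row via create_replicator_txn, inserts the pipeline with the new replicator_id, and commits. A minimal caller sketch follows; it assumes the collapsed parameter list of create_pipeline is (pool, tenant_id, source_id, sink_id, publication_name, config), as the visible insert statement suggests, and the module paths, function name, and publication name are illustrative rather than part of this commit.

use sqlx::PgPool;

use crate::db::pipelines::{create_pipeline, PipelineConfig};
use crate::db::replicators::{read_replicator_by_pipeline_id, ReplicatorStatus};

// Illustrative sketch: create a pipeline and read back the replicator row
// that create_pipeline now creates for it in the same transaction.
async fn create_pipeline_and_replicator(
    pool: &PgPool,
    tenant_id: i64,
    source_id: i64,
    sink_id: i64,
    config: &PipelineConfig,
) -> Result<i64, sqlx::Error> {
    let pipeline_id =
        create_pipeline(pool, tenant_id, source_id, sink_id, "my_publication", config).await?;

    // The replicator row is created alongside the pipeline and starts out 'stopped'.
    let replicator = read_replicator_by_pipeline_id(pool, tenant_id, pipeline_id)
        .await?
        .expect("a freshly created pipeline should have a replicator");
    assert_eq!(replicator.status, ReplicatorStatus::Stopped);

    Ok(pipeline_id)
}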
67 changes: 67 additions & 0 deletions api/src/db/replicators.rs
@@ -0,0 +1,67 @@
use sqlx::{PgPool, Postgres, Transaction};

#[derive(Clone, Debug, PartialEq, PartialOrd, sqlx::Type)]
#[sqlx(type_name = "replicator_status", rename_all = "lowercase")]
pub enum ReplicatorStatus {
Stopped,
Starting,
Started,
Stopping,
}

pub struct Replicator {
pub id: i64,
pub tenant_id: i64,
pub status: ReplicatorStatus,
}

pub async fn create_replicator(pool: &PgPool, tenant_id: i64) -> Result<i64, sqlx::Error> {
let mut txn = pool.begin().await?;
let res = create_replicator_txn(&mut txn, tenant_id).await;
txn.commit().await?;
res
}

pub async fn create_replicator_txn(
txn: &mut Transaction<'_, Postgres>,
tenant_id: i64,
) -> Result<i64, sqlx::Error> {
let record = sqlx::query!(
r#"
insert into replicators (tenant_id, status)
values ($1, $2::replicator_status)
returning id
"#,
tenant_id,
ReplicatorStatus::Stopped as ReplicatorStatus
)
.fetch_one(&mut **txn)
.await?;

Ok(record.id)
}

pub async fn read_replicator_by_pipeline_id(
pool: &PgPool,
tenant_id: i64,
pipeline_id: i64,
) -> Result<Option<Replicator>, sqlx::Error> {
let record = sqlx::query!(
r#"
select r.id, r.tenant_id, status as "status: ReplicatorStatus"
from replicators r
join pipelines p on r.id = p.replicator_id
where r.tenant_id = $1 and p.tenant_id = $1 and p.id = $2
"#,
tenant_id,
pipeline_id,
)
.fetch_optional(pool)
.await?;

Ok(record.map(|r| Replicator {
id: r.id,
tenant_id: r.tenant_id,
status: r.status,
}))
}
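
The HTTP handlers that flip a replicator's status when a pipeline is started are not among the rendered files, but a status transition in the style of create_replicator_txn could look like the sketch below if it lived in this module; update_replicator_status is a hypothetical name, not something this commit adds.

// Hypothetical helper, not part of this commit: move a replicator to a new
// status (e.g. 'stopped' -> 'starting' when its pipeline is started), reusing
// the enum-cast pattern from create_replicator_txn above.
pub async fn update_replicator_status(
    pool: &PgPool,
    tenant_id: i64,
    replicator_id: i64,
    status: ReplicatorStatus,
) -> Result<(), sqlx::Error> {
    sqlx::query!(
        r#"
        update replicators
        set status = $1::replicator_status
        where tenant_id = $2 and id = $3
        "#,
        status as ReplicatorStatus,
        tenant_id,
        replicator_id
    )
    .execute(pool)
    .await?;
    Ok(())
}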
2 changes: 1 addition & 1 deletion api/src/queue.rs
@@ -4,7 +4,7 @@ pub async fn enqueue_task(
pool: &PgPool,
task_name: &str,
task_data: serde_json::Value,
) -> Result<i64, anyhow::Error> {
) -> Result<i64, sqlx::Error> {
let task = sqlx::query!(
r#"
insert into queue.task_queue (name, data)
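
With enqueue_task now returning sqlx::Error, it composes with the db functions above using the ? operator. Below is a hedged sketch of how a start-pipeline request might tie the pieces together; the task name, payload shape, module paths, and surrounding function are assumptions, since the actual handler lives in files not rendered on this page.

use sqlx::PgPool;

use crate::db::replicators::read_replicator_by_pipeline_id;
use crate::queue::enqueue_task;

// Illustrative sketch only: look up the pipeline's replicator and enqueue a
// task asking a worker to start it. "start_replicator" and the payload shape
// are assumptions, not taken from this commit.
async fn enqueue_start_pipeline(
    pool: &PgPool,
    tenant_id: i64,
    pipeline_id: i64,
) -> Result<i64, sqlx::Error> {
    let replicator = read_replicator_by_pipeline_id(pool, tenant_id, pipeline_id)
        .await?
        .expect("pipeline should reference an existing replicator");

    let task_data = serde_json::json!({
        "tenant_id": tenant_id,
        "pipeline_id": pipeline_id,
        "replicator_id": replicator.id,
    });
    // Both calls now share the sqlx::Error type, so ? works throughout.
    enqueue_task(pool, "start_replicator", task_data).await
}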