Skip to content

Commit

Permalink
allow configuring of replicator images
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Sep 3, 2024
1 parent bca42f8 commit c4a09ba
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 13 deletions.
6 changes: 6 additions & 0 deletions api/migrations/20240828090309_create_images.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
create table
public.images (
id bigint generated always as identity primary key,
image_name text not null,
is_default boolean not null
);
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ create table
public.replicators (
id bigint generated always as identity primary key,
tenant_id bigint references public.tenants(id) not null,
image_id bigint references public.images(id) not null,
status replicator_status not null
);
79 changes: 79 additions & 0 deletions api/src/db/images.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use sqlx::{PgPool, Postgres, Transaction};

pub struct Image {
pub id: i64,
pub image_name: String,
pub is_default: bool,
}

pub async fn create_image(
pool: &PgPool,
image_name: &str,
is_default: bool,
) -> Result<i64, sqlx::Error> {
let mut txn = pool.begin().await?;
let res = create_image_txn(&mut txn, image_name, is_default).await;
txn.commit().await?;
res
}

pub async fn create_image_txn(
txn: &mut Transaction<'_, Postgres>,
image_name: &str,
is_default: bool,
) -> Result<i64, sqlx::Error> {
let record = sqlx::query!(
r#"
insert into images (image_name, is_default)
values ($1, $2)
returning id
"#,
image_name,
is_default
)
.fetch_one(&mut **txn)
.await?;

Ok(record.id)
}

pub async fn read_default_image(pool: &PgPool) -> Result<Option<Image>, sqlx::Error> {
let record = sqlx::query!(
r#"
select id, image_name, is_default
from images
where is_default = true
"#,
)
.fetch_optional(pool)
.await?;

Ok(record.map(|r| Image {
id: r.id,
image_name: r.image_name,
is_default: r.is_default,
}))
}

pub async fn read_image_by_replicator_id(
pool: &PgPool,
replicator_id: i64,
) -> Result<Option<Image>, sqlx::Error> {
let record = sqlx::query!(
r#"
select i.id, i.image_name, i.is_default
from images i
join replicators r on i.id = r.image_id
where r.id = $1
"#,
replicator_id,
)
.fetch_optional(pool)
.await?;

Ok(record.map(|r| Image {
id: r.id,
image_name: r.image_name,
is_default: r.is_default,
}))
}
1 change: 1 addition & 0 deletions api/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ pub mod sinks;
pub mod sources;
pub mod tables;
pub mod tenants;
pub mod images;
3 changes: 2 additions & 1 deletion api/src/db/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ pub async fn create_pipeline(
tenant_id: i64,
source_id: i64,
sink_id: i64,
image_id: i64,
publication_name: String,
config: &PipelineConfig,
) -> Result<i64, sqlx::Error> {
let config = serde_json::to_value(config).expect("failed to serialize config");
let mut txn = pool.begin().await?;
let replicator_id = create_replicator_txn(&mut txn, tenant_id).await?;
let replicator_id = create_replicator_txn(&mut txn, tenant_id, image_id).await?;
let record = sqlx::query!(
r#"
insert into pipelines (tenant_id, source_id, sink_id, replicator_id, publication_name, config)
Expand Down
18 changes: 13 additions & 5 deletions api/src/db/replicators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,34 @@ pub enum ReplicatorStatus {
pub struct Replicator {
pub id: i64,
pub tenant_id: i64,
pub image_id: i64,
pub status: ReplicatorStatus,
}

pub async fn create_replicator(pool: &PgPool, tenant_id: i64) -> Result<i64, sqlx::Error> {
pub async fn create_replicator(
pool: &PgPool,
tenant_id: i64,
image_id: i64,
) -> Result<i64, sqlx::Error> {
let mut txn = pool.begin().await?;
let res = create_replicator_txn(&mut txn, tenant_id).await;
let res = create_replicator_txn(&mut txn, tenant_id, image_id).await;
txn.commit().await?;
res
}

pub async fn create_replicator_txn(
txn: &mut Transaction<'_, Postgres>,
tenant_id: i64,
image_id: i64,
) -> Result<i64, sqlx::Error> {
let record = sqlx::query!(
r#"
insert into replicators (tenant_id, status)
values ($1, $2::replicator_status)
insert into replicators (tenant_id, image_id, status)
values ($1, $2, $3::replicator_status)
returning id
"#,
tenant_id,
image_id,
ReplicatorStatus::Stopped as ReplicatorStatus
)
.fetch_one(&mut **txn)
Expand All @@ -48,7 +55,7 @@ pub async fn read_replicator_by_pipeline_id(
) -> Result<Option<Replicator>, sqlx::Error> {
let record = sqlx::query!(
r#"
select r.id, r.tenant_id, status as "status: ReplicatorStatus"
select r.id, r.tenant_id, r.image_id, r.status as "status: ReplicatorStatus"
from replicators r
join pipelines p on r.id = p.replicator_id
where r.tenant_id = $1 and p.tenant_id = $1 and p.id = $2
Expand All @@ -62,6 +69,7 @@ pub async fn read_replicator_by_pipeline_id(
Ok(record.map(|r| Replicator {
id: r.id,
tenant_id: r.tenant_id,
image_id: r.image_id,
status: r.status,
}))
}
34 changes: 27 additions & 7 deletions api/src/routes/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use thiserror::Error;
use crate::{
db::{
self,
images::Image,
pipelines::{Pipeline, PipelineConfig},
replicators::Replicator,
sinks::{sink_exists, Sink, SinkConfig},
Expand Down Expand Up @@ -41,6 +42,12 @@ enum PipelineError {
#[error("replicator with pipeline id {0} not found")]
ReplicatorNotFound(i64),

#[error("image with replicator id {0} not found")]
ImageNotFound(i64),

#[error("no default image found")]
NoDefaultImageFound,

#[error("tenant id missing in request")]
TenantIdMissing,

Expand All @@ -67,7 +74,9 @@ impl ResponseError for PipelineError {
match self {
PipelineError::DatabaseError(_)
| PipelineError::InvalidConfig(_)
| PipelineError::ReplicatorNotFound(_) => StatusCode::INTERNAL_SERVER_ERROR,
| PipelineError::ReplicatorNotFound(_)
| PipelineError::ImageNotFound(_)
| PipelineError::NoDefaultImageFound => StatusCode::INTERNAL_SERVER_ERROR,
PipelineError::PipelineNotFound(_) => StatusCode::NOT_FOUND,
PipelineError::TenantIdMissing
| PipelineError::TenantIdIllFormed
Expand Down Expand Up @@ -145,11 +154,16 @@ pub async fn create_pipeline(
return Err(PipelineError::SinkNotFound(pipeline.sink_id));
}

let image = db::images::read_default_image(&pool)
.await?
.ok_or(PipelineError::NoDefaultImageFound)?;

let id = db::pipelines::create_pipeline(
&pool,
tenant_id,
pipeline.source_id,
pipeline.sink_id,
image.id,
pipeline.publication_name,
&config,
)
Expand Down Expand Up @@ -272,13 +286,15 @@ pub async fn start_pipeline(
let tenant_id = extract_tenant_id(&req)?;
let pipeline_id = pipeline_id.into_inner();

let (pipeline, replicator, source, sink) = read_data(&pool, tenant_id, pipeline_id).await?;
let (pipeline, replicator, image, source, sink) =
read_data(&pool, tenant_id, pipeline_id).await?;

let (secrets, config) = create_configs(source.config, sink.config, pipeline)?;

enqueue_create_or_update_secrets_task(&pool, tenant_id, replicator.id, secrets).await?;
enqueue_create_or_update_config_task(&pool, tenant_id, replicator.id, config).await?;
enqueue_create_or_update_replicator_task(&pool, tenant_id, replicator.id).await?;
enqueue_create_or_update_replicator_task(&pool, tenant_id, replicator.id, image.image_name)
.await?;

Ok(HttpResponse::Ok().finish())
}
Expand Down Expand Up @@ -307,13 +323,16 @@ async fn read_data(
pool: &PgPool,
tenant_id: i64,
pipeline_id: i64,
) -> Result<(Pipeline, Replicator, Source, Sink), PipelineError> {
) -> Result<(Pipeline, Replicator, Image, Source, Sink), PipelineError> {
let pipeline = db::pipelines::read_pipeline(pool, tenant_id, pipeline_id)
.await?
.ok_or(PipelineError::PipelineNotFound(pipeline_id))?;
let replicator = db::replicators::read_replicator_by_pipeline_id(pool, tenant_id, pipeline_id)
.await?
.ok_or(PipelineError::ReplicatorNotFound(pipeline_id))?;
let image = db::images::read_image_by_replicator_id(pool, replicator.id)
.await?
.ok_or(PipelineError::ImageNotFound(replicator.id))?;
let source_id = pipeline.source_id;
let source = db::sources::read_source(pool, tenant_id, source_id)
.await?
Expand All @@ -322,7 +341,8 @@ async fn read_data(
let sink = db::sinks::read_sink(pool, tenant_id, sink_id)
.await?
.ok_or(PipelineError::SinkNotFound(sink_id))?;
Ok((pipeline, replicator, source, sink))

Ok((pipeline, replicator, image, source, sink))
}

fn create_configs(
Expand Down Expand Up @@ -422,12 +442,12 @@ async fn enqueue_create_or_update_replicator_task(
pool: &PgPool,
tenant_id: i64,
replicator_id: i64,
replicator_image: String,
) -> Result<(), PipelineError> {
let req = Request::CreateOrUpdateReplicator {
tenant_id,
replicator_id,
replicator_image: "ramsup/replicator:main.96457e346eed76c0dce648dbfd5a6f645220ea9a"
.to_string(), //TODO: remove hardcode image
replicator_image,
};

let task_data = serde_json::to_value(req)?;
Expand Down

0 comments on commit c4a09ba

Please sign in to comment.