Skip to content

Commit

Permalink
add api to stop a pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Sep 3, 2024
1 parent 18440fd commit 6063fe0
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 3 deletions.
67 changes: 65 additions & 2 deletions api/src/routes/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,14 +277,32 @@ pub async fn start_pipeline(
let (secrets, config) = create_configs(source.config, sink.config, pipeline)?;

enqueue_create_or_update_secrets_task(&pool, tenant_id, replicator.id, secrets).await?;

enqueue_create_or_update_config_task(&pool, tenant_id, replicator.id, config).await?;

enqueue_create_or_update_replicator_task(&pool, tenant_id, replicator.id).await?;

Ok(HttpResponse::Ok().finish())
}

#[post("/pipelines/{pipeline_id}/stop")]
pub async fn stop_pipeline(
req: HttpRequest,
pool: Data<PgPool>,
pipeline_id: Path<i64>,
) -> Result<impl Responder, PipelineError> {
let tenant_id = extract_tenant_id(&req)?;
let pipeline_id = pipeline_id.into_inner();

let replicator = db::replicators::read_replicator_by_pipeline_id(&pool, tenant_id, pipeline_id)
.await?
.ok_or(PipelineError::ReplicatorNotFound(pipeline_id))?;

enqueue_delete_secrets_task(&pool, tenant_id, replicator.id).await?;
enqueue_delete_config_task(&pool, tenant_id, replicator.id).await?;
enqueue_delete_replicator_task(&pool, tenant_id, replicator.id).await?;

Ok(HttpResponse::Ok().finish())
}

async fn read_data(
pool: &PgPool,
tenant_id: i64,
Expand Down Expand Up @@ -417,3 +435,48 @@ async fn enqueue_create_or_update_replicator_task(

Ok(())
}

async fn enqueue_delete_secrets_task(
pool: &PgPool,
tenant_id: i64,
replicator_id: i64,
) -> Result<(), PipelineError> {
let req = Request::DeleteSecrets {
tenant_id,
replicator_id,
};

let task_data = serde_json::to_value(req)?;
enqueue_task(pool, "delete_secrets", task_data).await?;
Ok(())
}

async fn enqueue_delete_config_task(
pool: &PgPool,
tenant_id: i64,
replicator_id: i64,
) -> Result<(), PipelineError> {
let req = Request::DeleteConfig {
tenant_id,
replicator_id,
};

let task_data = serde_json::to_value(req)?;
enqueue_task(pool, "delete_config", task_data).await?;
Ok(())
}

async fn enqueue_delete_replicator_task(
pool: &PgPool,
tenant_id: i64,
replicator_id: i64,
) -> Result<(), PipelineError> {
let req = Request::DeleteReplicator {
tenant_id,
replicator_id,
};

let task_data = serde_json::to_value(req)?;
enqueue_task(pool, "delete_replicator", task_data).await?;
Ok(())
}
3 changes: 2 additions & 1 deletion api/src/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
health_check::health_check,
pipelines::{
create_pipeline, delete_pipeline, read_all_pipelines, read_pipeline, start_pipeline,
update_pipeline,
stop_pipeline, update_pipeline,
},
sinks::{create_sink, delete_sink, read_all_sinks, read_sink, update_sink},
sources::{
Expand Down Expand Up @@ -93,6 +93,7 @@ pub async fn run(listener: TcpListener, connection_pool: PgPool) -> Result<Serve
.service(delete_pipeline)
.service(read_all_pipelines)
.service(start_pipeline)
.service(stop_pipeline)
//tables
.service(read_table_names)
//publications
Expand Down

0 comments on commit 6063fe0

Please sign in to comment.