From 89e9673450a8e60cb0b4723eacd6c49266f032a9 Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Tue, 3 Sep 2024 17:39:49 +0530 Subject: [PATCH] add images api --- .../20240828090309_create_images.sql | 2 +- api/src/db/images.rs | 101 +++++++++++-- api/src/routes/images.rs | 139 ++++++++++++++++++ api/src/routes/mod.rs | 1 + api/src/routes/pipelines.rs | 3 +- api/src/startup.rs | 9 +- api/tests/api/images.rs | 15 ++ api/tests/api/main.rs | 1 + api/tests/api/pipelines.rs | 11 ++ api/tests/api/test_app.rs | 70 +++++++++ 10 files changed, 334 insertions(+), 18 deletions(-) create mode 100644 api/src/routes/images.rs create mode 100644 api/tests/api/images.rs diff --git a/api/migrations/20240828090309_create_images.sql b/api/migrations/20240828090309_create_images.sql index f8135d7..a66b955 100644 --- a/api/migrations/20240828090309_create_images.sql +++ b/api/migrations/20240828090309_create_images.sql @@ -1,6 +1,6 @@ create table public.images ( id bigint generated always as identity primary key, - image_name text not null, + name text not null, is_default boolean not null ); diff --git a/api/src/db/images.rs b/api/src/db/images.rs index abc70da..5bdcb83 100644 --- a/api/src/db/images.rs +++ b/api/src/db/images.rs @@ -2,33 +2,29 @@ use sqlx::{PgPool, Postgres, Transaction}; pub struct Image { pub id: i64, - pub image_name: String, + pub name: String, pub is_default: bool, } -pub async fn create_image( - pool: &PgPool, - image_name: &str, - is_default: bool, -) -> Result { +pub async fn create_image(pool: &PgPool, name: &str, is_default: bool) -> Result { let mut txn = pool.begin().await?; - let res = create_image_txn(&mut txn, image_name, is_default).await; + let res = create_image_txn(&mut txn, name, is_default).await; txn.commit().await?; res } pub async fn create_image_txn( txn: &mut Transaction<'_, Postgres>, - image_name: &str, + name: &str, is_default: bool, ) -> Result { let record = sqlx::query!( r#" - insert into images (image_name, is_default) + insert into images (name, is_default) values ($1, $2) returning id "#, - image_name, + name, is_default ) .fetch_one(&mut **txn) @@ -40,7 +36,7 @@ pub async fn create_image_txn( pub async fn read_default_image(pool: &PgPool) -> Result, sqlx::Error> { let record = sqlx::query!( r#" - select id, image_name, is_default + select id, name, is_default from images where is_default = true "#, @@ -50,18 +46,95 @@ pub async fn read_default_image(pool: &PgPool) -> Result, sqlx::Er Ok(record.map(|r| Image { id: r.id, - image_name: r.image_name, + name: r.name, + is_default: r.is_default, + })) +} + +pub async fn read_image(pool: &PgPool, image_id: i64) -> Result, sqlx::Error> { + let record = sqlx::query!( + r#" + select id, name, is_default + from images + where id = $1 + "#, + image_id, + ) + .fetch_optional(pool) + .await?; + + Ok(record.map(|r| Image { + id: r.id, + name: r.name, is_default: r.is_default, })) } +pub async fn update_image( + pool: &PgPool, + image_id: i64, + name: &str, + is_default: bool, +) -> Result, sqlx::Error> { + let record = sqlx::query!( + r#" + update images + set name = $1, is_default = $2 + where id = $3 + returning id + "#, + name, + is_default, + image_id + ) + .fetch_optional(pool) + .await?; + + Ok(record.map(|r| r.id)) +} + +pub async fn delete_image(pool: &PgPool, image_id: i64) -> Result, sqlx::Error> { + let record = sqlx::query!( + r#" + delete from images + where id = $1 + returning id + "#, + image_id + ) + .fetch_optional(pool) + .await?; + + Ok(record.map(|r| r.id)) +} + +pub async fn read_all_images(pool: &PgPool) -> Result, sqlx::Error> { + let mut record = sqlx::query!( + r#" + select id, name, is_default + from images + "#, + ) + .fetch_all(pool) + .await?; + + Ok(record + .drain(..) + .map(|r| Image { + id: r.id, + name: r.name, + is_default: r.is_default, + }) + .collect()) +} + pub async fn read_image_by_replicator_id( pool: &PgPool, replicator_id: i64, ) -> Result, sqlx::Error> { let record = sqlx::query!( r#" - select i.id, i.image_name, i.is_default + select i.id, i.name, i.is_default from images i join replicators r on i.id = r.image_id where r.id = $1 @@ -73,7 +146,7 @@ pub async fn read_image_by_replicator_id( Ok(record.map(|r| Image { id: r.id, - image_name: r.image_name, + name: r.name, is_default: r.is_default, })) } diff --git a/api/src/routes/images.rs b/api/src/routes/images.rs new file mode 100644 index 0000000..903716d --- /dev/null +++ b/api/src/routes/images.rs @@ -0,0 +1,139 @@ +use actix_web::{ + delete, get, + http::{header::ContentType, StatusCode}, + post, + web::{Data, Json, Path}, + HttpResponse, Responder, ResponseError, +}; +use serde::{Deserialize, Serialize}; +use sqlx::PgPool; +use thiserror::Error; + +use crate::db; + +use super::ErrorMessage; + +#[derive(Debug, Error)] +enum ImageError { + #[error("database error: {0}")] + DatabaseError(#[from] sqlx::Error), + + #[error("source with id {0} not found")] + ImageNotFound(i64), +} + +impl ImageError { + fn to_message(&self) -> String { + match self { + // Do not expose internal database details in error messages + ImageError::DatabaseError(_) => "internal server error".to_string(), + // Every other message is ok, as they do not divulge sensitive information + e => e.to_string(), + } + } +} + +impl ResponseError for ImageError { + fn status_code(&self) -> StatusCode { + match self { + ImageError::DatabaseError(_) => StatusCode::INTERNAL_SERVER_ERROR, + ImageError::ImageNotFound(_) => StatusCode::NOT_FOUND, + } + } + + fn error_response(&self) -> HttpResponse { + let error_message = ErrorMessage { + error: self.to_message(), + }; + let body = + serde_json::to_string(&error_message).expect("failed to serialize error message"); + HttpResponse::build(self.status_code()) + .insert_header(ContentType::json()) + .body(body) + } +} + +#[derive(Deserialize)] +struct PostImageRequest { + pub name: String, + pub is_default: bool, +} + +#[derive(Serialize)] +struct PostImageResponse { + id: i64, +} + +#[derive(Serialize)] +struct GetImageResponse { + id: i64, + name: String, + is_default: bool, +} + +#[post("/images")] +pub async fn create_image( + pool: Data, + image: Json, +) -> Result { + let image = image.0; + let id = db::images::create_image(&pool, &image.name, image.is_default).await?; + let response = PostImageResponse { id }; + Ok(Json(response)) +} + +#[get("/images/{image_id}")] +pub async fn read_image( + pool: Data, + image_id: Path, +) -> Result { + let image_id = image_id.into_inner(); + let response = db::images::read_image(&pool, image_id) + .await? + .map(|s| GetImageResponse { + id: s.id, + name: s.name, + is_default: s.is_default, + }) + .ok_or(ImageError::ImageNotFound(image_id))?; + Ok(Json(response)) +} + +#[post("/images/{image_id}")] +pub async fn update_image( + pool: Data, + image_id: Path, + image: Json, +) -> Result { + let image_id = image_id.into_inner(); + db::images::update_image(&pool, image_id, &image.name, image.is_default) + .await? + .ok_or(ImageError::ImageNotFound(image_id))?; + Ok(HttpResponse::Ok().finish()) +} + +#[delete("/images/{image_id}")] +pub async fn delete_image( + pool: Data, + image_id: Path, +) -> Result { + let image_id = image_id.into_inner(); + db::images::delete_image(&pool, image_id) + .await? + .ok_or(ImageError::ImageNotFound(image_id))?; + Ok(HttpResponse::Ok().finish()) +} + +#[get("/images")] +pub async fn read_all_images(pool: Data) -> Result { + let mut sources = vec![]; + for image in db::images::read_all_images(&pool).await? { + let image = GetImageResponse { + id: image.id, + name: image.name, + is_default: image.is_default, + }; + sources.push(image); + } + Ok(Json(sources)) +} diff --git a/api/src/routes/mod.rs b/api/src/routes/mod.rs index 36246cd..922e766 100644 --- a/api/src/routes/mod.rs +++ b/api/src/routes/mod.rs @@ -1,6 +1,7 @@ use serde::Serialize; pub mod health_check; +pub mod images; pub mod pipelines; pub mod sinks; pub mod sources; diff --git a/api/src/routes/pipelines.rs b/api/src/routes/pipelines.rs index 727d8fe..f417ac7 100644 --- a/api/src/routes/pipelines.rs +++ b/api/src/routes/pipelines.rs @@ -293,8 +293,7 @@ pub async fn start_pipeline( enqueue_create_or_update_secrets_task(&pool, tenant_id, replicator.id, secrets).await?; enqueue_create_or_update_config_task(&pool, tenant_id, replicator.id, config).await?; - enqueue_create_or_update_replicator_task(&pool, tenant_id, replicator.id, image.image_name) - .await?; + enqueue_create_or_update_replicator_task(&pool, tenant_id, replicator.id, image.name).await?; Ok(HttpResponse::Ok().finish()) } diff --git a/api/src/startup.rs b/api/src/startup.rs index 6359e02..9011485 100644 --- a/api/src/startup.rs +++ b/api/src/startup.rs @@ -8,6 +8,7 @@ use crate::{ configuration::{DatabaseSettings, Settings}, routes::{ health_check::health_check, + images::{create_image, delete_image, read_all_images, read_image, update_image}, pipelines::{ create_pipeline, delete_pipeline, read_all_pipelines, read_pipeline, start_pipeline, stop_pipeline, update_pipeline, @@ -101,7 +102,13 @@ pub async fn run(listener: TcpListener, connection_pool: PgPool) -> Result i64 { + create_image_with_name(app, "some/image".to_string(), true).await +} + +pub async fn create_image_with_name(app: &TestApp, name: String, is_default: bool) -> i64 { + let image = CreateImageRequest { name, is_default }; + let response = app.create_image(&image).await; + let response: CreateImageResponse = response + .json() + .await + .expect("failed to deserialize response"); + response.id +} diff --git a/api/tests/api/main.rs b/api/tests/api/main.rs index b6cdad5..5bcd01e 100644 --- a/api/tests/api/main.rs +++ b/api/tests/api/main.rs @@ -1,5 +1,6 @@ mod database; mod health_check; +mod images; mod pipelines; mod sinks; mod sources; diff --git a/api/tests/api/pipelines.rs b/api/tests/api/pipelines.rs index 1e8241d..1ff762f 100644 --- a/api/tests/api/pipelines.rs +++ b/api/tests/api/pipelines.rs @@ -2,6 +2,7 @@ use api::db::pipelines::{BatchConfig, PipelineConfig}; use reqwest::StatusCode; use crate::{ + images::create_default_image, sinks::create_sink, sources::create_source, tenants::create_tenant, @@ -36,6 +37,7 @@ pub async fn create_pipeline_with_config( sink_id: i64, config: PipelineConfig, ) -> i64 { + create_default_image(app).await; let pipeline = CreatePipelineRequest { source_id, sink_id, @@ -54,6 +56,7 @@ pub async fn create_pipeline_with_config( async fn pipeline_can_be_created() { // Arrange let app = spawn_app().await; + create_default_image(&app).await; let tenant_id = create_tenant(&app).await; let source_id = create_source(&app, tenant_id).await; let sink_id = create_sink(&app, tenant_id).await; @@ -80,6 +83,7 @@ async fn pipeline_can_be_created() { async fn pipeline_with_another_tenants_source_cant_be_created() { // Arrange let app = spawn_app().await; + create_default_image(&app).await; let tenant1_id = create_tenant(&app).await; let tenant2_id = create_tenant(&app).await; let source2_id = create_source(&app, tenant2_id).await; @@ -102,6 +106,7 @@ async fn pipeline_with_another_tenants_source_cant_be_created() { async fn pipeline_with_another_tenants_sink_cant_be_created() { // Arrange let app = spawn_app().await; + create_default_image(&app).await; let tenant1_id = create_tenant(&app).await; let tenant2_id = create_tenant(&app).await; let source1_id = create_source(&app, tenant1_id).await; @@ -124,6 +129,7 @@ async fn pipeline_with_another_tenants_sink_cant_be_created() { async fn an_existing_pipeline_can_be_read() { // Arrange let app = spawn_app().await; + create_default_image(&app).await; let tenant_id = create_tenant(&app).await; let source_id = create_source(&app, tenant_id).await; let sink_id = create_sink(&app, tenant_id).await; @@ -175,6 +181,7 @@ async fn an_non_existing_pipeline_cant_be_read() { async fn an_existing_pipeline_can_be_updated() { // Arrange let app = spawn_app().await; + create_default_image(&app).await; let tenant_id = create_tenant(&app).await; let source_id = create_source(&app, tenant_id).await; let sink_id = create_sink(&app, tenant_id).await; @@ -224,6 +231,7 @@ async fn an_existing_pipeline_can_be_updated() { async fn pipeline_with_another_tenants_source_cant_be_updated() { // Arrange let app = spawn_app().await; + create_default_image(&app).await; let tenant1_id = create_tenant(&app).await; let tenant2_id = create_tenant(&app).await; let source1_id = create_source(&app, tenant1_id).await; @@ -262,6 +270,7 @@ async fn pipeline_with_another_tenants_source_cant_be_updated() { async fn pipeline_with_another_tenants_sink_cant_be_updated() { // Arrange let app = spawn_app().await; + create_default_image(&app).await; let tenant1_id = create_tenant(&app).await; let tenant2_id = create_tenant(&app).await; let source1_id = create_source(&app, tenant1_id).await; @@ -321,6 +330,7 @@ async fn an_non_existing_pipeline_cant_be_updated() { async fn an_existing_pipeline_can_be_deleted() { // Arrange let app = spawn_app().await; + create_default_image(&app).await; let tenant_id = create_tenant(&app).await; let source_id = create_source(&app, tenant_id).await; let sink_id = create_sink(&app, tenant_id).await; @@ -364,6 +374,7 @@ async fn an_non_existing_pipeline_cant_be_deleted() { async fn all_pipelines_can_be_read() { // Arrange let app = spawn_app().await; + create_default_image(&app).await; let tenant_id = create_tenant(&app).await; let source1_id = create_source(&app, tenant_id).await; let source2_id = create_source(&app, tenant_id).await; diff --git a/api/tests/api/test_app.rs b/api/tests/api/test_app.rs index 363e179..13ef42d 100644 --- a/api/tests/api/test_app.rs +++ b/api/tests/api/test_app.rs @@ -112,6 +112,30 @@ pub struct UpdatePipelineRequest { pub config: PipelineConfig, } +#[derive(Serialize)] +pub struct CreateImageRequest { + pub name: String, + pub is_default: bool, +} + +#[derive(Deserialize)] +pub struct CreateImageResponse { + pub id: i64, +} + +#[derive(Deserialize)] +pub struct ImageResponse { + pub id: i64, + pub name: String, + pub is_default: bool, +} + +#[derive(Serialize)] +pub struct UpdateImageRequest { + pub name: String, + pub is_default: bool, +} + impl TestApp { pub async fn create_tenant(&self, tenant: &CreateTenantRequest) -> reqwest::Response { self.api_client @@ -322,6 +346,52 @@ impl TestApp { .await .expect("failed to execute request") } + + pub async fn create_image(&self, image: &CreateImageRequest) -> reqwest::Response { + self.api_client + .post(&format!("{}/v1/images", &self.address)) + .json(image) + .send() + .await + .expect("Failed to execute request.") + } + + pub async fn read_image(&self, image_id: i64) -> reqwest::Response { + self.api_client + .get(&format!("{}/v1/images/{image_id}", &self.address)) + .send() + .await + .expect("failed to execute request") + } + + pub async fn update_image( + &self, + image_id: i64, + image: &UpdateImageRequest, + ) -> reqwest::Response { + self.api_client + .post(&format!("{}/v1/images/{image_id}", &self.address)) + .json(image) + .send() + .await + .expect("failed to execute request") + } + + pub async fn delete_image(&self, image_id: i64) -> reqwest::Response { + self.api_client + .delete(&format!("{}/v1/images/{image_id}", &self.address)) + .send() + .await + .expect("Failed to execute request.") + } + + pub async fn read_all_images(&self) -> reqwest::Response { + self.api_client + .get(&format!("{}/v1/images", &self.address)) + .send() + .await + .expect("failed to execute request") + } } pub async fn spawn_app() -> TestApp {