Skip to content

Commit

Permalink
add images api
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Sep 3, 2024
1 parent c4a09ba commit 89e9673
Show file tree
Hide file tree
Showing 10 changed files with 334 additions and 18 deletions.
2 changes: 1 addition & 1 deletion api/migrations/20240828090309_create_images.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
create table
public.images (
id bigint generated always as identity primary key,
image_name text not null,
name text not null,
is_default boolean not null
);
101 changes: 87 additions & 14 deletions api/src/db/images.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,29 @@ use sqlx::{PgPool, Postgres, Transaction};

pub struct Image {
pub id: i64,
pub image_name: String,
pub name: String,
pub is_default: bool,
}

pub async fn create_image(
pool: &PgPool,
image_name: &str,
is_default: bool,
) -> Result<i64, sqlx::Error> {
pub async fn create_image(pool: &PgPool, name: &str, is_default: bool) -> Result<i64, sqlx::Error> {
let mut txn = pool.begin().await?;
let res = create_image_txn(&mut txn, image_name, is_default).await;
let res = create_image_txn(&mut txn, name, is_default).await;
txn.commit().await?;
res
}

pub async fn create_image_txn(
txn: &mut Transaction<'_, Postgres>,
image_name: &str,
name: &str,
is_default: bool,
) -> Result<i64, sqlx::Error> {
let record = sqlx::query!(
r#"
insert into images (image_name, is_default)
insert into images (name, is_default)
values ($1, $2)
returning id
"#,
image_name,
name,
is_default
)
.fetch_one(&mut **txn)
Expand All @@ -40,7 +36,7 @@ pub async fn create_image_txn(
pub async fn read_default_image(pool: &PgPool) -> Result<Option<Image>, sqlx::Error> {
let record = sqlx::query!(
r#"
select id, image_name, is_default
select id, name, is_default
from images
where is_default = true
"#,
Expand All @@ -50,18 +46,95 @@ pub async fn read_default_image(pool: &PgPool) -> Result<Option<Image>, sqlx::Er

Ok(record.map(|r| Image {
id: r.id,
image_name: r.image_name,
name: r.name,
is_default: r.is_default,
}))
}

pub async fn read_image(pool: &PgPool, image_id: i64) -> Result<Option<Image>, sqlx::Error> {
let record = sqlx::query!(
r#"
select id, name, is_default
from images
where id = $1
"#,
image_id,
)
.fetch_optional(pool)
.await?;

Ok(record.map(|r| Image {
id: r.id,
name: r.name,
is_default: r.is_default,
}))
}

pub async fn update_image(
pool: &PgPool,
image_id: i64,
name: &str,
is_default: bool,
) -> Result<Option<i64>, sqlx::Error> {
let record = sqlx::query!(
r#"
update images
set name = $1, is_default = $2
where id = $3
returning id
"#,
name,
is_default,
image_id
)
.fetch_optional(pool)
.await?;

Ok(record.map(|r| r.id))
}

pub async fn delete_image(pool: &PgPool, image_id: i64) -> Result<Option<i64>, sqlx::Error> {
let record = sqlx::query!(
r#"
delete from images
where id = $1
returning id
"#,
image_id
)
.fetch_optional(pool)
.await?;

Ok(record.map(|r| r.id))
}

pub async fn read_all_images(pool: &PgPool) -> Result<Vec<Image>, sqlx::Error> {
let mut record = sqlx::query!(
r#"
select id, name, is_default
from images
"#,
)
.fetch_all(pool)
.await?;

Ok(record
.drain(..)
.map(|r| Image {
id: r.id,
name: r.name,
is_default: r.is_default,
})
.collect())
}

pub async fn read_image_by_replicator_id(
pool: &PgPool,
replicator_id: i64,
) -> Result<Option<Image>, sqlx::Error> {
let record = sqlx::query!(
r#"
select i.id, i.image_name, i.is_default
select i.id, i.name, i.is_default
from images i
join replicators r on i.id = r.image_id
where r.id = $1
Expand All @@ -73,7 +146,7 @@ pub async fn read_image_by_replicator_id(

Ok(record.map(|r| Image {
id: r.id,
image_name: r.image_name,
name: r.name,
is_default: r.is_default,
}))
}
139 changes: 139 additions & 0 deletions api/src/routes/images.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
use actix_web::{
delete, get,
http::{header::ContentType, StatusCode},
post,
web::{Data, Json, Path},
HttpResponse, Responder, ResponseError,
};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use thiserror::Error;

use crate::db;

use super::ErrorMessage;

#[derive(Debug, Error)]
enum ImageError {
#[error("database error: {0}")]
DatabaseError(#[from] sqlx::Error),

#[error("source with id {0} not found")]
ImageNotFound(i64),
}

impl ImageError {
fn to_message(&self) -> String {
match self {
// Do not expose internal database details in error messages
ImageError::DatabaseError(_) => "internal server error".to_string(),
// Every other message is ok, as they do not divulge sensitive information
e => e.to_string(),
}
}
}

impl ResponseError for ImageError {
fn status_code(&self) -> StatusCode {
match self {
ImageError::DatabaseError(_) => StatusCode::INTERNAL_SERVER_ERROR,
ImageError::ImageNotFound(_) => StatusCode::NOT_FOUND,
}
}

fn error_response(&self) -> HttpResponse {
let error_message = ErrorMessage {
error: self.to_message(),
};
let body =
serde_json::to_string(&error_message).expect("failed to serialize error message");
HttpResponse::build(self.status_code())
.insert_header(ContentType::json())
.body(body)
}
}

#[derive(Deserialize)]
struct PostImageRequest {
pub name: String,
pub is_default: bool,
}

#[derive(Serialize)]
struct PostImageResponse {
id: i64,
}

#[derive(Serialize)]
struct GetImageResponse {
id: i64,
name: String,
is_default: bool,
}

#[post("/images")]
pub async fn create_image(
pool: Data<PgPool>,
image: Json<PostImageRequest>,
) -> Result<impl Responder, ImageError> {
let image = image.0;
let id = db::images::create_image(&pool, &image.name, image.is_default).await?;
let response = PostImageResponse { id };
Ok(Json(response))
}

#[get("/images/{image_id}")]
pub async fn read_image(
pool: Data<PgPool>,
image_id: Path<i64>,
) -> Result<impl Responder, ImageError> {
let image_id = image_id.into_inner();
let response = db::images::read_image(&pool, image_id)
.await?
.map(|s| GetImageResponse {
id: s.id,
name: s.name,
is_default: s.is_default,
})
.ok_or(ImageError::ImageNotFound(image_id))?;
Ok(Json(response))
}

#[post("/images/{image_id}")]
pub async fn update_image(
pool: Data<PgPool>,
image_id: Path<i64>,
image: Json<PostImageRequest>,
) -> Result<impl Responder, ImageError> {
let image_id = image_id.into_inner();
db::images::update_image(&pool, image_id, &image.name, image.is_default)
.await?
.ok_or(ImageError::ImageNotFound(image_id))?;
Ok(HttpResponse::Ok().finish())
}

#[delete("/images/{image_id}")]
pub async fn delete_image(
pool: Data<PgPool>,
image_id: Path<i64>,
) -> Result<impl Responder, ImageError> {
let image_id = image_id.into_inner();
db::images::delete_image(&pool, image_id)
.await?
.ok_or(ImageError::ImageNotFound(image_id))?;
Ok(HttpResponse::Ok().finish())
}

#[get("/images")]
pub async fn read_all_images(pool: Data<PgPool>) -> Result<impl Responder, ImageError> {
let mut sources = vec![];
for image in db::images::read_all_images(&pool).await? {
let image = GetImageResponse {
id: image.id,
name: image.name,
is_default: image.is_default,
};
sources.push(image);
}
Ok(Json(sources))
}
1 change: 1 addition & 0 deletions api/src/routes/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use serde::Serialize;

pub mod health_check;
pub mod images;
pub mod pipelines;
pub mod sinks;
pub mod sources;
Expand Down
3 changes: 1 addition & 2 deletions api/src/routes/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,7 @@ pub async fn start_pipeline(

enqueue_create_or_update_secrets_task(&pool, tenant_id, replicator.id, secrets).await?;
enqueue_create_or_update_config_task(&pool, tenant_id, replicator.id, config).await?;
enqueue_create_or_update_replicator_task(&pool, tenant_id, replicator.id, image.image_name)
.await?;
enqueue_create_or_update_replicator_task(&pool, tenant_id, replicator.id, image.name).await?;

Ok(HttpResponse::Ok().finish())
}
Expand Down
9 changes: 8 additions & 1 deletion api/src/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
configuration::{DatabaseSettings, Settings},
routes::{
health_check::health_check,
images::{create_image, delete_image, read_all_images, read_image, update_image},
pipelines::{
create_pipeline, delete_pipeline, read_all_pipelines, read_pipeline, start_pipeline,
stop_pipeline, update_pipeline,
Expand Down Expand Up @@ -101,7 +102,13 @@ pub async fn run(listener: TcpListener, connection_pool: PgPool) -> Result<Serve
.service(read_publication)
.service(update_publication)
.service(delete_publication)
.service(read_all_publications),
.service(read_all_publications)
//images
.service(create_image)
.service(read_image)
.service(update_image)
.service(delete_image)
.service(read_all_images),
)
.app_data(connection_pool.clone())
})
Expand Down
15 changes: 15 additions & 0 deletions api/tests/api/images.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use crate::test_app::{CreateImageRequest, CreateImageResponse, TestApp};

pub async fn create_default_image(app: &TestApp) -> i64 {
create_image_with_name(app, "some/image".to_string(), true).await
}

pub async fn create_image_with_name(app: &TestApp, name: String, is_default: bool) -> i64 {
let image = CreateImageRequest { name, is_default };
let response = app.create_image(&image).await;
let response: CreateImageResponse = response
.json()
.await
.expect("failed to deserialize response");
response.id
}
1 change: 1 addition & 0 deletions api/tests/api/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod database;
mod health_check;
mod images;
mod pipelines;
mod sinks;
mod sources;
Expand Down
Loading

0 comments on commit 89e9673

Please sign in to comment.