Skip to content

Commit

Permalink
add create, read and delete api for publications
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Aug 30, 2024
1 parent 6ce8704 commit cab9f38
Show file tree
Hide file tree
Showing 6 changed files with 355 additions and 69 deletions.
144 changes: 135 additions & 9 deletions api/src/db/publications.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::borrow::Cow;
use std::{borrow::Cow, collections::HashMap};

use sqlx::{postgres::PgConnectOptions, Connection, Executor, PgConnection};
use serde::Serialize;
use sqlx::{postgres::PgConnectOptions, Connection, Executor, PgConnection, Row};
use tracing::error;

use super::tables::Table;

Expand All @@ -23,25 +25,54 @@ fn quote_identifier_alloc(identifier: &str) -> String {
quoted_identifier
}

pub async fn create_publication_on_source(
publication_name: &str,
tables: &[Table],
pub fn quote_literal(literal: &str) -> String {
let mut quoted_literal = String::with_capacity(literal.len() + 2);

if literal.find('\\').is_some() {
quoted_literal.push('E');
}

quoted_literal.push('\'');

for char in literal.chars() {
if char == '\'' {
quoted_literal.push('\'');
} else if char == '\\' {
quoted_literal.push('\\');
}

quoted_literal.push(char);
}

quoted_literal.push('\'');

quoted_literal
}

#[derive(Serialize)]
pub struct Publication {
pub name: String,
pub tables: Vec<Table>,
}

pub async fn create_publication(
publication: &Publication,
options: &PgConnectOptions,
) -> Result<(), sqlx::Error> {
let mut query = String::new();
let quoted_publication_name = quote_identifier(publication_name);
let quoted_publication_name = quote_identifier(&publication.name);
query.push_str("create publication ");
query.push_str(&quoted_publication_name);
query.push_str(" table only ");
query.push_str(" for table only ");

for (i, table) in tables.iter().enumerate() {
for (i, table) in publication.tables.iter().enumerate() {
let quoted_schema = quote_identifier(&table.schema);
let quoted_name = quote_identifier(&table.name);
query.push_str(&quoted_schema);
query.push('.');
query.push_str(&quoted_name);

if i < tables.len() - 1 {
if i < publication.tables.len() - 1 {
query.push(',')
}
}
Expand All @@ -51,3 +82,98 @@ pub async fn create_publication_on_source(

Ok(())
}

pub async fn drop_publication(
publication_name: &str,
options: &PgConnectOptions,
) -> Result<(), sqlx::Error> {
let mut query = String::new();
query.push_str("drop publication if exists ");
let quoted_publication_name = quote_identifier(publication_name);
query.push_str(&quoted_publication_name);

let mut connection = PgConnection::connect_with(options).await?;
connection.execute(query.as_str()).await?;

Ok(())
}

pub async fn read_publication(
publication_name: &str,
options: &PgConnectOptions,
) -> Result<Option<Publication>, sqlx::Error> {
let mut query = String::new();
query.push_str(
r#"
select p.pubname, pt.schemaname, pt.tablename from pg_publication p
join pg_publication_tables pt on p.pubname = pt.pubname
where
p.puballtables = false
and p.pubinsert = true
and p.pubupdate = true
and p.pubdelete = true
and p.pubtruncate = true
and p.pubname =
"#,
);

let quoted_publication_name = quote_literal(publication_name);
query.push_str(&quoted_publication_name);

error!("QUERY: {query}");

let mut connection = PgConnection::connect_with(options).await?;

let mut tables = vec![];
let mut name: Option<String> = None;

for row in connection.fetch_all(query.as_str()).await? {
let pub_name: String = row.get("pubname");
if let Some(ref name) = name {
assert_eq!(name.as_str(), pub_name);
} else {
name = Some(pub_name);
}
let schema = row.get("schemaname");
let name = row.get("tablename");
tables.push(Table { schema, name });
}

let publication = name.map(|name| Publication { name, tables });

Ok(publication)
}

pub async fn read_all_publications(
options: &PgConnectOptions,
) -> Result<Vec<Publication>, sqlx::Error> {
let query = r#"
select p.pubname, pt.schemaname, pt.tablename from pg_publication p
join pg_publication_tables pt on p.pubname = pt.pubname
where
p.puballtables = false
and p.pubinsert = true
and p.pubupdate = true
and p.pubdelete = true
and p.pubtruncate = true;
"#;

let mut connection = PgConnection::connect_with(options).await?;

let mut pub_name_to_tables: HashMap<String, Vec<Table>> = HashMap::new();

for row in connection.fetch_all(query).await? {
let pub_name: String = row.get("pubname");
let schema = row.get("schemaname");
let name = row.get("tablename");
let tables = pub_name_to_tables.entry(pub_name).or_default();
tables.push(Table { schema, name });
}

let publications = pub_name_to_tables
.into_iter()
.map(|(name, tables)| Publication { name, tables })
.collect();

Ok(publications)
}
4 changes: 2 additions & 2 deletions api/src/db/tables.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use serde::Serialize;
use serde::{Deserialize, Serialize};
use sqlx::{postgres::PgConnectOptions, Connection, Executor, PgConnection, Row};

#[derive(Serialize)]
#[derive(Serialize, Deserialize)]
pub struct Table {
pub schema: String,
pub name: String,
Expand Down
205 changes: 205 additions & 0 deletions api/src/routes/sources/publications.rs
Original file line number Diff line number Diff line change
@@ -1 +1,206 @@
use actix_web::{
delete, get,
http::{header::ContentType, StatusCode},
post,
web::{Data, Json, Path},
HttpRequest, HttpResponse, Responder, ResponseError,
};
use serde::Deserialize;
use sqlx::PgPool;
use thiserror::Error;

use crate::{
db::{self, publications::Publication, sources::SourceConfig, tables::Table},
routes::ErrorMessage,
};

#[derive(Debug, Error)]
enum PublicationError {
#[error("database error: {0}")]
DatabaseError(#[from] sqlx::Error),

#[error("source with id {0} not found")]
SourceNotFound(i64),

#[error("publication with name {0} not found")]
PublicationNotFound(String),

#[error("tenant id missing in request")]
TenantIdMissing,

#[error("tenant id ill formed in request")]
TenantIdIllFormed,

#[error("invalid source config")]
InvalidConfig(#[from] serde_json::Error),
}

impl PublicationError {
fn to_message(&self) -> String {
match self {
// Do not expose internal database details in error messages
PublicationError::DatabaseError(_) => "internal server error".to_string(),
// Every other message is ok, as they do not divulge sensitive information
e => e.to_string(),
}
}
}

impl ResponseError for PublicationError {
fn status_code(&self) -> StatusCode {
match self {
PublicationError::DatabaseError(_) | PublicationError::InvalidConfig(_) => {
StatusCode::INTERNAL_SERVER_ERROR
}
PublicationError::SourceNotFound(_) | PublicationError::PublicationNotFound(_) => {
StatusCode::NOT_FOUND
}
PublicationError::TenantIdMissing | PublicationError::TenantIdIllFormed => {
StatusCode::BAD_REQUEST
}
}
}

fn error_response(&self) -> HttpResponse {
let error_message = ErrorMessage {
error: self.to_message(),
};
let body =
serde_json::to_string(&error_message).expect("failed to serialize error message");
HttpResponse::build(self.status_code())
.insert_header(ContentType::json())
.body(body)
}
}

// TODO: read tenant_id from a jwt
fn extract_tenant_id(req: &HttpRequest) -> Result<i64, PublicationError> {
let headers = req.headers();
let tenant_id = headers
.get("tenant_id")
.ok_or(PublicationError::TenantIdMissing)?;
let tenant_id = tenant_id
.to_str()
.map_err(|_| PublicationError::TenantIdIllFormed)?;
let tenant_id: i64 = tenant_id
.parse()
.map_err(|_| PublicationError::TenantIdIllFormed)?;
Ok(tenant_id)
}

#[derive(Deserialize)]
struct CreatePublicationRequest {
name: String,
tables: Vec<Table>,
}

#[derive(Deserialize)]
struct DeletePublicationRequest {
name: String,
}

#[post("/sources/{source_id}/publications")]
pub async fn create_publication(
req: HttpRequest,
pool: Data<PgPool>,
source_id: Path<i64>,
publication: Json<CreatePublicationRequest>,
) -> Result<impl Responder, PublicationError> {
let tenant_id = extract_tenant_id(&req)?;
let source_id = source_id.into_inner();

let config = db::sources::read_source(&pool, tenant_id, source_id)
.await?
.map(|s| {
let config: SourceConfig = serde_json::from_value(s.config)?;
Ok::<SourceConfig, serde_json::Error>(config)
})
.transpose()?
.ok_or(PublicationError::SourceNotFound(source_id))?;

let options = config.connect_options();
let publication = publication.0;
let publication = Publication {
name: publication.name,
tables: publication.tables,
};
db::publications::create_publication(&publication, &options).await?;

Ok(HttpResponse::Ok().finish())
}

#[get("/sources/{source_id}/publications/{publication_name}")]
pub async fn read_publication(
req: HttpRequest,
pool: Data<PgPool>,
source_id_and_pub_name: Path<(i64, String)>,
) -> Result<impl Responder, PublicationError> {
let tenant_id = extract_tenant_id(&req)?;
let (source_id, publication_name) = source_id_and_pub_name.into_inner();

let config = db::sources::read_source(&pool, tenant_id, source_id)
.await?
.map(|s| {
let config: SourceConfig = serde_json::from_value(s.config)?;
Ok::<SourceConfig, serde_json::Error>(config)
})
.transpose()?
.ok_or(PublicationError::SourceNotFound(source_id))?;

let options = config.connect_options();
let publications = db::publications::read_publication(&publication_name, &options)
.await?
.ok_or(PublicationError::PublicationNotFound(publication_name))?;

Ok(Json(publications))
}

#[get("/sources/{source_id}/publications")]
pub async fn read_all_publications(
req: HttpRequest,
pool: Data<PgPool>,
source_id: Path<i64>,
) -> Result<impl Responder, PublicationError> {
let tenant_id = extract_tenant_id(&req)?;
let source_id = source_id.into_inner();

let config = db::sources::read_source(&pool, tenant_id, source_id)
.await?
.map(|s| {
let config: SourceConfig = serde_json::from_value(s.config)?;
Ok::<SourceConfig, serde_json::Error>(config)
})
.transpose()?
.ok_or(PublicationError::SourceNotFound(source_id))?;

let options = config.connect_options();
let publications = db::publications::read_all_publications(&options).await?;

Ok(Json(publications))
}

#[delete("/sources/{source_id}/publications")]
pub async fn delete_publication(
req: HttpRequest,
pool: Data<PgPool>,
source_id: Path<i64>,
publication: Json<DeletePublicationRequest>,
) -> Result<impl Responder, PublicationError> {
let tenant_id = extract_tenant_id(&req)?;
let source_id = source_id.into_inner();

let config = db::sources::read_source(&pool, tenant_id, source_id)
.await?
.map(|s| {
let config: SourceConfig = serde_json::from_value(s.config)?;
Ok::<SourceConfig, serde_json::Error>(config)
})
.transpose()?
.ok_or(PublicationError::SourceNotFound(source_id))?;

let options = config.connect_options();
let publication = publication.0;
db::publications::drop_publication(&publication.name, &options).await?;

Ok(HttpResponse::Ok().finish())
}
2 changes: 1 addition & 1 deletion api/src/routes/sources/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ fn extract_tenant_id(req: &HttpRequest) -> Result<i64, TableError> {
Ok(tenant_id)
}

#[get("/sources/{source_id}/table_names")]
#[get("/sources/{source_id}/tables")]
pub async fn read_table_names(
req: HttpRequest,
pool: Data<PgPool>,
Expand Down
Loading

0 comments on commit cab9f38

Please sign in to comment.