Skip to content

Commit

Permalink
add update api for publications
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Sep 2, 2024
1 parent cab9f38 commit 96457e3
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 8 deletions.
28 changes: 28 additions & 0 deletions api/src/db/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,34 @@ pub async fn create_publication(
Ok(())
}

pub async fn update_publication(
publication: &Publication,
options: &PgConnectOptions,
) -> Result<(), sqlx::Error> {
let mut query = String::new();
let quoted_publication_name = quote_identifier(&publication.name);
query.push_str("alter publication ");
query.push_str(&quoted_publication_name);
query.push_str(" set table only ");

for (i, table) in publication.tables.iter().enumerate() {
let quoted_schema = quote_identifier(&table.schema);
let quoted_name = quote_identifier(&table.name);
query.push_str(&quoted_schema);
query.push('.');
query.push_str(&quoted_name);

if i < publication.tables.len() - 1 {
query.push(',')
}
}

let mut connection = PgConnection::connect_with(options).await?;
connection.execute(query.as_str()).await?;

Ok(())
}

pub async fn drop_publication(
publication_name: &str,
options: &PgConnectOptions,
Expand Down
44 changes: 36 additions & 8 deletions api/src/routes/sources/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ struct CreatePublicationRequest {
}

#[derive(Deserialize)]
struct DeletePublicationRequest {
name: String,
struct UpdatePublicationRequest {
tables: Vec<Table>,
}

#[post("/sources/{source_id}/publications")]
Expand Down Expand Up @@ -129,6 +129,36 @@ pub async fn create_publication(
Ok(HttpResponse::Ok().finish())
}

#[post("/sources/{source_id}/publications/{publication_name}")]
pub async fn update_publication(
req: HttpRequest,
pool: Data<PgPool>,
source_id_and_pub_name: Path<(i64, String)>,
publication: Json<UpdatePublicationRequest>,
) -> Result<impl Responder, PublicationError> {
let tenant_id = extract_tenant_id(&req)?;
let (source_id, publication_name) = source_id_and_pub_name.into_inner();

let config = db::sources::read_source(&pool, tenant_id, source_id)
.await?
.map(|s| {
let config: SourceConfig = serde_json::from_value(s.config)?;
Ok::<SourceConfig, serde_json::Error>(config)
})
.transpose()?
.ok_or(PublicationError::SourceNotFound(source_id))?;

let options = config.connect_options();
let publication = publication.0;
let publication = Publication {
name: publication_name,
tables: publication.tables,
};
db::publications::update_publication(&publication, &options).await?;

Ok(HttpResponse::Ok().finish())
}

#[get("/sources/{source_id}/publications/{publication_name}")]
pub async fn read_publication(
req: HttpRequest,
Expand Down Expand Up @@ -179,15 +209,14 @@ pub async fn read_all_publications(
Ok(Json(publications))
}

#[delete("/sources/{source_id}/publications")]
#[delete("/sources/{source_id}/publications/{publication_name}")]
pub async fn delete_publication(
req: HttpRequest,
pool: Data<PgPool>,
source_id: Path<i64>,
publication: Json<DeletePublicationRequest>,
source_id_and_pub_name: Path<(i64, String)>,
) -> Result<impl Responder, PublicationError> {
let tenant_id = extract_tenant_id(&req)?;
let source_id = source_id.into_inner();
let (source_id, publication_name) = source_id_and_pub_name.into_inner();

let config = db::sources::read_source(&pool, tenant_id, source_id)
.await?
Expand All @@ -199,8 +228,7 @@ pub async fn delete_publication(
.ok_or(PublicationError::SourceNotFound(source_id))?;

let options = config.connect_options();
let publication = publication.0;
db::publications::drop_publication(&publication.name, &options).await?;
db::publications::drop_publication(&publication_name, &options).await?;

Ok(HttpResponse::Ok().finish())
}
2 changes: 2 additions & 0 deletions api/src/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
create_source, delete_source,
publications::{
create_publication, delete_publication, read_all_publications, read_publication,
update_publication,
},
read_all_sources, read_source,
tables::read_table_names,
Expand Down Expand Up @@ -95,6 +96,7 @@ pub async fn run(listener: TcpListener, connection_pool: PgPool) -> Result<Serve
//publications
.service(create_publication)
.service(read_publication)
.service(update_publication)
.service(delete_publication)
.service(read_all_publications),
)
Expand Down

0 comments on commit 96457e3

Please sign in to comment.