From c85d148bc503bb4cb16140042714bb9e5124446c Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Thu, 29 Aug 2024 12:35:07 +0530 Subject: [PATCH] add publication support to cli --- cli/src/api_client.rs | 136 ++++++++++++++++++++++++++++++++++++++++ cli/src/commands.rs | 61 +++++++++++++++++- cli/src/main.rs | 1 + cli/src/publications.rs | 94 +++++++++++++++++++++++++++ 4 files changed, 290 insertions(+), 2 deletions(-) create mode 100644 cli/src/publications.rs diff --git a/cli/src/api_client.rs b/cli/src/api_client.rs index f0fad6a..e078638 100644 --- a/cli/src/api_client.rs +++ b/cli/src/api_client.rs @@ -107,6 +107,17 @@ impl Display for BatchConfig { } } +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct PublicationConfig { + pub table_names: Vec, +} + +impl Display for PublicationConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "table_names: {:?}", self.table_names) + } +} + #[derive(Serialize)] pub struct CreateTenantRequest { pub name: String, @@ -274,6 +285,47 @@ pub struct UpdatePipelineRequest { pub config: PipelineConfig, } +#[derive(Serialize)] +pub struct CreatePublicationRequest { + pub source_id: i64, + pub config: PublicationConfig, +} + +#[derive(Deserialize)] +pub struct CreatePublicationResponse { + pub id: i64, +} + +impl Display for CreatePublicationResponse { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "id: {}", self.id) + } +} + +#[derive(Deserialize)] +pub struct PublicationResponse { + pub id: i64, + pub tenant_id: i64, + pub source_id: i64, + pub config: PublicationConfig, +} + +impl Display for PublicationResponse { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "tenant_id: {}, id: {}, source_id: {}, config: {}", + self.tenant_id, self.id, self.source_id, self.config + ) + } +} + +#[derive(Serialize)] +pub struct UpdatePublicationRequest { + pub source_id: i64, + pub config: PublicationConfig, +} + #[derive(Debug, Error)] pub enum ApiClientError { #[error("reqwest error: {0}")] @@ -559,4 +611,88 @@ impl ApiClient { .json() .await?) } + + pub async fn create_publication( + &self, + tenant_id: i64, + publication: &CreatePublicationRequest, + ) -> Result { + Ok(self + .client + .post(&format!("{}/v1/publications", &self.address)) + .header("tenant_id", tenant_id) + .json(publication) + .send() + .await? + .json() + .await?) + } + + pub async fn read_publication( + &self, + tenant_id: i64, + publication_id: i64, + ) -> Result { + Ok(self + .client + .get(&format!( + "{}/v1/publications/{publication_id}", + &self.address + )) + .header("tenant_id", tenant_id) + .send() + .await? + .json() + .await?) + } + + pub async fn update_publication( + &self, + tenant_id: i64, + publication_id: i64, + publication: &UpdatePublicationRequest, + ) -> Result<(), ApiClientError> { + self.client + .post(&format!( + "{}/v1/publications/{publication_id}", + &self.address + )) + .header("tenant_id", tenant_id) + .json(publication) + .send() + .await?; + + Ok(()) + } + + pub async fn delete_publication( + &self, + tenant_id: i64, + publication_id: i64, + ) -> Result<(), ApiClientError> { + self.client + .delete(&format!( + "{}/v1/publications/{publication_id}", + &self.address + )) + .header("tenant_id", tenant_id) + .send() + .await?; + + Ok(()) + } + + pub async fn read_all_publications( + &self, + tenant_id: i64, + ) -> Result, ApiClientError> { + Ok(self + .client + .get(&format!("{}/v1/publications", &self.address)) + .header("tenant_id", tenant_id) + .send() + .await? + .json() + .await?) + } } diff --git a/cli/src/commands.rs b/cli/src/commands.rs index f1f164e..2b35208 100644 --- a/cli/src/commands.rs +++ b/cli/src/commands.rs @@ -4,6 +4,10 @@ use thiserror::Error; use crate::{ api_client::ApiClient, pipelines::{create_pipeline, delete_pipeline, list_pipelines, show_pipeline, update_pipeline}, + publications::{ + create_publication, delete_publication, list_publications, show_publication, + update_publication, + }, sinks::{create_sink, delete_sink, list_sinks, show_sink, update_sink}, sources::{create_source, delete_source, list_sources, show_source, update_source}, tenants::{create_tenant, delete_tenant, list_tenants, show_tenant, update_tenant}, @@ -49,6 +53,7 @@ pub enum SubCommand { Sources, Sinks, Pipelines, + Publications, } impl TryFrom<&str> for SubCommand { @@ -59,8 +64,11 @@ impl TryFrom<&str> for SubCommand { "t" | "te" | "ten" | "tena" | "tenan" | "tenant" | "tenants" => SubCommand::Tenants, "so" | "sou" | "sour" | "sourc" | "source" | "sources" => SubCommand::Sources, "si" | "sin" | "sink" | "sinks" => SubCommand::Sinks, - "p" | "pi" | "pip" | "pipe" | "pipel" | "pipeli" | "pipelin" | "pipeline" - | "pipelines" => SubCommand::Pipelines, + "pi" | "pip" | "pipe" | "pipel" | "pipeli" | "pipelin" | "pipeline" | "pipelines" => { + SubCommand::Pipelines + } + "pu" | "pub" | "publ" | "public" | "publica" | "publicat" | "publicati" + | "publicatio" | "publication" | "publications" => SubCommand::Publications, subcommand => { return Err(SubCommandParseError::InvalidSubCommand( subcommand.to_string(), @@ -109,6 +117,16 @@ pub async fn execute_commands( println!("error creating pipeline: {e:?}"); } }, + (Command::Add, SubCommand::Publications) => { + match create_publication(api_client, editor).await { + Ok(publication) => { + println!("publication created: {publication}"); + } + Err(e) => { + println!("error creating publication: {e:?}"); + } + } + } (Command::Update, SubCommand::Tenants) => match update_tenant(api_client, editor).await { Ok(()) => println!("tenant updated"), Err(e) => { @@ -134,6 +152,14 @@ pub async fn execute_commands( println!("error updating pipeline: {e:?}"); } }, + (Command::Update, SubCommand::Publications) => { + match update_publication(api_client, editor).await { + Ok(()) => println!("publication updated"), + Err(e) => { + println!("error updating publication: {e:?}"); + } + } + } (Command::Delete, SubCommand::Tenants) => match delete_tenant(api_client, editor).await { Ok(()) => println!("tenant deleted"), Err(e) => { @@ -159,6 +185,14 @@ pub async fn execute_commands( println!("error deleting pipeline: {e:?}"); } }, + (Command::Delete, SubCommand::Publications) => { + match delete_publication(api_client, editor).await { + Ok(()) => println!("publication deleted"), + Err(e) => { + println!("error deleting publication: {e:?}"); + } + } + } (Command::Show, SubCommand::Tenants) => match show_tenant(api_client, editor).await { Ok(tenant) => { println!("tenant: {tenant}") @@ -191,6 +225,16 @@ pub async fn execute_commands( println!("error reading pipeline: {e:?}"); } }, + (Command::Show, SubCommand::Publications) => { + match show_publication(api_client, editor).await { + Ok(publication) => { + println!("publication: {publication}") + } + Err(e) => { + println!("error reading publication: {e:?}"); + } + } + } (Command::List, SubCommand::Tenants) => match list_tenants(api_client).await { Ok(tenants) => { println!("tenants: "); @@ -235,5 +279,18 @@ pub async fn execute_commands( println!("error reading pipelines: {e:?}"); } }, + (Command::List, SubCommand::Publications) => { + match list_publications(api_client, editor).await { + Ok(publications) => { + println!("publications: "); + for publication in publications { + println!(" {publication}") + } + } + Err(e) => { + println!("error reading publications: {e:?}"); + } + } + } } } diff --git a/cli/src/main.rs b/cli/src/main.rs index d7ceaf4..2ae1ef9 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -9,6 +9,7 @@ use thiserror::Error; mod api_client; mod commands; mod pipelines; +mod publications; mod sinks; mod sources; mod tenants; diff --git a/cli/src/publications.rs b/cli/src/publications.rs new file mode 100644 index 0000000..639f1c8 --- /dev/null +++ b/cli/src/publications.rs @@ -0,0 +1,94 @@ +use rustyline::DefaultEditor; + +use crate::{ + api_client::{ + ApiClient, CreatePublicationRequest, CreatePublicationResponse, PublicationConfig, + PublicationResponse, UpdatePublicationRequest, + }, + get_id, get_string, + sources::get_source_id, + tenants::get_tenant_id, + CliError, +}; + +pub async fn create_publication( + api_client: &ApiClient, + editor: &mut DefaultEditor, +) -> Result { + let tenant_id = get_tenant_id(editor)?; + let source_id = get_source_id(editor)?; + let config = get_publication_config(editor)?; + let publication = api_client + .create_publication(tenant_id, &CreatePublicationRequest { source_id, config }) + .await?; + + Ok(publication) +} + +pub async fn show_publication( + api_client: &ApiClient, + editor: &mut DefaultEditor, +) -> Result { + let tenant_id = get_tenant_id(editor)?; + let publication_id = get_publication_id(editor)?; + + let publication = api_client + .read_publication(tenant_id, publication_id) + .await?; + + Ok(publication) +} + +pub async fn update_publication( + api_client: &ApiClient, + editor: &mut DefaultEditor, +) -> Result<(), CliError> { + let tenant_id = get_tenant_id(editor)?; + let source_id = get_source_id(editor)?; + let publication_id = get_publication_id(editor)?; + let config = get_publication_config(editor)?; + + let publication = UpdatePublicationRequest { source_id, config }; + api_client + .update_publication(tenant_id, publication_id, &publication) + .await?; + + Ok(()) +} + +pub async fn delete_publication( + api_client: &ApiClient, + editor: &mut DefaultEditor, +) -> Result<(), CliError> { + let tenant_id = get_tenant_id(editor)?; + let publication_id = get_publication_id(editor)?; + + api_client + .delete_publication(tenant_id, publication_id) + .await?; + + Ok(()) +} + +pub async fn list_publications( + api_client: &ApiClient, + editor: &mut DefaultEditor, +) -> Result, CliError> { + let tenant_id = get_tenant_id(editor)?; + let tenants = api_client.read_all_publications(tenant_id).await?; + + Ok(tenants) +} + +fn get_publication_config(editor: &mut DefaultEditor) -> Result { + let table_names = get_string(editor, "enter comma separated table names: ")?; + let table_names: Vec = table_names + .split(',') + .map(|table_name| table_name.trim().to_string()) + .collect(); + Ok(PublicationConfig { table_names }) +} + +pub fn get_publication_id(editor: &mut DefaultEditor) -> Result { + get_id(editor, "enter publication id: ") +}