From ec97e6c4a7aa9b3494f03c5f5bef8240d349d293 Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Wed, 11 Sep 2024 12:35:46 +0530 Subject: [PATCH] remove cli crate --- Cargo.toml | 2 +- cli/Cargo.toml | 12 - cli/src/api_client.rs | 672 ---------------------------------------- cli/src/commands.rs | 240 -------------- cli/src/main.rs | 151 --------- cli/src/pipelines.rs | 110 ------- cli/src/publications.rs | 7 - cli/src/sinks.rs | 87 ------ cli/src/sources.rs | 102 ------ cli/src/tenants.rs | 70 ----- 10 files changed, 1 insertion(+), 1452 deletions(-) delete mode 100644 cli/Cargo.toml delete mode 100644 cli/src/api_client.rs delete mode 100644 cli/src/commands.rs delete mode 100644 cli/src/main.rs delete mode 100644 cli/src/pipelines.rs delete mode 100644 cli/src/publications.rs delete mode 100644 cli/src/sinks.rs delete mode 100644 cli/src/sources.rs delete mode 100644 cli/src/tenants.rs diff --git a/Cargo.toml b/Cargo.toml index 691efae..82281f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ resolver = "2" -members = ["api", "pg_replicate", "replicator", "cli"] +members = ["api", "pg_replicate", "replicator"] [workspace.dependencies] actix-web = { version = "4", default-features = false } diff --git a/cli/Cargo.toml b/cli/Cargo.toml deleted file mode 100644 index d1d88a1..0000000 --- a/cli/Cargo.toml +++ /dev/null @@ -1,12 +0,0 @@ -[package] -name = "cli" -version = "0.1.0" -edition = "2021" - -[dependencies] -reqwest = { workspace = true, features = ["json"] } -rustyline = { workspace = true } -serde = { workspace = true, features = ["derive"] } -serde_json = { workspace = true, features = ["std"] } -thiserror = { workspace = true } -tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } diff --git a/cli/src/api_client.rs b/cli/src/api_client.rs deleted file mode 100644 index 4c2c67e..0000000 --- a/cli/src/api_client.rs +++ /dev/null @@ -1,672 +0,0 @@ -use std::fmt::{Display, Formatter}; - -use serde::{Deserialize, Serialize}; -use thiserror::Error; - -pub struct ApiClient { - pub address: String, - pub client: reqwest::Client, -} - -#[derive(serde::Serialize, serde::Deserialize)] -pub enum SourceConfig { - Postgres { - /// Host on which Postgres is running - host: String, - - /// Port on which Postgres is running - port: u16, - - /// Postgres database name - name: String, - - /// Postgres database user name - username: String, - - /// Postgres database user password - password: Option, - - /// Postgres slot name - slot_name: String, - }, -} - -impl Display for SourceConfig { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let SourceConfig::Postgres { - host, - port, - name, - username, - password, - slot_name, - } = self; - write!( - f, - "host: {host}, port: {port}, name: {name}, username: {username}, password: {password:?}, slot_name: {slot_name}", - ) - } -} - -#[derive(serde::Serialize, serde::Deserialize)] -pub enum SinkConfig { - BigQuery { - /// BigQuery project id - project_id: String, - - /// BigQuery dataset id - dataset_id: String, - - /// BigQuery service account key - service_account_key: String, - }, -} - -impl Display for SinkConfig { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let SinkConfig::BigQuery { - project_id, - dataset_id, - service_account_key, - } = self; - write!(f, "project_id: {project_id}, dataset_id: {dataset_id}, service_account_key: {service_account_key}") - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct PipelineConfig { - pub config: BatchConfig, -} - -impl Display for PipelineConfig { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "config: {}", self.config) - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct BatchConfig { - /// maximum batch size in number of events - pub max_size: usize, - - /// maximum duration, in seconds, to wait for a batch to fill - pub max_fill_secs: u64, -} - -impl Display for BatchConfig { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!( - f, - "max_size: {}, max_fill_secs: {}", - self.max_size, self.max_fill_secs - ) - } -} - -#[derive(Serialize)] -pub struct CreateTenantRequest { - pub name: String, -} - -#[derive(Serialize)] -pub struct UpdateTenantRequest { - pub name: String, -} - -#[derive(Deserialize)] -pub struct CreateTenantResponse { - pub id: i64, -} - -impl Display for CreateTenantResponse { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "id: {}", self.id) - } -} - -#[derive(Deserialize)] -pub struct TenantResponse { - pub id: i64, - pub name: String, -} - -impl Display for TenantResponse { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "id: {}, name: {}", self.id, self.name,) - } -} - -#[derive(Serialize)] -pub struct CreateSourceRequest { - pub config: SourceConfig, -} - -#[derive(Deserialize)] -pub struct CreateSourceResponse { - pub id: i64, -} - -impl Display for CreateSourceResponse { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "id: {}", self.id) - } -} - -#[derive(Serialize)] -pub struct UpdateSourceRequest { - pub config: SourceConfig, -} - -#[derive(Deserialize)] -pub struct SourceResponse { - pub id: i64, - pub tenant_id: i64, - pub config: SourceConfig, -} - -impl Display for SourceResponse { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!( - f, - "tenant_id: {}, id: {}, config: {}", - self.tenant_id, self.id, self.config - ) - } -} - -#[derive(Serialize)] -pub struct CreateSinkRequest { - pub config: SinkConfig, -} - -#[derive(Deserialize)] -pub struct CreateSinkResponse { - pub id: i64, -} - -impl Display for CreateSinkResponse { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "id: {}", self.id) - } -} - -#[derive(Serialize)] -pub struct UpdateSinkRequest { - pub config: SinkConfig, -} - -#[derive(Deserialize)] -pub struct SinkResponse { - pub id: i64, - pub tenant_id: i64, - pub config: SinkConfig, -} - -impl Display for SinkResponse { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!( - f, - "tenant_id: {}, id: {}, config: {}", - self.tenant_id, self.id, self.config - ) - } -} - -#[derive(Serialize)] -pub struct CreatePipelineRequest { - pub source_id: i64, - pub sink_id: i64, - pub publication_name: String, - pub config: PipelineConfig, -} - -#[derive(Deserialize)] -pub struct CreatePipelineResponse { - pub id: i64, -} - -impl Display for CreatePipelineResponse { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "id: {}", self.id) - } -} - -#[derive(Deserialize)] -pub struct PipelineResponse { - pub id: i64, - pub tenant_id: i64, - pub source_id: i64, - pub sink_id: i64, - pub publication_name: String, - pub config: PipelineConfig, -} - -impl Display for PipelineResponse { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!( - f, - "tenant_id: {}, id: {}, source_id: {}, sink_id: {}, publication_name: {}, config: {}", - self.tenant_id, - self.id, - self.source_id, - self.sink_id, - self.publication_name, - self.config - ) - } -} - -#[derive(Serialize)] -pub struct UpdatePipelineRequest { - pub source_id: i64, - pub sink_id: i64, - pub publication_name: String, - pub config: PipelineConfig, -} - -#[derive(Debug, Error)] -pub enum ApiClientError { - #[error("reqwest error: {0}")] - Reqwest(#[from] reqwest::Error), - - #[error("api error: {0}")] - ApiError(String), -} - -#[derive(Deserialize)] -pub struct ErrorMessage { - pub error: String, -} - -impl ApiClient { - pub fn new(address: String) -> ApiClient { - let client = reqwest::Client::new(); - ApiClient { address, client } - } - - pub async fn create_tenant( - &self, - tenant: &CreateTenantRequest, - ) -> Result { - let response = self - .client - .post(format!("{}/v1/tenants", &self.address)) - .json(tenant) - .send() - .await?; - - if response.status().is_success() { - Ok(response.json().await?) - } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } - - pub async fn read_tenant(&self, tenant_id: i64) -> Result { - let response = self - .client - .get(format!("{}/v1/tenants/{tenant_id}", &self.address)) - .send() - .await?; - - if response.status().is_success() { - Ok(response.json().await?) - } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } - - pub async fn update_tenant( - &self, - tenant_id: i64, - tenant: &UpdateTenantRequest, - ) -> Result<(), ApiClientError> { - let response = self - .client - .post(format!("{}/v1/tenants/{tenant_id}", &self.address)) - .json(tenant) - .send() - .await?; - - if response.status().is_success() { - Ok(()) - } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } - - pub async fn delete_tenant(&self, tenant_id: i64) -> Result<(), ApiClientError> { - let response = self - .client - .delete(format!("{}/v1/tenants/{tenant_id}", &self.address)) - .send() - .await?; - - if response.status().is_success() { - Ok(()) - } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } - - pub async fn read_all_tenants(&self) -> Result, ApiClientError> { - let response = self - .client - .get(format!("{}/v1/tenants", &self.address)) - .send() - .await?; - - if response.status().is_success() { - Ok(response.json().await?) - } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } - - pub async fn create_source( - &self, - tenant_id: i64, - source: &CreateSourceRequest, - ) -> Result { - let response = self - .client - .post(format!("{}/v1/sources", &self.address)) - .header("tenant_id", tenant_id) - .json(source) - .send() - .await?; - - if response.status().is_success() { - Ok(response.json().await?) - } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } - - pub async fn read_source( - &self, - tenant_id: i64, - source_id: i64, - ) -> Result { - let response = self - .client - .get(format!("{}/v1/sources/{source_id}", &self.address)) - .header("tenant_id", tenant_id) - .send() - .await?; - - if response.status().is_success() { - Ok(response.json().await?) - } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } - - pub async fn update_source( - &self, - tenant_id: i64, - source_id: i64, - source: &UpdateSourceRequest, - ) -> Result<(), ApiClientError> { - let response = self - .client - .post(format!("{}/v1/sources/{source_id}", &self.address)) - .header("tenant_id", tenant_id) - .json(source) - .send() - .await?; - - if response.status().is_success() { - Ok(()) - } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } - - pub async fn delete_source( - &self, - tenant_id: i64, - source_id: i64, - ) -> Result<(), ApiClientError> { - let response = self - .client - .delete(format!("{}/v1/sources/{source_id}", &self.address)) - .header("tenant_id", tenant_id) - .send() - .await?; - - if response.status().is_success() { - Ok(()) - } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } - - pub async fn read_all_sources( - &self, - tenant_id: i64, - ) -> Result, ApiClientError> { - let response = self - .client - .get(format!("{}/v1/sources", &self.address)) - .header("tenant_id", tenant_id) - .send() - .await?; - - if response.status().is_success() { - Ok(response.json().await?) - } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } - - pub async fn create_sink( - &self, - tenant_id: i64, - sink: &CreateSinkRequest, - ) -> Result { - let response = self - .client - .post(format!("{}/v1/sinks", &self.address)) - .header("tenant_id", tenant_id) - .json(sink) - .send() - .await?; - - if response.status().is_success() { - Ok(response.json().await?) - } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } - - pub async fn read_sink( - &self, - tenant_id: i64, - sink_id: i64, - ) -> Result { - let response = self - .client - .get(format!("{}/v1/sinks/{sink_id}", &self.address)) - .header("tenant_id", tenant_id) - .send() - .await?; - - if response.status().is_success() { - Ok(response.json().await?) - } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } - - pub async fn update_sink( - &self, - tenant_id: i64, - sink_id: i64, - sink: &UpdateSinkRequest, - ) -> Result<(), ApiClientError> { - let response = self - .client - .post(format!("{}/v1/sinks/{sink_id}", &self.address)) - .header("tenant_id", tenant_id) - .json(sink) - .send() - .await?; - - if response.status().is_success() { - Ok(()) - } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } - - pub async fn delete_sink(&self, tenant_id: i64, sink_id: i64) -> Result<(), ApiClientError> { - let response = self - .client - .delete(format!("{}/v1/sinks/{sink_id}", &self.address)) - .header("tenant_id", tenant_id) - .send() - .await?; - - if response.status().is_success() { - Ok(()) - } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } - - pub async fn read_all_sinks( - &self, - tenant_id: i64, - ) -> Result, ApiClientError> { - let response = self - .client - .get(format!("{}/v1/sinks", &self.address)) - .header("tenant_id", tenant_id) - .send() - .await?; - - if response.status().is_success() { - Ok(response.json().await?) - } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } - - pub async fn create_pipeline( - &self, - tenant_id: i64, - pipeline: &CreatePipelineRequest, - ) -> Result { - let response = self - .client - .post(format!("{}/v1/pipelines", &self.address)) - .header("tenant_id", tenant_id) - .json(pipeline) - .send() - .await?; - - if response.status().is_success() { - Ok(response.json().await?) - } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } - - pub async fn read_pipeline( - &self, - tenant_id: i64, - pipeline_id: i64, - ) -> Result { - let response = self - .client - .get(format!("{}/v1/pipelines/{pipeline_id}", &self.address)) - .header("tenant_id", tenant_id) - .send() - .await?; - - if response.status().is_success() { - Ok(response.json().await?) - } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } - - pub async fn update_pipeline( - &self, - tenant_id: i64, - pipeline_id: i64, - pipeline: &UpdatePipelineRequest, - ) -> Result<(), ApiClientError> { - let response = self - .client - .post(format!("{}/v1/pipelines/{pipeline_id}", &self.address)) - .header("tenant_id", tenant_id) - .json(pipeline) - .send() - .await?; - - if response.status().is_success() { - Ok(()) - } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } - - pub async fn delete_pipeline( - &self, - tenant_id: i64, - pipeline_id: i64, - ) -> Result<(), ApiClientError> { - let response = self - .client - .delete(format!("{}/v1/pipelines/{pipeline_id}", &self.address)) - .header("tenant_id", tenant_id) - .send() - .await?; - - if response.status().is_success() { - Ok(()) - } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } - - pub async fn read_all_pipelines( - &self, - tenant_id: i64, - ) -> Result, ApiClientError> { - let response = self - .client - .get(format!("{}/v1/pipelines", &self.address)) - .header("tenant_id", tenant_id) - .send() - .await?; - - if response.status().is_success() { - Ok(response.json().await?) - } else { - let message: ErrorMessage = response.json().await?; - Err(ApiClientError::ApiError(message.error)) - } - } -} diff --git a/cli/src/commands.rs b/cli/src/commands.rs deleted file mode 100644 index 7067364..0000000 --- a/cli/src/commands.rs +++ /dev/null @@ -1,240 +0,0 @@ -use rustyline::DefaultEditor; -use thiserror::Error; - -use crate::{ - api_client::ApiClient, - pipelines::{create_pipeline, delete_pipeline, list_pipelines, show_pipeline, update_pipeline}, - sinks::{create_sink, delete_sink, list_sinks, show_sink, update_sink}, - sources::{create_source, delete_source, list_sources, show_source, update_source}, - tenants::{create_tenant, delete_tenant, list_tenants, show_tenant, update_tenant}, -}; - -pub enum Command { - Add, - Update, - Delete, - Show, - List, -} - -#[derive(Debug, Error)] -pub enum CommandParseError { - #[error("invalid command: {0}")] - InvalidCommand(String), -} - -impl TryFrom<&str> for Command { - type Error = CommandParseError; - - fn try_from(command: &str) -> Result { - Ok(match command { - "a" | "ad" | "add" | "c" | "cr" | "cre" | "crea" | "creat" | "create" => Command::Add, - "u" | "up" | "upd" | "upda" | "updat" | "update" => Command::Update, - "d" | "de" | "del" | "dele" | "delet" | "delete" => Command::Delete, - "s" | "sh" | "sho" | "show" => Command::Show, - "l" | "li" | "lis" | "list" => Command::List, - command => return Err(CommandParseError::InvalidCommand(command.to_string())), - }) - } -} - -#[derive(Debug, Error)] -pub enum SubCommandParseError { - #[error("invalid subcommand: {0}")] - InvalidSubCommand(String), -} - -pub enum SubCommand { - Tenants, - Sources, - Sinks, - Pipelines, -} - -impl TryFrom<&str> for SubCommand { - type Error = SubCommandParseError; - - fn try_from(subcommand: &str) -> Result { - Ok(match subcommand { - "t" | "te" | "ten" | "tena" | "tenan" | "tenant" | "tenants" => SubCommand::Tenants, - "so" | "sou" | "sour" | "sourc" | "source" | "sources" => SubCommand::Sources, - "si" | "sin" | "sink" | "sinks" => SubCommand::Sinks, - "pi" | "pip" | "pipe" | "pipel" | "pipeli" | "pipelin" | "pipeline" | "pipelines" => { - SubCommand::Pipelines - } - subcommand => { - return Err(SubCommandParseError::InvalidSubCommand( - subcommand.to_string(), - )) - } - }) - } -} - -pub async fn execute_commands( - command: Command, - subcommand: SubCommand, - api_client: &ApiClient, - editor: &mut DefaultEditor, -) { - match (command, subcommand) { - (Command::Add, SubCommand::Tenants) => match create_tenant(api_client, editor).await { - Ok(tenant) => { - println!("tenant created: {tenant}"); - } - Err(e) => { - println!("error creating tenant: {e:?}"); - } - }, - (Command::Add, SubCommand::Sources) => match create_source(api_client, editor).await { - Ok(source) => { - println!("source created: {source}"); - } - Err(e) => { - println!("error creating source: {e:?}"); - } - }, - (Command::Add, SubCommand::Sinks) => match create_sink(api_client, editor).await { - Ok(sink) => { - println!("sink created: {sink}"); - } - Err(e) => { - println!("error creating sink: {e:?}"); - } - }, - (Command::Add, SubCommand::Pipelines) => match create_pipeline(api_client, editor).await { - Ok(pipeline) => { - println!("pipeline created: {pipeline}"); - } - Err(e) => { - println!("error creating pipeline: {e:?}"); - } - }, - (Command::Update, SubCommand::Tenants) => match update_tenant(api_client, editor).await { - Ok(()) => println!("tenant updated"), - Err(e) => { - println!("error updating tenant: {e:?}"); - } - }, - (Command::Update, SubCommand::Sources) => match update_source(api_client, editor).await { - Ok(()) => println!("source updated"), - Err(e) => { - println!("error updating source: {e:?}"); - } - }, - (Command::Update, SubCommand::Sinks) => match update_sink(api_client, editor).await { - Ok(()) => println!("sink updated"), - Err(e) => { - println!("error updating sink: {e:?}"); - } - }, - (Command::Update, SubCommand::Pipelines) => match update_pipeline(api_client, editor).await - { - Ok(()) => println!("pipeline updated"), - Err(e) => { - println!("error updating pipeline: {e:?}"); - } - }, - (Command::Delete, SubCommand::Tenants) => match delete_tenant(api_client, editor).await { - Ok(()) => println!("tenant deleted"), - Err(e) => { - println!("error deleting tenant: {e:?}"); - } - }, - (Command::Delete, SubCommand::Sources) => match delete_source(api_client, editor).await { - Ok(()) => println!("source deleted"), - Err(e) => { - println!("error deleting source: {e:?}"); - } - }, - (Command::Delete, SubCommand::Sinks) => match delete_sink(api_client, editor).await { - Ok(()) => println!("sink deleted"), - Err(e) => { - println!("error deleting sink: {e:?}"); - } - }, - (Command::Delete, SubCommand::Pipelines) => match delete_pipeline(api_client, editor).await - { - Ok(()) => println!("pipeline deleted"), - Err(e) => { - println!("error deleting pipeline: {e:?}"); - } - }, - (Command::Show, SubCommand::Tenants) => match show_tenant(api_client, editor).await { - Ok(tenant) => { - println!("tenant: {tenant}") - } - Err(e) => { - println!("error reading tenant: {e:?}"); - } - }, - (Command::Show, SubCommand::Sources) => match show_source(api_client, editor).await { - Ok(source) => { - println!("source: {source}") - } - Err(e) => { - println!("error reading source: {e:?}"); - } - }, - (Command::Show, SubCommand::Sinks) => match show_sink(api_client, editor).await { - Ok(sink) => { - println!("sink: {sink}") - } - Err(e) => { - println!("error reading sink: {e:?}"); - } - }, - (Command::Show, SubCommand::Pipelines) => match show_pipeline(api_client, editor).await { - Ok(pipeline) => { - println!("pipeline: {pipeline}") - } - Err(e) => { - println!("error reading pipeline: {e:?}"); - } - }, - (Command::List, SubCommand::Tenants) => match list_tenants(api_client).await { - Ok(tenants) => { - println!("tenants: "); - for tenant in tenants { - println!(" {tenant}") - } - } - Err(e) => { - println!("error reading tenant: {e:?}"); - } - }, - (Command::List, SubCommand::Sources) => match list_sources(api_client, editor).await { - Ok(sources) => { - println!("sources: "); - for source in sources { - println!(" {source}") - } - } - Err(e) => { - println!("error reading source: {e:?}"); - } - }, - (Command::List, SubCommand::Sinks) => match list_sinks(api_client, editor).await { - Ok(sinks) => { - println!("sinks: "); - for sink in sinks { - println!(" {sink}") - } - } - Err(e) => { - println!("error reading sink: {e:?}"); - } - }, - (Command::List, SubCommand::Pipelines) => match list_pipelines(api_client, editor).await { - Ok(pipelines) => { - println!("pipelines: "); - for pipeline in pipelines { - println!(" {pipeline}") - } - } - Err(e) => { - println!("error reading pipelines: {e:?}"); - } - }, - } -} diff --git a/cli/src/main.rs b/cli/src/main.rs deleted file mode 100644 index ae1b479..0000000 --- a/cli/src/main.rs +++ /dev/null @@ -1,151 +0,0 @@ -use std::num::ParseIntError; - -use api_client::{ApiClient, ApiClientError}; -use commands::{execute_commands, Command, SubCommand}; -use rustyline::error::ReadlineError; -use rustyline::DefaultEditor; -use thiserror::Error; - -mod api_client; -mod commands; -mod pipelines; -mod publications; -mod sinks; -mod sources; -mod tenants; - -#[derive(Debug, Error)] -pub enum CliError { - #[error("readline error: {0}")] - Readline(#[from] ReadlineError), - - #[error("api client error: {0}")] - ApiClient(#[from] ApiClientError), - - #[error("parse int error: {0}")] - ParseInt(#[from] ParseIntError), -} - -#[tokio::main] -async fn main() -> rustyline::Result<()> { - let address = "http://127.0.0.1:8000".to_string(); - let api_client = ApiClient::new(address); - let mut editor = DefaultEditor::new()?; - println!("replicator api CLI version 0.1.0"); - println!("type help or ? for help"); - println!(); - loop { - let readline = editor.readline("> "); - match readline { - Ok(line) => match line.to_lowercase().as_str() { - "quit" | "exit" => { - break; - } - "help" | "?" => { - print_help(); - } - command => { - let command = command.trim(); - let tokens: Vec<&str> = command.split_whitespace().collect(); - if tokens.len() != 2 { - print_invalid_command_help(command); - continue; - } - let main_command = tokens[0].trim().to_lowercase(); - let command: Command = match main_command.as_str().try_into() { - Ok(command) => command, - Err(e) => { - println!("error parsing command: {e:?}"); - continue; - } - }; - let subcommand = tokens[1].trim().to_lowercase(); - let subcommand: SubCommand = match subcommand.as_str().try_into() { - Ok(subcommand) => subcommand, - Err(e) => { - println!("error parsing subcommand: {e:?}"); - continue; - } - }; - execute_commands(command, subcommand, &api_client, &mut editor).await; - } - }, - Err(ReadlineError::Interrupted) => { - break; - } - Err(ReadlineError::Eof) => { - break; - } - Err(err) => { - eprintln!("Error: {:?}", err); - break; - } - } - } - Ok(()) -} - -fn print_invalid_command_help(command: &str) { - println!("invalid command: {command}"); - println!("type help or ? to get help with commands"); - println!(); -} - -fn print_help() { - println!("supported commands:"); - println!(); - println!(" add tenant - add a new tenant"); - println!(" add source - add a new source"); - println!(" add sink - add a new sink"); - println!(" add pipeline - add an new pipeline"); - println!(" add publication - add an new publication"); - println!(); - println!(" update tenant - update an existing tenant"); - println!(" update source - update an existing source"); - println!(" update sink - update an existing sink"); - println!(" update pipeline - update a existing pipeline"); - println!(" update publication - update a existing publication"); - println!(); - println!(" delete tenant - delete an existing tenant"); - println!(" delete source - delete an existing source"); - println!(" delete sink - delete an existing sink"); - println!(" delete pipeline - delete an existing pipeline"); - println!(" delete publication - delete an existing publication"); - println!(); - println!(" show tenant - show an existing tenant"); - println!(" show source - show an existing source"); - println!(" show sink - show an existing sink"); - println!(" show pipeline - show an existing pipeline"); - println!(" show publication - show an existing publication"); - println!(); - println!(" list tenants - list all existing tenants"); - println!(" list sources - list all existing sources"); - println!(" list sinks - list all existing sinks"); - println!(" list pipelines - list all existing pipelines"); - println!(" list publications - list all existing publications"); - println!(); -} - -pub fn get_string(editor: &mut DefaultEditor, prompt: &str) -> Result { - let s = editor.readline(prompt)?; - let s = s.trim().to_string(); - Ok(s) -} - -pub fn get_id(editor: &mut DefaultEditor, prompt: &str) -> Result { - let id = get_string(editor, prompt)?; - let id = id.trim().parse()?; - Ok(id) -} - -pub fn get_usize(editor: &mut DefaultEditor, prompt: &str) -> Result { - let usize = get_string(editor, prompt)?; - let usize = usize.trim().parse()?; - Ok(usize) -} - -pub fn get_u64(editor: &mut DefaultEditor, prompt: &str) -> Result { - let u64 = get_string(editor, prompt)?; - let u64 = u64.trim().parse()?; - Ok(u64) -} diff --git a/cli/src/pipelines.rs b/cli/src/pipelines.rs deleted file mode 100644 index 570d3f9..0000000 --- a/cli/src/pipelines.rs +++ /dev/null @@ -1,110 +0,0 @@ -use rustyline::DefaultEditor; - -use crate::{ - api_client::{ - ApiClient, BatchConfig, CreatePipelineRequest, CreatePipelineResponse, PipelineConfig, - PipelineResponse, UpdatePipelineRequest, - }, - get_id, get_u64, get_usize, - publications::get_publication_name, - sinks::get_sink_id, - sources::get_source_id, - tenants::get_tenant_id, - CliError, -}; - -pub async fn create_pipeline( - api_client: &ApiClient, - editor: &mut DefaultEditor, -) -> Result { - let tenant_id = get_tenant_id(editor)?; - let source_id = get_source_id(editor)?; - let sink_id = get_sink_id(editor)?; - let publication_name = get_publication_name(editor)?; - let config = get_pipeline_config(editor)?; - let pipeline = api_client - .create_pipeline( - tenant_id, - &CreatePipelineRequest { - source_id, - sink_id, - publication_name, - config, - }, - ) - .await?; - - Ok(pipeline) -} - -pub async fn show_pipeline( - api_client: &ApiClient, - editor: &mut DefaultEditor, -) -> Result { - let tenant_id = get_tenant_id(editor)?; - let pipeline_id = get_pipeline_id(editor)?; - - let pipeline = api_client.read_pipeline(tenant_id, pipeline_id).await?; - - Ok(pipeline) -} - -pub async fn update_pipeline( - api_client: &ApiClient, - editor: &mut DefaultEditor, -) -> Result<(), CliError> { - let tenant_id = get_tenant_id(editor)?; - let source_id = get_source_id(editor)?; - let sink_id = get_sink_id(editor)?; - let publication_name = get_publication_name(editor)?; - let pipeline_id = get_pipeline_id(editor)?; - let config = get_pipeline_config(editor)?; - - let pipeline = UpdatePipelineRequest { - source_id, - sink_id, - publication_name, - config, - }; - api_client - .update_pipeline(tenant_id, pipeline_id, &pipeline) - .await?; - - Ok(()) -} - -pub async fn delete_pipeline( - api_client: &ApiClient, - editor: &mut DefaultEditor, -) -> Result<(), CliError> { - let tenant_id = get_tenant_id(editor)?; - let pipeline_id = get_pipeline_id(editor)?; - - api_client.delete_pipeline(tenant_id, pipeline_id).await?; - - Ok(()) -} - -pub async fn list_pipelines( - api_client: &ApiClient, - editor: &mut DefaultEditor, -) -> Result, CliError> { - let tenant_id = get_tenant_id(editor)?; - let tenants = api_client.read_all_pipelines(tenant_id).await?; - - Ok(tenants) -} - -fn get_pipeline_config(editor: &mut DefaultEditor) -> Result { - let max_size = get_usize(editor, "enter max batch size: ")?; - let max_fill_secs = get_u64(editor, "enter max batch fill seconds: ")?; - let config = BatchConfig { - max_size, - max_fill_secs, - }; - Ok(PipelineConfig { config }) -} - -pub fn get_pipeline_id(editor: &mut DefaultEditor) -> Result { - get_id(editor, "enter pipeline id: ") -} diff --git a/cli/src/publications.rs b/cli/src/publications.rs deleted file mode 100644 index d7eac94..0000000 --- a/cli/src/publications.rs +++ /dev/null @@ -1,7 +0,0 @@ -use rustyline::DefaultEditor; - -use crate::{get_string, CliError}; - -pub fn get_publication_name(editor: &mut DefaultEditor) -> Result { - get_string(editor, "enter publication anme: ") -} diff --git a/cli/src/sinks.rs b/cli/src/sinks.rs deleted file mode 100644 index 7e960d4..0000000 --- a/cli/src/sinks.rs +++ /dev/null @@ -1,87 +0,0 @@ -use rustyline::DefaultEditor; - -use crate::{ - api_client::{ - ApiClient, CreateSinkRequest, CreateSinkResponse, SinkConfig, SinkResponse, - UpdateSinkRequest, - }, - get_id, get_string, - tenants::get_tenant_id, - CliError, -}; - -pub async fn create_sink( - api_client: &ApiClient, - editor: &mut DefaultEditor, -) -> Result { - let tenant_id = get_tenant_id(editor)?; - let config = get_sink_config(editor)?; - let sink = api_client - .create_sink(tenant_id, &CreateSinkRequest { config }) - .await?; - - Ok(sink) -} - -pub async fn show_sink( - api_client: &ApiClient, - editor: &mut DefaultEditor, -) -> Result { - let tenant_id = get_tenant_id(editor)?; - let sink_id = get_sink_id(editor)?; - - let sink = api_client.read_sink(tenant_id, sink_id).await?; - - Ok(sink) -} - -pub async fn update_sink( - api_client: &ApiClient, - editor: &mut DefaultEditor, -) -> Result<(), CliError> { - let tenant_id = get_tenant_id(editor)?; - let sink_id = get_sink_id(editor)?; - let config = get_sink_config(editor)?; - - let sink = UpdateSinkRequest { config }; - api_client.update_sink(tenant_id, sink_id, &sink).await?; - - Ok(()) -} - -pub async fn delete_sink( - api_client: &ApiClient, - editor: &mut DefaultEditor, -) -> Result<(), CliError> { - let tenant_id = get_tenant_id(editor)?; - let sink_id = get_sink_id(editor)?; - - api_client.delete_sink(tenant_id, sink_id).await?; - - Ok(()) -} - -pub async fn list_sinks( - api_client: &ApiClient, - editor: &mut DefaultEditor, -) -> Result, CliError> { - let tenant_id = get_tenant_id(editor)?; - let tenants = api_client.read_all_sinks(tenant_id).await?; - - Ok(tenants) -} - -fn get_sink_config(editor: &mut DefaultEditor) -> Result { - let project_id = get_string(editor, "enter project_id: ")?; - let dataset_id = get_string(editor, "enter dataset_id: ")?; - let service_account_key = get_string(editor, "enter service_account_key: ")?; - Ok(SinkConfig::BigQuery { - project_id, - dataset_id, - service_account_key, - }) -} - -pub fn get_sink_id(editor: &mut DefaultEditor) -> Result { - get_id(editor, "enter sink id: ") -} diff --git a/cli/src/sources.rs b/cli/src/sources.rs deleted file mode 100644 index 0ecb78d..0000000 --- a/cli/src/sources.rs +++ /dev/null @@ -1,102 +0,0 @@ -use rustyline::DefaultEditor; - -use crate::{ - api_client::{ - ApiClient, CreateSourceRequest, CreateSourceResponse, SourceConfig, SourceResponse, - UpdateSourceRequest, - }, - get_id, get_string, - tenants::get_tenant_id, - CliError, -}; - -pub async fn create_source( - api_client: &ApiClient, - editor: &mut DefaultEditor, -) -> Result { - let tenant_id = get_tenant_id(editor)?; - let config = get_source_config(editor)?; - let source = api_client - .create_source(tenant_id, &CreateSourceRequest { config }) - .await?; - - Ok(source) -} - -pub async fn show_source( - api_client: &ApiClient, - editor: &mut DefaultEditor, -) -> Result { - let tenant_id = get_tenant_id(editor)?; - let source_id = get_source_id(editor)?; - - let source = api_client.read_source(tenant_id, source_id).await?; - - Ok(source) -} - -pub async fn update_source( - api_client: &ApiClient, - editor: &mut DefaultEditor, -) -> Result<(), CliError> { - let tenant_id = get_tenant_id(editor)?; - let source_id = get_source_id(editor)?; - let config = get_source_config(editor)?; - - let source = UpdateSourceRequest { config }; - api_client - .update_source(tenant_id, source_id, &source) - .await?; - - Ok(()) -} - -pub async fn delete_source( - api_client: &ApiClient, - editor: &mut DefaultEditor, -) -> Result<(), CliError> { - let tenant_id = get_tenant_id(editor)?; - let source_id = get_source_id(editor)?; - - api_client.delete_source(tenant_id, source_id).await?; - - Ok(()) -} - -pub async fn list_sources( - api_client: &ApiClient, - editor: &mut DefaultEditor, -) -> Result, CliError> { - let tenant_id = get_tenant_id(editor)?; - let tenants = api_client.read_all_sources(tenant_id).await?; - - Ok(tenants) -} - -fn get_source_config(editor: &mut DefaultEditor) -> Result { - let host = get_string(editor, "enter host: ")?; - let port = get_string(editor, "enter port: ")?; - let port: u16 = port.parse()?; - let name = get_string(editor, "enter database name: ")?; - let username = get_string(editor, "enter username: ")?; - let password = get_string(editor, "enter password: ")?; - let password = if password.is_empty() { - None - } else { - Some(password) - }; - let slot_name = get_string(editor, "enter slot name: ")?; - - Ok(SourceConfig::Postgres { - host, - port, - name, - username, - password, - slot_name, - }) -} - -pub fn get_source_id(editor: &mut DefaultEditor) -> Result { - get_id(editor, "enter source id: ") -} diff --git a/cli/src/tenants.rs b/cli/src/tenants.rs deleted file mode 100644 index bc9727c..0000000 --- a/cli/src/tenants.rs +++ /dev/null @@ -1,70 +0,0 @@ -use rustyline::DefaultEditor; - -use crate::{ - api_client::{ - ApiClient, CreateTenantRequest, CreateTenantResponse, TenantResponse, UpdateTenantRequest, - }, - get_id, get_string, CliError, -}; - -pub async fn create_tenant( - api_client: &ApiClient, - editor: &mut DefaultEditor, -) -> Result { - let name = get_tenant_name(editor)?; - - let tenant = api_client - .create_tenant(&CreateTenantRequest { name }) - .await?; - - Ok(tenant) -} - -pub async fn show_tenant( - api_client: &ApiClient, - editor: &mut DefaultEditor, -) -> Result { - let tenant_id = get_tenant_id(editor)?; - - let tenant = api_client.read_tenant(tenant_id).await?; - - Ok(tenant) -} - -pub async fn update_tenant( - api_client: &ApiClient, - editor: &mut DefaultEditor, -) -> Result<(), CliError> { - let tenant_id = get_tenant_id(editor)?; - let name = get_tenant_name(editor)?; - - let tenant = UpdateTenantRequest { name }; - api_client.update_tenant(tenant_id, &tenant).await?; - - Ok(()) -} - -pub async fn delete_tenant( - api_client: &ApiClient, - editor: &mut DefaultEditor, -) -> Result<(), CliError> { - let tenant_id = get_tenant_id(editor)?; - - api_client.delete_tenant(tenant_id).await?; - - Ok(()) -} - -pub async fn list_tenants(api_client: &ApiClient) -> Result, CliError> { - let tenants = api_client.read_all_tenants().await?; - - Ok(tenants) -} - -fn get_tenant_name(editor: &mut DefaultEditor) -> Result { - get_string(editor, "enter tenant name: ") -} - -pub fn get_tenant_id(editor: &mut DefaultEditor) -> Result { - get_id(editor, "enter tenant id: ") -}