Skip to content

Commit

Permalink
add publication support to cli
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Aug 29, 2024
1 parent 8f792ca commit c85d148
Show file tree
Hide file tree
Showing 4 changed files with 290 additions and 2 deletions.
136 changes: 136 additions & 0 deletions cli/src/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,17 @@ impl Display for BatchConfig {
}
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct PublicationConfig {
pub table_names: Vec<String>,
}

impl Display for PublicationConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "table_names: {:?}", self.table_names)
}
}

#[derive(Serialize)]
pub struct CreateTenantRequest {
pub name: String,
Expand Down Expand Up @@ -274,6 +285,47 @@ pub struct UpdatePipelineRequest {
pub config: PipelineConfig,
}

#[derive(Serialize)]
pub struct CreatePublicationRequest {
pub source_id: i64,
pub config: PublicationConfig,
}

#[derive(Deserialize)]
pub struct CreatePublicationResponse {
pub id: i64,
}

impl Display for CreatePublicationResponse {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "id: {}", self.id)
}
}

#[derive(Deserialize)]
pub struct PublicationResponse {
pub id: i64,
pub tenant_id: i64,
pub source_id: i64,
pub config: PublicationConfig,
}

impl Display for PublicationResponse {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"tenant_id: {}, id: {}, source_id: {}, config: {}",
self.tenant_id, self.id, self.source_id, self.config
)
}
}

#[derive(Serialize)]
pub struct UpdatePublicationRequest {
pub source_id: i64,
pub config: PublicationConfig,
}

#[derive(Debug, Error)]
pub enum ApiClientError {
#[error("reqwest error: {0}")]
Expand Down Expand Up @@ -559,4 +611,88 @@ impl ApiClient {
.json()
.await?)
}

pub async fn create_publication(
&self,
tenant_id: i64,
publication: &CreatePublicationRequest,
) -> Result<CreatePublicationResponse, ApiClientError> {
Ok(self
.client
.post(&format!("{}/v1/publications", &self.address))
.header("tenant_id", tenant_id)
.json(publication)
.send()
.await?
.json()
.await?)
}

pub async fn read_publication(
&self,
tenant_id: i64,
publication_id: i64,
) -> Result<PublicationResponse, ApiClientError> {
Ok(self
.client
.get(&format!(
"{}/v1/publications/{publication_id}",
&self.address
))
.header("tenant_id", tenant_id)
.send()
.await?
.json()
.await?)
}

pub async fn update_publication(
&self,
tenant_id: i64,
publication_id: i64,
publication: &UpdatePublicationRequest,
) -> Result<(), ApiClientError> {
self.client
.post(&format!(
"{}/v1/publications/{publication_id}",
&self.address
))
.header("tenant_id", tenant_id)
.json(publication)
.send()
.await?;

Ok(())
}

pub async fn delete_publication(
&self,
tenant_id: i64,
publication_id: i64,
) -> Result<(), ApiClientError> {
self.client
.delete(&format!(
"{}/v1/publications/{publication_id}",
&self.address
))
.header("tenant_id", tenant_id)
.send()
.await?;

Ok(())
}

pub async fn read_all_publications(
&self,
tenant_id: i64,
) -> Result<Vec<PublicationResponse>, ApiClientError> {
Ok(self
.client
.get(&format!("{}/v1/publications", &self.address))
.header("tenant_id", tenant_id)
.send()
.await?
.json()
.await?)
}
}
61 changes: 59 additions & 2 deletions cli/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ use thiserror::Error;
use crate::{
api_client::ApiClient,
pipelines::{create_pipeline, delete_pipeline, list_pipelines, show_pipeline, update_pipeline},
publications::{
create_publication, delete_publication, list_publications, show_publication,
update_publication,
},
sinks::{create_sink, delete_sink, list_sinks, show_sink, update_sink},
sources::{create_source, delete_source, list_sources, show_source, update_source},
tenants::{create_tenant, delete_tenant, list_tenants, show_tenant, update_tenant},
Expand Down Expand Up @@ -49,6 +53,7 @@ pub enum SubCommand {
Sources,
Sinks,
Pipelines,
Publications,
}

impl TryFrom<&str> for SubCommand {
Expand All @@ -59,8 +64,11 @@ impl TryFrom<&str> for SubCommand {
"t" | "te" | "ten" | "tena" | "tenan" | "tenant" | "tenants" => SubCommand::Tenants,
"so" | "sou" | "sour" | "sourc" | "source" | "sources" => SubCommand::Sources,
"si" | "sin" | "sink" | "sinks" => SubCommand::Sinks,
"p" | "pi" | "pip" | "pipe" | "pipel" | "pipeli" | "pipelin" | "pipeline"
| "pipelines" => SubCommand::Pipelines,
"pi" | "pip" | "pipe" | "pipel" | "pipeli" | "pipelin" | "pipeline" | "pipelines" => {
SubCommand::Pipelines
}
"pu" | "pub" | "publ" | "public" | "publica" | "publicat" | "publicati"
| "publicatio" | "publication" | "publications" => SubCommand::Publications,
subcommand => {
return Err(SubCommandParseError::InvalidSubCommand(
subcommand.to_string(),
Expand Down Expand Up @@ -109,6 +117,16 @@ pub async fn execute_commands(
println!("error creating pipeline: {e:?}");
}
},
(Command::Add, SubCommand::Publications) => {
match create_publication(api_client, editor).await {
Ok(publication) => {
println!("publication created: {publication}");
}
Err(e) => {
println!("error creating publication: {e:?}");
}
}
}
(Command::Update, SubCommand::Tenants) => match update_tenant(api_client, editor).await {
Ok(()) => println!("tenant updated"),
Err(e) => {
Expand All @@ -134,6 +152,14 @@ pub async fn execute_commands(
println!("error updating pipeline: {e:?}");
}
},
(Command::Update, SubCommand::Publications) => {
match update_publication(api_client, editor).await {
Ok(()) => println!("publication updated"),
Err(e) => {
println!("error updating publication: {e:?}");
}
}
}
(Command::Delete, SubCommand::Tenants) => match delete_tenant(api_client, editor).await {
Ok(()) => println!("tenant deleted"),
Err(e) => {
Expand All @@ -159,6 +185,14 @@ pub async fn execute_commands(
println!("error deleting pipeline: {e:?}");
}
},
(Command::Delete, SubCommand::Publications) => {
match delete_publication(api_client, editor).await {
Ok(()) => println!("publication deleted"),
Err(e) => {
println!("error deleting publication: {e:?}");
}
}
}
(Command::Show, SubCommand::Tenants) => match show_tenant(api_client, editor).await {
Ok(tenant) => {
println!("tenant: {tenant}")
Expand Down Expand Up @@ -191,6 +225,16 @@ pub async fn execute_commands(
println!("error reading pipeline: {e:?}");
}
},
(Command::Show, SubCommand::Publications) => {
match show_publication(api_client, editor).await {
Ok(publication) => {
println!("publication: {publication}")
}
Err(e) => {
println!("error reading publication: {e:?}");
}
}
}
(Command::List, SubCommand::Tenants) => match list_tenants(api_client).await {
Ok(tenants) => {
println!("tenants: ");
Expand Down Expand Up @@ -235,5 +279,18 @@ pub async fn execute_commands(
println!("error reading pipelines: {e:?}");
}
},
(Command::List, SubCommand::Publications) => {
match list_publications(api_client, editor).await {
Ok(publications) => {
println!("publications: ");
for publication in publications {
println!(" {publication}")
}
}
Err(e) => {
println!("error reading publications: {e:?}");
}
}
}
}
}
1 change: 1 addition & 0 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use thiserror::Error;
mod api_client;
mod commands;
mod pipelines;
mod publications;
mod sinks;
mod sources;
mod tenants;
Expand Down
94 changes: 94 additions & 0 deletions cli/src/publications.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use rustyline::DefaultEditor;

use crate::{
api_client::{
ApiClient, CreatePublicationRequest, CreatePublicationResponse, PublicationConfig,
PublicationResponse, UpdatePublicationRequest,
},
get_id, get_string,
sources::get_source_id,
tenants::get_tenant_id,
CliError,
};

pub async fn create_publication(
api_client: &ApiClient,
editor: &mut DefaultEditor,
) -> Result<CreatePublicationResponse, CliError> {
let tenant_id = get_tenant_id(editor)?;
let source_id = get_source_id(editor)?;
let config = get_publication_config(editor)?;
let publication = api_client
.create_publication(tenant_id, &CreatePublicationRequest { source_id, config })
.await?;

Ok(publication)
}

pub async fn show_publication(
api_client: &ApiClient,
editor: &mut DefaultEditor,
) -> Result<PublicationResponse, CliError> {
let tenant_id = get_tenant_id(editor)?;
let publication_id = get_publication_id(editor)?;

let publication = api_client
.read_publication(tenant_id, publication_id)
.await?;

Ok(publication)
}

pub async fn update_publication(
api_client: &ApiClient,
editor: &mut DefaultEditor,
) -> Result<(), CliError> {
let tenant_id = get_tenant_id(editor)?;
let source_id = get_source_id(editor)?;
let publication_id = get_publication_id(editor)?;
let config = get_publication_config(editor)?;

let publication = UpdatePublicationRequest { source_id, config };
api_client
.update_publication(tenant_id, publication_id, &publication)
.await?;

Ok(())
}

pub async fn delete_publication(
api_client: &ApiClient,
editor: &mut DefaultEditor,
) -> Result<(), CliError> {
let tenant_id = get_tenant_id(editor)?;
let publication_id = get_publication_id(editor)?;

api_client
.delete_publication(tenant_id, publication_id)
.await?;

Ok(())
}

pub async fn list_publications(
api_client: &ApiClient,
editor: &mut DefaultEditor,
) -> Result<Vec<PublicationResponse>, CliError> {
let tenant_id = get_tenant_id(editor)?;
let tenants = api_client.read_all_publications(tenant_id).await?;

Ok(tenants)
}

fn get_publication_config(editor: &mut DefaultEditor) -> Result<PublicationConfig, CliError> {
let table_names = get_string(editor, "enter comma separated table names: ")?;
let table_names: Vec<String> = table_names
.split(',')
.map(|table_name| table_name.trim().to_string())
.collect();
Ok(PublicationConfig { table_names })
}

pub fn get_publication_id(editor: &mut DefaultEditor) -> Result<i64, CliError> {
get_id(editor, "enter publication id: ")
}

0 comments on commit c85d148

Please sign in to comment.