diff --git a/Cargo.lock b/Cargo.lock index b9842dada3..6d5936f345 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2558,7 +2558,7 @@ dependencies = [ [[package]] name = "fluvio-controlplane-metadata" -version = "0.25.1" +version = "0.26.0" dependencies = [ "anyhow", "async-trait", @@ -2835,7 +2835,7 @@ dependencies = [ [[package]] name = "fluvio-sc-schema" -version = "0.21.2" +version = "0.22.0" dependencies = [ "anyhow", "fluvio-controlplane-metadata", diff --git a/Cargo.toml b/Cargo.toml index 2c171fa499..feade258e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -172,12 +172,12 @@ fluvio-cli-common = { path = "crates/fluvio-cli-common"} fluvio-compression = { version = "0.3", path = "crates/fluvio-compression" } fluvio-connector-package = { path = "crates/fluvio-connector-package/" } fluvio-controlplane = { path = "crates/fluvio-controlplane" } -fluvio-controlplane-metadata = { version = "0.25.0", default-features = false, path = "crates/fluvio-controlplane-metadata" } +fluvio-controlplane-metadata = { version = "0.26.0", default-features = false, path = "crates/fluvio-controlplane-metadata" } fluvio-extension-common = { path = "crates/fluvio-extension-common", default-features = false } fluvio-hub-util = { path = "crates/fluvio-hub-util" } fluvio-package-index = { version = "0.7.0", path = "crates/fluvio-package-index", default-features = false } fluvio-protocol = { version = "0.10.6", path = "crates/fluvio-protocol" } -fluvio-sc-schema = { version = "0.21.0", path = "crates/fluvio-sc-schema", default-features = false } +fluvio-sc-schema = { version = "0.22.0", path = "crates/fluvio-sc-schema", default-features = false } fluvio-service = { path = "crates/fluvio-service" } fluvio-smartengine = { version = "0.7.0", path = "crates/fluvio-smartengine", default-features = false } fluvio-smartmodule = { version = "0.7.0", path = "crates/fluvio-smartmodule", default-features = false } diff --git a/crates/fluvio-cli/src/client/partition/list.rs b/crates/fluvio-cli/src/client/partition/list.rs index c1c5d881ad..065435072f 100644 --- a/crates/fluvio-cli/src/client/partition/list.rs +++ b/crates/fluvio-cli/src/client/partition/list.rs @@ -9,6 +9,7 @@ use anyhow::Result; use fluvio::Fluvio; use fluvio::metadata::partition::*; +use fluvio_sc_schema::objects::ListRequest; use crate::common::output::Terminal; use crate::common::OutputFormat; @@ -18,6 +19,9 @@ use crate::common::OutputFormat; pub struct ListPartitionOpt { #[clap(flatten)] output: OutputFormat, + /// Show system partitions only + #[arg(long, short, required = false)] + system: bool, } impl ListPartitionOpt { @@ -29,7 +33,9 @@ impl ListPartitionOpt { let output = self.output.format; let admin = fluvio.admin().await; - let partitions = admin.all::().await?; + let partitions = admin + .list_with_config::(ListRequest::default().system(self.system)) + .await?; // format and dump to screen display::format_partition_response_output(out, partitions, output)?; diff --git a/crates/fluvio-cli/src/client/topic/list.rs b/crates/fluvio-cli/src/client/topic/list.rs index 0af1e6188b..000ac24ba8 100644 --- a/crates/fluvio-cli/src/client/topic/list.rs +++ b/crates/fluvio-cli/src/client/topic/list.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use clap::Parser; +use fluvio_sc_schema::objects::ListRequest; use tracing::debug; use anyhow::Result; @@ -25,6 +26,9 @@ pub struct ListTopicsOpt { /// Output #[clap(flatten)] output: OutputFormat, + /// Show system topics only + #[arg(long, short, required = false)] + system: bool, } impl ListTopicsOpt { @@ -33,7 +37,9 @@ impl ListTopicsOpt { debug!("list topics {:#?} ", output_type); let admin = fluvio.admin().await; - let topics = admin.all::().await?; + let topics = admin + .list_with_config::(ListRequest::default().system(self.system)) + .await?; display::format_response_output(out, topics, output_type)?; Ok(()) } @@ -43,7 +49,7 @@ mod display { use std::time::Duration; - use humantime::{format_duration}; + use humantime::format_duration; use comfy_table::{Row, Cell, CellAlignment}; use serde::Serialize; diff --git a/crates/fluvio-controlplane-metadata/Cargo.toml b/crates/fluvio-controlplane-metadata/Cargo.toml index c61c88973c..8262c8ee22 100644 --- a/crates/fluvio-controlplane-metadata/Cargo.toml +++ b/crates/fluvio-controlplane-metadata/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "fluvio-controlplane-metadata" edition = "2021" -version = "0.25.1" +version = "0.26.0" authors = ["Fluvio Contributors "] description = "Metadata definition for Fluvio control plane" repository = "https://github.com/infinyon/fluvio" diff --git a/crates/fluvio-controlplane-metadata/src/partition/spec.rs b/crates/fluvio-controlplane-metadata/src/partition/spec.rs index 317db7e8a4..5d29660502 100644 --- a/crates/fluvio-controlplane-metadata/src/partition/spec.rs +++ b/crates/fluvio-controlplane-metadata/src/partition/spec.rs @@ -30,6 +30,9 @@ pub struct PartitionSpec { pub compression_type: CompressionAlgorithm, #[fluvio(min_version = 12)] pub deduplication: Option, + #[cfg_attr(feature = "use_serde", serde(default))] + #[fluvio(min_version = 13)] + pub system: bool, } impl PartitionSpec { @@ -52,6 +55,7 @@ impl PartitionSpec { storage: topic.get_storage().cloned(), compression_type: topic.get_compression_type().clone(), deduplication: topic.get_deduplication().cloned(), + system: topic.is_system(), } } diff --git a/crates/fluvio-controlplane-metadata/src/topic/spec.rs b/crates/fluvio-controlplane-metadata/src/topic/spec.rs index 0b9028214a..559dbf09e5 100644 --- a/crates/fluvio-controlplane-metadata/src/topic/spec.rs +++ b/crates/fluvio-controlplane-metadata/src/topic/spec.rs @@ -30,6 +30,9 @@ pub struct TopicSpec { #[cfg_attr(feature = "use_serde", serde(default))] #[fluvio(min_version = 12)] deduplication: Option, + #[cfg_attr(feature = "use_serde", serde(default))] + #[fluvio(min_version = 13)] + system: bool, } impl From for TopicSpec { @@ -109,6 +112,14 @@ impl TopicSpec { self.deduplication = deduplication; } + pub fn is_system(&self) -> bool { + self.system + } + + pub fn set_system(&mut self, system: bool) { + self.system = system; + } + /// get retention secs that can be displayed pub fn retention_secs(&self) -> u32 { self.get_clean_policy() diff --git a/crates/fluvio-sc-schema/Cargo.toml b/crates/fluvio-sc-schema/Cargo.toml index 5f78fc6885..83a7aa3b25 100644 --- a/crates/fluvio-sc-schema/Cargo.toml +++ b/crates/fluvio-sc-schema/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fluvio-sc-schema" -version = "0.21.2" +version = "0.22.0" edition = "2021" authors = ["Fluvio Contributors "] description = "Fluvio API for SC" diff --git a/crates/fluvio-sc-schema/src/objects/list.rs b/crates/fluvio-sc-schema/src/objects/list.rs index 7b97bc2641..c8e367bc69 100644 --- a/crates/fluvio-sc-schema/src/objects/list.rs +++ b/crates/fluvio-sc-schema/src/objects/list.rs @@ -68,6 +68,8 @@ pub struct ListRequest { pub name_filters: ListFilters, #[fluvio(min_version = 10)] pub summary: bool, // if true, only return summary + #[fluvio(min_version = 13)] + pub system: bool, // if true, only return system specs data: PhantomData, // satisfy generic } @@ -76,9 +78,15 @@ impl ListRequest { Self { name_filters: name_filters.into(), summary, + system: false, data: PhantomData, } } + + pub fn system(mut self, system: bool) -> Self { + self.system = system; + self + } } #[derive(Debug, Default, Encoder)] diff --git a/crates/fluvio-sc-schema/src/objects/mod.rs b/crates/fluvio-sc-schema/src/objects/mod.rs index 91515d3341..adddfb5507 100644 --- a/crates/fluvio-sc-schema/src/objects/mod.rs +++ b/crates/fluvio-sc-schema/src/objects/mod.rs @@ -13,7 +13,7 @@ pub use list::*; pub use watch::*; pub use metadata::*; -pub(crate) const COMMON_VERSION: i16 = 12; // from now, we use a single version for all objects +pub(crate) const COMMON_VERSION: i16 = 13; // from now, we use a single version for all objects pub(crate) const DYN_OBJ: i16 = 11; // version indicate dynamic object #[cfg(test)] diff --git a/crates/fluvio-sc/src/services/public_api/list.rs b/crates/fluvio-sc/src/services/public_api/list.rs index 9e59776f1d..f9ed2f8e31 100644 --- a/crates/fluvio-sc/src/services/public_api/list.rs +++ b/crates/fluvio-sc/src/services/public_api/list.rs @@ -30,7 +30,8 @@ pub async fn handle_list_request( let response = if let Some(req) = req.downcast()? as Option> { ObjectApiListResponse::try_encode_from( - super::topic::handle_fetch_topics_request(req.name_filters, auth_ctx).await?, + super::topic::handle_fetch_topics_request(req.name_filters, req.system, auth_ctx) + .await?, header.api_version(), )? } else if let Some(req) = req.downcast()? as Option> { @@ -50,7 +51,7 @@ pub async fn handle_list_request( )? } else if let Some(req) = req.downcast()? as Option> { ObjectApiListResponse::try_encode_from( - super::partition::handle_fetch_request(req.name_filters, auth_ctx).await?, + super::partition::handle_fetch_request(req.name_filters, req.system, auth_ctx).await?, header.api_version(), )? } else if let Some(req) = req.downcast()? as Option> { diff --git a/crates/fluvio-sc/src/services/public_api/partition/mod.rs b/crates/fluvio-sc/src/services/public_api/partition/mod.rs index ed44d1c19c..0da0e8b442 100644 --- a/crates/fluvio-sc/src/services/public_api/partition/mod.rs +++ b/crates/fluvio-sc/src/services/public_api/partition/mod.rs @@ -14,6 +14,7 @@ use crate::services::auth::AuthServiceContext; #[instrument(skip(_filters, auth_ctx))] pub async fn handle_fetch_request( _filters: ListFilters, + system: bool, auth_ctx: &AuthServiceContext, ) -> Result> { debug!("fetching custom spu list"); @@ -38,6 +39,7 @@ pub async fn handle_fetch_request( .read() .await .values() + .filter(|value| value.inner().spec().system == system) .map(|value| value.inner().clone().into()) .collect(); diff --git a/crates/fluvio-sc/src/services/public_api/topic/fetch.rs b/crates/fluvio-sc/src/services/public_api/topic/fetch.rs index 7353bd8cfa..09c0d20038 100644 --- a/crates/fluvio-sc/src/services/public_api/topic/fetch.rs +++ b/crates/fluvio-sc/src/services/public_api/topic/fetch.rs @@ -13,6 +13,7 @@ use crate::services::auth::AuthServiceContext; #[instrument(skip(filters, auth_ctx))] pub async fn handle_fetch_topics_request( filters: ListFilters, + system: bool, auth_ctx: &AuthServiceContext, ) -> Result> { debug!("retrieving topic list: {:#?}", filters); @@ -37,6 +38,7 @@ pub async fn handle_fetch_topics_request( .read() .await .values() + .filter(|value| value.inner().spec().is_system() == system) .filter_map(|value| { if filters.filter(value.key()) { Some(value.inner().clone().into()) diff --git a/crates/fluvio/src/admin.rs b/crates/fluvio/src/admin.rs index c66adc00ac..451ed6c68e 100644 --- a/crates/fluvio/src/admin.rs +++ b/crates/fluvio/src/admin.rs @@ -249,8 +249,18 @@ impl FluvioAdmin { let filter_list: Vec = filters.into_iter().map(Into::into).collect(); let list_request: ListRequest = ListRequest::new(filter_list, summary); + self.list_with_config(list_request).await + } + + #[instrument(skip(self, config))] + pub async fn list_with_config(&self, config: ListRequest) -> Result>> + where + S: AdminSpec, + ListFilter: From, + S::Status: Encoder + Decoder + Debug, + { let response = self - .send_receive_admin::(list_request) + .send_receive_admin::(config) .await?; trace!("list response: {:#?}", response); response diff --git a/crates/fluvio/src/consumer/mod.rs b/crates/fluvio/src/consumer/mod.rs index 8fafa1bb82..ec625f9edd 100644 --- a/crates/fluvio/src/consumer/mod.rs +++ b/crates/fluvio/src/consumer/mod.rs @@ -445,7 +445,7 @@ where }; } Err(err) => { - error!("stream to server channel broken: {err:?}"); + debug!("stream to server channel closed: {err:?}"); break; } } diff --git a/k8-util/helm/fluvio-sys/templates/crd_partition.yaml b/k8-util/helm/fluvio-sys/templates/crd_partition.yaml index b89de6ae1f..170ca10d3b 100644 --- a/k8-util/helm/fluvio-sys/templates/crd_partition.yaml +++ b/k8-util/helm/fluvio-sys/templates/crd_partition.yaml @@ -81,6 +81,8 @@ spec: age: type: string nullable: true + system: + type: boolean status: type: object x-kubernetes-preserve-unknown-fields: true diff --git a/k8-util/helm/fluvio-sys/templates/crd_topic.yaml b/k8-util/helm/fluvio-sys/templates/crd_topic.yaml index 0605b65c1a..3a91048086 100644 --- a/k8-util/helm/fluvio-sys/templates/crd_topic.yaml +++ b/k8-util/helm/fluvio-sys/templates/crd_topic.yaml @@ -111,7 +111,9 @@ spec: minimum: 0 age: type: string - nullable: true + nullable: true + system: + type: boolean subresources: status: {} additionalPrinterColumns: