Skip to content

Commit

Permalink
feat: added hidden topics and partitions (#3930)
Browse files Browse the repository at this point in the history
* feat: added hidden topics and partitions

* filter on sc, renamed to 'system'

* reduce log severity if stream channel closed
  • Loading branch information
galibey authored Apr 5, 2024
1 parent d5a8d15 commit cf550ac
Show file tree
Hide file tree
Showing 17 changed files with 69 additions and 15 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,12 @@ fluvio-cli-common = { path = "crates/fluvio-cli-common"}
fluvio-compression = { version = "0.3", path = "crates/fluvio-compression" }
fluvio-connector-package = { path = "crates/fluvio-connector-package/" }
fluvio-controlplane = { path = "crates/fluvio-controlplane" }
fluvio-controlplane-metadata = { version = "0.25.0", default-features = false, path = "crates/fluvio-controlplane-metadata" }
fluvio-controlplane-metadata = { version = "0.26.0", default-features = false, path = "crates/fluvio-controlplane-metadata" }
fluvio-extension-common = { path = "crates/fluvio-extension-common", default-features = false }
fluvio-hub-util = { path = "crates/fluvio-hub-util" }
fluvio-package-index = { version = "0.7.0", path = "crates/fluvio-package-index", default-features = false }
fluvio-protocol = { version = "0.10.6", path = "crates/fluvio-protocol" }
fluvio-sc-schema = { version = "0.21.0", path = "crates/fluvio-sc-schema", default-features = false }
fluvio-sc-schema = { version = "0.22.0", path = "crates/fluvio-sc-schema", default-features = false }
fluvio-service = { path = "crates/fluvio-service" }
fluvio-smartengine = { version = "0.7.0", path = "crates/fluvio-smartengine", default-features = false }
fluvio-smartmodule = { version = "0.7.0", path = "crates/fluvio-smartmodule", default-features = false }
Expand Down
8 changes: 7 additions & 1 deletion crates/fluvio-cli/src/client/partition/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use anyhow::Result;

use fluvio::Fluvio;
use fluvio::metadata::partition::*;
use fluvio_sc_schema::objects::ListRequest;

use crate::common::output::Terminal;
use crate::common::OutputFormat;
Expand All @@ -18,6 +19,9 @@ use crate::common::OutputFormat;
pub struct ListPartitionOpt {
#[clap(flatten)]
output: OutputFormat,
/// Show system partitions only
#[arg(long, short, required = false)]
system: bool,
}

impl ListPartitionOpt {
Expand All @@ -29,7 +33,9 @@ impl ListPartitionOpt {
let output = self.output.format;
let admin = fluvio.admin().await;

let partitions = admin.all::<PartitionSpec>().await?;
let partitions = admin
.list_with_config::<PartitionSpec, String>(ListRequest::default().system(self.system))
.await?;

// format and dump to screen
display::format_partition_response_output(out, partitions, output)?;
Expand Down
10 changes: 8 additions & 2 deletions crates/fluvio-cli/src/client/topic/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use std::sync::Arc;

use clap::Parser;
use fluvio_sc_schema::objects::ListRequest;
use tracing::debug;
use anyhow::Result;

Expand All @@ -25,6 +26,9 @@ pub struct ListTopicsOpt {
/// Output
#[clap(flatten)]
output: OutputFormat,
/// Show system topics only
#[arg(long, short, required = false)]
system: bool,
}

impl ListTopicsOpt {
Expand All @@ -33,7 +37,9 @@ impl ListTopicsOpt {
debug!("list topics {:#?} ", output_type);
let admin = fluvio.admin().await;

let topics = admin.all::<TopicSpec>().await?;
let topics = admin
.list_with_config::<TopicSpec, String>(ListRequest::default().system(self.system))
.await?;
display::format_response_output(out, topics, output_type)?;
Ok(())
}
Expand All @@ -43,7 +49,7 @@ mod display {

use std::time::Duration;

use humantime::{format_duration};
use humantime::format_duration;
use comfy_table::{Row, Cell, CellAlignment};
use serde::Serialize;

Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-controlplane-metadata/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "fluvio-controlplane-metadata"
edition = "2021"
version = "0.25.1"
version = "0.26.0"
authors = ["Fluvio Contributors <[email protected]>"]
description = "Metadata definition for Fluvio control plane"
repository = "https://github.com/infinyon/fluvio"
Expand Down
4 changes: 4 additions & 0 deletions crates/fluvio-controlplane-metadata/src/partition/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ pub struct PartitionSpec {
pub compression_type: CompressionAlgorithm,
#[fluvio(min_version = 12)]
pub deduplication: Option<Deduplication>,
#[cfg_attr(feature = "use_serde", serde(default))]
#[fluvio(min_version = 13)]
pub system: bool,
}

impl PartitionSpec {
Expand All @@ -52,6 +55,7 @@ impl PartitionSpec {
storage: topic.get_storage().cloned(),
compression_type: topic.get_compression_type().clone(),
deduplication: topic.get_deduplication().cloned(),
system: topic.is_system(),
}
}

Expand Down
11 changes: 11 additions & 0 deletions crates/fluvio-controlplane-metadata/src/topic/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ pub struct TopicSpec {
#[cfg_attr(feature = "use_serde", serde(default))]
#[fluvio(min_version = 12)]
deduplication: Option<Deduplication>,
#[cfg_attr(feature = "use_serde", serde(default))]
#[fluvio(min_version = 13)]
system: bool,
}

impl From<ReplicaSpec> for TopicSpec {
Expand Down Expand Up @@ -109,6 +112,14 @@ impl TopicSpec {
self.deduplication = deduplication;
}

pub fn is_system(&self) -> bool {
self.system
}

pub fn set_system(&mut self, system: bool) {
self.system = system;
}

/// get retention secs that can be displayed
pub fn retention_secs(&self) -> u32 {
self.get_clean_policy()
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-sc-schema/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio-sc-schema"
version = "0.21.2"
version = "0.22.0"
edition = "2021"
authors = ["Fluvio Contributors <[email protected]>"]
description = "Fluvio API for SC"
Expand Down
8 changes: 8 additions & 0 deletions crates/fluvio-sc-schema/src/objects/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ pub struct ListRequest<S> {
pub name_filters: ListFilters,
#[fluvio(min_version = 10)]
pub summary: bool, // if true, only return summary
#[fluvio(min_version = 13)]
pub system: bool, // if true, only return system specs
data: PhantomData<S>, // satisfy generic
}

Expand All @@ -76,9 +78,15 @@ impl<S> ListRequest<S> {
Self {
name_filters: name_filters.into(),
summary,
system: false,
data: PhantomData,
}
}

pub fn system(mut self, system: bool) -> Self {
self.system = system;
self
}
}

#[derive(Debug, Default, Encoder)]
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-sc-schema/src/objects/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub use list::*;
pub use watch::*;
pub use metadata::*;

pub(crate) const COMMON_VERSION: i16 = 12; // from now, we use a single version for all objects
pub(crate) const COMMON_VERSION: i16 = 13; // from now, we use a single version for all objects
pub(crate) const DYN_OBJ: i16 = 11; // version indicate dynamic object

#[cfg(test)]
Expand Down
5 changes: 3 additions & 2 deletions crates/fluvio-sc/src/services/public_api/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ pub async fn handle_list_request<AC: AuthContext, C: MetadataItem>(

let response = if let Some(req) = req.downcast()? as Option<ListRequest<TopicSpec>> {
ObjectApiListResponse::try_encode_from(
super::topic::handle_fetch_topics_request(req.name_filters, auth_ctx).await?,
super::topic::handle_fetch_topics_request(req.name_filters, req.system, auth_ctx)
.await?,
header.api_version(),
)?
} else if let Some(req) = req.downcast()? as Option<ListRequest<SpuSpec>> {
Expand All @@ -50,7 +51,7 @@ pub async fn handle_list_request<AC: AuthContext, C: MetadataItem>(
)?
} else if let Some(req) = req.downcast()? as Option<ListRequest<PartitionSpec>> {
ObjectApiListResponse::try_encode_from(
super::partition::handle_fetch_request(req.name_filters, auth_ctx).await?,
super::partition::handle_fetch_request(req.name_filters, req.system, auth_ctx).await?,
header.api_version(),
)?
} else if let Some(req) = req.downcast()? as Option<ListRequest<SmartModuleSpec>> {
Expand Down
2 changes: 2 additions & 0 deletions crates/fluvio-sc/src/services/public_api/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::services::auth::AuthServiceContext;
#[instrument(skip(_filters, auth_ctx))]
pub async fn handle_fetch_request<AC: AuthContext, C: MetadataItem>(
_filters: ListFilters,
system: bool,
auth_ctx: &AuthServiceContext<AC, C>,
) -> Result<ListResponse<PartitionSpec>> {
debug!("fetching custom spu list");
Expand All @@ -38,6 +39,7 @@ pub async fn handle_fetch_request<AC: AuthContext, C: MetadataItem>(
.read()
.await
.values()
.filter(|value| value.inner().spec().system == system)
.map(|value| value.inner().clone().into())
.collect();

Expand Down
2 changes: 2 additions & 0 deletions crates/fluvio-sc/src/services/public_api/topic/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::services::auth::AuthServiceContext;
#[instrument(skip(filters, auth_ctx))]
pub async fn handle_fetch_topics_request<AC: AuthContext, C: MetadataItem>(
filters: ListFilters,
system: bool,
auth_ctx: &AuthServiceContext<AC, C>,
) -> Result<ListResponse<TopicSpec>> {
debug!("retrieving topic list: {:#?}", filters);
Expand All @@ -37,6 +38,7 @@ pub async fn handle_fetch_topics_request<AC: AuthContext, C: MetadataItem>(
.read()
.await
.values()
.filter(|value| value.inner().spec().is_system() == system)
.filter_map(|value| {
if filters.filter(value.key()) {
Some(value.inner().clone().into())
Expand Down
12 changes: 11 additions & 1 deletion crates/fluvio/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,18 @@ impl FluvioAdmin {
let filter_list: Vec<ListFilter> = filters.into_iter().map(Into::into).collect();
let list_request: ListRequest<S> = ListRequest::new(filter_list, summary);

self.list_with_config(list_request).await
}

#[instrument(skip(self, config))]
pub async fn list_with_config<S, F>(&self, config: ListRequest<S>) -> Result<Vec<Metadata<S>>>
where
S: AdminSpec,
ListFilter: From<F>,
S::Status: Encoder + Decoder + Debug,
{
let response = self
.send_receive_admin::<ObjectApiListRequest, _>(list_request)
.send_receive_admin::<ObjectApiListRequest, _>(config)
.await?;
trace!("list response: {:#?}", response);
response
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio/src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ where
};
}
Err(err) => {
error!("stream to server channel broken: {err:?}");
debug!("stream to server channel closed: {err:?}");
break;
}
}
Expand Down
2 changes: 2 additions & 0 deletions k8-util/helm/fluvio-sys/templates/crd_partition.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ spec:
age:
type: string
nullable: true
system:
type: boolean
status:
type: object
x-kubernetes-preserve-unknown-fields: true
Expand Down
4 changes: 3 additions & 1 deletion k8-util/helm/fluvio-sys/templates/crd_topic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ spec:
minimum: 0
age:
type: string
nullable: true
nullable: true
system:
type: boolean
subresources:
status: {}
additionalPrinterColumns:
Expand Down

0 comments on commit cf550ac

Please sign in to comment.