Skip to content

Commit

Permalink
feat: add flow mem size to sys table
Browse files Browse the repository at this point in the history
  • Loading branch information
discord9 committed Nov 20, 2024
1 parent 3633f25 commit a7fc29a
Show file tree
Hide file tree
Showing 34 changed files with 985 additions and 41 deletions.
128 changes: 128 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion scripts/check-snafu.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def main():
]

for name in unused_snafu:
print(name)
print("Unused snafu variant name:", name)

if unused_snafu:
raise SystemExit(1)
Expand Down
8 changes: 8 additions & 0 deletions src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ pub enum Error {
source: BoxedError,
},

#[snafu(display("Failed to list flow stats"))]
ListFlowStats {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},

#[snafu(display("Failed to list flows in catalog {catalog}"))]
ListFlows {
#[snafu(implicit)]
Expand Down Expand Up @@ -326,6 +333,7 @@ impl ErrorExt for Error {
| Error::ListSchemas { source, .. }
| Error::ListTables { source, .. }
| Error::ListFlows { source, .. }
| Error::ListFlowStats { source, .. }
| Error::ListProcedures { source, .. }
| Error::ListRegionStats { source, .. }
| Error::ConvertProtoData { source, .. } => source.status_code(),
Expand Down
9 changes: 9 additions & 0 deletions src/catalog/src/system_schema/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME
use common_error::ext::ErrorExt;
use common_meta::cluster::NodeInfo;
use common_meta::datanode::RegionStat;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::flow::FlowMetadataManager;
use common_procedure::ProcedureInfo;
use common_recordbatch::SendableRecordBatchStream;
Expand Down Expand Up @@ -192,6 +193,7 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
)) as _),
FLOWS => Some(Arc::new(InformationSchemaFlows::new(
self.catalog_name.clone(),
self.catalog_manager.clone(),
self.flow_metadata_manager.clone(),
)) as _),
PROCEDURE_INFO => Some(
Expand Down Expand Up @@ -338,6 +340,9 @@ pub trait InformationExtension {

/// Gets the region statistics.
async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;

/// Get the flow statistics. If no flownode is available, return `None`.
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
}

pub struct NoopInformationExtension;
Expand All @@ -357,4 +362,8 @@ impl InformationExtension for NoopInformationExtension {
async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
Ok(vec![])
}

async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
Ok(None)
}
}
Loading

0 comments on commit a7fc29a

Please sign in to comment.