diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 2c1da548a505..04341b5ab03b 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -310,14 +310,14 @@ pub enum Error { location: Location, }, - #[snafu(display("Cannot find requested database: {}-{}", catalog, schema))] + #[snafu(display("Cannot find requested database: {}.{}", catalog, schema))] DatabaseNotFound { catalog: String, schema: String, location: Location, }, - #[snafu(display("Cannot find requested table: {}-{}-{}", catalog, schema, table))] + #[snafu(display("Cannot find requested table: {}.{}.{}", catalog, schema, table))] TableNotFound { catalog: String, schema: String, diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index c6440cc6504d..5209af11f065 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -30,6 +30,7 @@ use common_version::BuildInfo; use datatypes::prelude::ConcreteDataType; use datatypes::scalars::ScalarVector; use datatypes::vectors::{Float64Vector, StringVector}; +use futures::StreamExt; use promql_parser::label::METRIC_NAME; use promql_parser::parser::{ AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr, @@ -49,7 +50,7 @@ use crate::error::{ UnexpectedResultSnafu, }; use crate::http::header::collect_plan_metrics; -use crate::prom_store::METRIC_NAME_LABEL; +use crate::prom_store::{FIELD_NAME_LABEL, METRIC_NAME_LABEL}; use crate::prometheus_handler::PrometheusHandlerRef; /// For [ValueType::Vector] result type @@ -690,6 +691,23 @@ pub async fn label_values_query( }; table_names.sort_unstable(); return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names)); + } else if label_name == FIELD_NAME_LABEL { + let field_columns = + match retrieve_field_names(&query_ctx, handler.catalog_manager(), params.matches.0) + .await + { + Ok(table_names) => table_names, + Err(e) => { + return PrometheusJsonResponse::error( + e.status_code().to_string(), + e.output_msg(), + ); + } + }; + + return PrometheusJsonResponse::success(PrometheusResponse::LabelValues( + field_columns.into_iter().collect(), + )); } let queries = params.matches.0; @@ -742,6 +760,44 @@ pub async fn label_values_query( resp } +async fn retrieve_field_names( + query_ctx: &QueryContext, + manager: CatalogManagerRef, + matches: Vec, +) -> Result> { + let mut field_columns = HashSet::new(); + let catalog = query_ctx.current_catalog(); + let schema = query_ctx.current_schema(); + + if matches.is_empty() { + // query all tables if no matcher is provided + while let Some(table) = manager.tables(catalog, schema).await.next().await { + let table = table.context(CatalogSnafu)?; + for column in table.field_columns() { + field_columns.insert(column.name); + } + } + return Ok(field_columns); + } + + for table_name in matches { + let table = manager + .table(catalog, schema, &table_name) + .await + .context(CatalogSnafu)? + .with_context(|| TableNotFoundSnafu { + catalog: catalog.to_string(), + schema: schema.to_string(), + table: table_name.to_string(), + })?; + + for column in table.field_columns() { + field_columns.insert(column.name); + } + } + Ok(field_columns) +} + async fn retrieve_label_values( result: Result, label_name: &str, diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs index 85db9178c956..fc08b921a0a3 100644 --- a/src/servers/src/prom_store.rs +++ b/src/servers/src/prom_store.rs @@ -42,6 +42,9 @@ pub const METRIC_NAME_LABEL: &str = "__name__"; pub const METRIC_NAME_LABEL_BYTES: &[u8] = b"__name__"; +/// The same as `FIELD_COLUMN_MATCHER` in `promql` crate +pub const FIELD_NAME_LABEL: &str = "__field__"; + /// Metrics for push gateway protocol pub struct Metrics { pub exposition: MetricsExposition, diff --git a/src/table/src/table.rs b/src/table/src/table.rs index 44406c24b239..ae07a905b92f 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::sync::Arc; use common_query::logical_plan::Expr; @@ -90,4 +91,25 @@ impl Table { .iter() .map(|i| self.table_info.meta.schema.column_schemas()[*i].clone()) } + + /// Get field columns in the definition order. + pub fn field_columns(&self) -> impl Iterator + '_ { + // `value_indices` in TableMeta is not reliable. Do a filter here. + let primary_keys = self + .table_info + .meta + .primary_key_indices + .iter() + .copied() + .collect::>(); + + self.table_info + .meta + .schema + .column_schemas() + .iter() + .enumerate() + .filter(move |(i, c)| !primary_keys.contains(i) && !c.is_time_index()) + .map(|(_, c)| c.clone()) + } } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 1f2febc68576..7ffd4dc4c6a4 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -544,6 +544,19 @@ pub async fn test_prom_http_api(store_type: StorageType) { serde_json::from_value::(json!(["host1", "host2"])).unwrap() ); + // search field name + let res = client + .get("/v1/prometheus/api/v1/label/__field__/values?match[]=demo") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let body = serde_json::from_str::(&res.text().await).unwrap(); + assert_eq!(body.status, "success"); + assert_eq!( + body.data, + serde_json::from_value::(json!(["cpu", "memory"])).unwrap() + ); + // query an empty database should return nothing let res = client .get("/v1/prometheus/api/v1/label/host/values?match[]=demo&start=0&end=600")