diff --git a/Cargo.lock b/Cargo.lock
index 82862de58ca0..418e6fc28bf1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1455,6 +1455,7 @@ name = "client"
 version = "0.6.0"
 dependencies = [
  "api",
+ "arc-swap",
  "arrow-flight",
  "async-stream",
  "async-trait",
@@ -1915,6 +1916,7 @@ dependencies = [
 name = "common-recordbatch"
 version = "0.6.0"
 dependencies = [
+ "arc-swap",
  "common-error",
  "common-macro",
  "datafusion",
@@ -3635,7 +3637,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2c1f17dce7af748c9a1255e82d6ceb7959f8919b#2c1f17dce7af748c9a1255e82d6ceb7959f8919b"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=65b008f018395f8fa917a7d3c7883b82f309cb74#65b008f018395f8fa917a7d3c7883b82f309cb74"
 dependencies = [
  "prost 0.12.3",
  "serde",
@@ -6914,6 +6916,8 @@ dependencies = [
  "greptime-proto",
  "humantime",
  "lazy_static",
+ "meter-core",
+ "meter-macros",
  "num",
  "num-traits",
  "object-store",
diff --git a/Cargo.toml b/Cargo.toml
index 545434313131..41ba4e1d8754 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -91,7 +91,7 @@ etcd-client = "0.12"
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2c1f17dce7af748c9a1255e82d6ceb7959f8919b" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "65b008f018395f8fa917a7d3c7883b82f309cb74" }
 humantime-serde = "1.1"
 itertools = "0.10"
 lazy_static = "1.4"
diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs
index c39de7c4881a..7abc4797816a 100644
--- a/src/catalog/src/information_schema.rs
+++ b/src/catalog/src/information_schema.rs
@@ -312,6 +312,7 @@ impl DataSource for InformationTableDataSource {
             schema: projected_schema,
             stream: Box::pin(stream),
             output_ordering: None,
+            metrics: Default::default(),
         };
 
         Ok(Box::pin(stream))
diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml
index 276ee43cf81b..d9f7feb945fa 100644
--- a/src/client/Cargo.toml
+++ b/src/client/Cargo.toml
@@ -9,6 +9,7 @@ testing = []
 
 [dependencies]
 api.workspace = true
+arc-swap = "1.6"
 arrow-flight.workspace = true
 async-stream.workspace = true
 async-trait.workspace = true
diff --git a/src/client/src/database.rs b/src/client/src/database.rs
index 46fcf4bcec52..6d6ad9d609be 100644
--- a/src/client/src/database.rs
+++ b/src/client/src/database.rs
@@ -309,30 +309,36 @@
                 );
                 Ok(Output::AffectedRows(rows))
             }
-            FlightMessage::Recordbatch(_) => IllegalFlightMessagesSnafu {
-                reason: "The first flight message cannot be a RecordBatch message",
+            FlightMessage::Recordbatch(_) | FlightMessage::Metrics(_) => {
+                IllegalFlightMessagesSnafu {
+                    reason: "The first flight message cannot be a RecordBatch or Metrics message",
+                }
+                .fail()
             }
-            .fail(),
             FlightMessage::Schema(schema) => {
                 let stream = Box::pin(stream!({
                     while let Some(flight_message) = flight_message_stream.next().await {
                         let flight_message = flight_message
                             .map_err(BoxedError::new)
                             .context(ExternalSnafu)?;
-                        let FlightMessage::Recordbatch(record_batch) = flight_message else {
-                            yield IllegalFlightMessagesSnafu {reason: "A Schema message must be succeeded exclusively by a set of RecordBatch messages"}
+                        match flight_message {
+                            FlightMessage::Recordbatch(record_batch) => yield Ok(record_batch),
+                            FlightMessage::Metrics(_) => {}
+                            FlightMessage::AffectedRows(_) | FlightMessage::Schema(_) => {
+                                yield IllegalFlightMessagesSnafu {reason: format!("A Schema message must be succeeded exclusively by a set of RecordBatch messages, flight_message: {:?}", flight_message)}
                                     .fail()
                                     .map_err(BoxedError::new)
                                     .context(ExternalSnafu);
-                            break;
-                        };
-                        yield Ok(record_batch);
+                                break;
+                            }
+                        }
                     }
                 }));
                 let record_batch_stream = RecordBatchStreamWrapper {
                     schema,
                     stream,
                     output_ordering: None,
+                    metrics: Default::default(),
                 };
                 Ok(Output::Stream(Box::pin(record_batch_stream)))
             }
diff --git a/src/client/src/region.rs b/src/client/src/region.rs
index 574af84228c6..7c05d9337230 100644
--- a/src/client/src/region.rs
+++ b/src/client/src/region.rs
@@ -12,8 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::sync::Arc;
+
 use api::v1::region::{QueryRequest, RegionRequest, RegionResponse};
 use api::v1::ResponseHeader;
+use arc_swap::ArcSwapOption;
 use arrow_flight::Ticket;
 use async_stream::stream;
 use async_trait::async_trait;
@@ -119,27 +122,38 @@
             .fail();
         };
 
+        let metrics_str = Arc::new(ArcSwapOption::from(None));
+        let ref_str = metrics_str.clone();
+
         let stream = Box::pin(stream!({
             while let Some(flight_message) = flight_message_stream.next().await {
                 let flight_message = flight_message
                     .map_err(BoxedError::new)
                     .context(ExternalSnafu)?;
-                let FlightMessage::Recordbatch(record_batch) = flight_message else {
-                    yield IllegalFlightMessagesSnafu {
+
+                match flight_message {
+                    FlightMessage::Recordbatch(record_batch) => yield Ok(record_batch),
+                    FlightMessage::Metrics(s) => {
+                        ref_str.swap(Some(Arc::new(s)));
+                        break;
+                    }
+                    _ => {
+                        yield IllegalFlightMessagesSnafu {
                             reason: "A Schema message must be succeeded exclusively by a set of RecordBatch messages"
                         }
                         .fail()
                         .map_err(BoxedError::new)
                         .context(ExternalSnafu);
-                    break;
-                };
-                yield Ok(record_batch);
+                        break;
+                    }
+                }
             }
         }));
 
         let record_batch_stream = RecordBatchStreamWrapper {
             schema,
             stream,
             output_ordering: None,
+            metrics: metrics_str,
         };
         Ok(Box::pin(record_batch_stream))
     }
diff --git a/src/common/grpc/src/flight.rs b/src/common/grpc/src/flight.rs
index 09f75b2c989d..cb8d97d5b107 100644
--- a/src/common/grpc/src/flight.rs
+++ b/src/common/grpc/src/flight.rs
@@ -15,7 +15,7 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use api::v1::{AffectedRows, FlightMetadata};
+use api::v1::{AffectedRows, FlightMetadata, Metrics};
 use arrow_flight::utils::flight_data_to_arrow_batch;
 use arrow_flight::{FlightData, SchemaAsIpc};
 use common_base::bytes::Bytes;
@@ -39,6 +39,7 @@ pub enum FlightMessage {
     Schema(SchemaRef),
     Recordbatch(RecordBatch),
     AffectedRows(usize),
+    Metrics(String),
 }
 
 pub struct FlightEncoder {
@@ -85,6 +86,22 @@
             FlightMessage::AffectedRows(rows) => {
                 let metadata = FlightMetadata {
                     affected_rows: Some(AffectedRows { value: rows as _ }),
+                    metrics: None,
+                }
+                .encode_to_vec();
+                FlightData {
+                    flight_descriptor: None,
+                    data_header: build_none_flight_msg().into(),
+                    app_metadata: metadata.into(),
+                    data_body: ProstBytes::default(),
+                }
+            }
+            FlightMessage::Metrics(s) => {
+                let metadata = FlightMetadata {
+                    affected_rows: None,
+                    metrics: Some(Metrics {
+                        metrics: s.as_bytes().to_vec(),
+                    }),
                 }
                 .encode_to_vec();
                 FlightData {
@@ -119,6 +136,11 @@ impl FlightDecoder {
         if let Some(AffectedRows { value }) = metadata.affected_rows {
             return Ok(FlightMessage::AffectedRows(value as _));
         }
+        if let Some(Metrics { metrics }) = metadata.metrics {
+            return Ok(FlightMessage::Metrics(
+                String::from_utf8_lossy(&metrics).to_string(),
+            ));
+        }
         InvalidFlightDataSnafu {
             reason: "Expecting FlightMetadata have some meaningful content.",
         }
diff --git a/src/common/query/src/physical_plan.rs b/src/common/query/src/physical_plan.rs
index 896969bace09..ddfb1bfb6c1f 100644
--- a/src/common/query/src/physical_plan.rs
+++ b/src/common/query/src/physical_plan.rs
@@ -143,8 +143,12 @@ impl PhysicalPlan for PhysicalPlanAdapter {
         let stream = df_plan
             .execute(partition, context)
             .context(error::GeneralDataFusionSnafu)?;
-        let adapter = RecordBatchStreamAdapter::try_new_with_metrics(stream, baseline_metric)
-            .context(error::ConvertDfRecordBatchStreamSnafu)?;
+        let adapter = RecordBatchStreamAdapter::try_new_with_metrics_and_df_plan(
+            stream,
+            baseline_metric,
+            df_plan,
+        )
+        .context(error::ConvertDfRecordBatchStreamSnafu)?;
         Ok(Box::pin(adapter))
     }
diff --git a/src/common/recordbatch/Cargo.toml b/src/common/recordbatch/Cargo.toml
index 5185dcadd45d..ed56a5b5cd96 100644
--- a/src/common/recordbatch/Cargo.toml
+++ b/src/common/recordbatch/Cargo.toml
@@ -5,6 +5,7 @@ edition.workspace = true
 license.workspace = true
 
 [dependencies]
+arc-swap = "1.6"
 common-error.workspace = true
 common-macro.workspace = true
 datafusion-common.workspace = true
@@ -13,8 +14,8 @@ datatypes.workspace = true
 futures.workspace = true
 paste = "1.0"
 serde.workspace = true
+serde_json.workspace = true
 snafu.workspace = true
 
 [dev-dependencies]
-serde_json = "1.0"
 tokio.workspace = true
diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs
index b71bad410317..3cbb8eb4e2cd 100644
--- a/src/common/recordbatch/src/adapter.rs
+++ b/src/common/recordbatch/src/adapter.rs
@@ -21,8 +21,8 @@ use datafusion::arrow::compute::cast;
 use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
 use datafusion::error::Result as DfResult;
 use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStream};
-use datafusion::physical_plan::metrics::BaselineMetrics;
-use datafusion::physical_plan::RecordBatchStream as DfRecordBatchStream;
+use datafusion::physical_plan::metrics::{BaselineMetrics, MetricValue};
+use datafusion::physical_plan::{ExecutionPlan, RecordBatchStream as DfRecordBatchStream};
 use datafusion_common::DataFusionError;
 use datatypes::schema::{Schema, SchemaRef};
 use futures::ready;
@@ -154,6 +154,15 @@ pub struct RecordBatchStreamAdapter {
     schema: SchemaRef,
     stream: DfSendableRecordBatchStream,
     metrics: Option<BaselineMetrics>,
+    /// Aggregated plan-level metrics. Resolved after an [ExecutionPlan] is finished.
+    metrics_2: Metrics,
+}
+
+/// JSON-encoded metrics. Contains metrics from the whole plan tree.
+enum Metrics {
+    Unavailable,
+    Unresolved(Arc<dyn ExecutionPlan>),
+    Resolved(String),
 }
 
 impl RecordBatchStreamAdapter {
@@ -164,12 +173,14 @@
             schema,
             stream,
             metrics: None,
+            metrics_2: Metrics::Unavailable,
         })
     }
 
-    pub fn try_new_with_metrics(
+    pub fn try_new_with_metrics_and_df_plan(
         stream: DfSendableRecordBatchStream,
         metrics: BaselineMetrics,
+        df_plan: Arc<dyn ExecutionPlan>,
     ) -> Result<Self> {
         let schema =
             Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
@@ -177,14 +188,26 @@
             schema,
             stream,
             metrics: Some(metrics),
+            metrics_2: Metrics::Unresolved(df_plan),
         })
     }
+
+    pub fn set_metrics2(&mut self, plan: Arc<dyn ExecutionPlan>) {
+        self.metrics_2 = Metrics::Unresolved(plan)
+    }
 }
 
 impl RecordBatchStream for RecordBatchStreamAdapter {
     fn schema(&self) -> SchemaRef {
         self.schema.clone()
     }
+
+    fn metrics(&self) -> Option<String> {
+        match &self.metrics_2 {
+            Metrics::Resolved(metrics) => Some(metrics.clone()),
+            Metrics::Unavailable | Metrics::Unresolved(_) => None,
+        }
+    }
 }
 
 impl Stream for RecordBatchStreamAdapter {
@@ -206,7 +229,17 @@
                     df_record_batch,
                 )))
             }
-            Poll::Ready(None) => Poll::Ready(None),
+            Poll::Ready(None) => {
+                if let Metrics::Unresolved(df_plan) = &self.metrics_2 {
+                    let mut metrics_holder = RecordBatchMetrics::default();
+                    collect_metrics(df_plan, &mut metrics_holder);
+                    if metrics_holder.elapsed_compute != 0 || metrics_holder.memory_usage != 0 {
+                        self.metrics_2 =
+                            Metrics::Resolved(serde_json::to_string(&metrics_holder).unwrap());
+                    }
+                }
+                Poll::Ready(None)
+            }
         }
     }
 
@@ -216,6 +249,30 @@
     }
 }
 
+fn collect_metrics(df_plan: &Arc<dyn ExecutionPlan>, result: &mut RecordBatchMetrics) {
+    if let Some(metrics) = df_plan.metrics() {
+        metrics.iter().for_each(|m| match m.value() {
+            MetricValue::ElapsedCompute(ec) => result.elapsed_compute += ec.value(),
+            MetricValue::CurrentMemoryUsage(m) => result.memory_usage += m.value(),
+            _ => {}
+        });
+    }
+
+    for child in df_plan.children() {
+        collect_metrics(&child, result);
+    }
+}
+
+/// [`RecordBatchMetrics`] carries metrics values
+/// from the datanode to the frontend through gRPC
+#[derive(serde::Serialize, serde::Deserialize, Default, Debug)]
+pub struct RecordBatchMetrics {
+    // CPU consumption in nanoseconds
+    pub elapsed_compute: usize,
+    // memory used by the plan in bytes
+    pub memory_usage: usize,
+}
+
 enum AsyncRecordBatchStreamAdapterState {
     Uninit(FutureStream),
     Ready(SendableRecordBatchStream),
diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs
index 1b0d33f915a2..889046c16624 100644
--- a/src/common/recordbatch/src/lib.rs
+++ b/src/common/recordbatch/src/lib.rs
@@ -20,6 +20,7 @@ pub mod util;
 use std::pin::Pin;
 use std::sync::Arc;
 
+use arc_swap::ArcSwapOption;
 use datafusion::physical_plan::memory::MemoryStream;
 pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
 use datatypes::arrow::compute::SortOptions;
@@ -39,6 +40,10 @@ pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
     fn output_ordering(&self) -> Option<&[OrderOption]> {
         None
     }
+
+    fn metrics(&self) -> Option<String> {
+        None
+    }
 }
 
 pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
@@ -206,6 +211,7 @@ pub struct RecordBatchStreamWrapper<S> {
     pub schema: SchemaRef,
     pub stream: S,
     pub output_ordering: Option<Vec<OrderOption>>,
+    pub metrics: Arc<ArcSwapOption<String>>,
 }
 
 impl RecordBatchStreamWrapper<SendableRecordBatchStream> {
@@ -215,6 +221,7 @@
             schema,
             stream,
             output_ordering: None,
+            metrics: Default::default(),
         }
     }
 }
@@ -229,6 +236,10 @@ impl<S: Stream<Item = Result<RecordBatch>> + Unpin> RecordBatchStream
     fn output_ordering(&self) -> Option<&[OrderOption]> {
         self.output_ordering.as_deref()
     }
+
+    fn metrics(&self) -> Option<String> {
+        self.metrics.load().as_ref().map(|s| s.as_ref().clone())
+    }
 }
 
 impl<S: Stream<Item = Result<RecordBatch>> + Unpin> Stream for RecordBatchStreamWrapper<S> {
diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml
index 24010dd94011..866eeaa8a905 100644
--- a/src/query/Cargo.toml
+++ b/src/query/Cargo.toml
@@ -39,6 +39,8 @@ futures-util.workspace = true
 greptime-proto.workspace = true
 humantime = "2.1"
 lazy_static.workspace = true
+meter-core.workspace = true
+meter-macros.workspace = true
 object-store.workspace = true
 once_cell.workspace = true
 partition.workspace = true
diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs
index a05093e32978..cd23e07f4c07 100644
--- a/src/query/src/datafusion.rs
+++ b/src/query/src/datafusion.rs
@@ -410,9 +410,9 @@ impl QueryExecutor for DatafusionQueryEngine {
                 .map_err(BoxedError::new)
                 .context(QueryExecutionSnafu))?,
             _ => {
+                let df_plan = Arc::new(DfPhysicalPlanAdapter(plan.clone()));
                 // merge into a single partition
-                let plan =
-                    CoalescePartitionsExec::new(Arc::new(DfPhysicalPlanAdapter(plan.clone())));
+                let plan = CoalescePartitionsExec::new(df_plan.clone());
                 // CoalescePartitionsExec must produce a single partition
                 assert_eq!(1, plan.output_partitioning().partition_count());
                 let df_stream = plan
@@ -420,10 +420,11 @@
                     .context(error::DatafusionSnafu)
                     .map_err(BoxedError::new)
                     .context(QueryExecutionSnafu)?;
-                let stream = RecordBatchStreamAdapter::try_new(df_stream)
+                let mut stream = RecordBatchStreamAdapter::try_new(df_stream)
                     .context(error::ConvertDfRecordBatchStreamSnafu)
                     .map_err(BoxedError::new)
                     .context(QueryExecutionSnafu)?;
+                stream.set_metrics2(df_plan);
                 Ok(Box::pin(stream))
             }
         }
diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs
index 80f1492d459e..f1d59a5181e8 100644
--- a/src/query/src/dist_plan/merge_scan.rs
+++ b/src/query/src/dist_plan/merge_scan.rs
@@ -19,16 +19,17 @@ use std::time::Duration;
 use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
 use async_stream::stream;
 use common_base::bytes::Bytes;
+use common_catalog::parse_catalog_and_schema_from_db_string;
 use common_error::ext::BoxedError;
 use common_meta::table_name::TableName;
 use common_query::physical_plan::TaskContext;
-use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
+use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchMetrics};
 use common_recordbatch::error::ExternalSnafu;
 use common_recordbatch::{
     DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream,
 };
-use common_telemetry::tracing;
 use common_telemetry::tracing_context::TracingContext;
+use common_telemetry::{tracing, warn};
 use datafusion::physical_plan::metrics::{
     Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time,
 };
@@ -39,6 +40,7 @@ use datafusion_physical_expr::PhysicalSortExpr;
 use datatypes::schema::{Schema, SchemaRef};
 use futures_util::StreamExt;
 use greptime_proto::v1::region::{QueryRequest, RegionRequestHeader};
+use meter_macros::read_meter;
 use snafu::ResultExt;
 use store_api::storage::RegionId;
 use tokio::time::Instant;
@@ -213,6 +215,20 @@ impl MergeScanExec {
                     // reset poll timer
                     poll_timer = Instant::now();
                 }
+                if let Some(metrics) = stream
+                    .metrics()
+                    .and_then(|m| serde_json::from_str::<RecordBatchMetrics>(&m).ok())
+                {
+                    let (c, s) = parse_catalog_and_schema_from_db_string(&dbname);
+                    read_meter!(
+                        c,
+                        s,
+                        metrics.elapsed_compute as u64,
+                        metrics.memory_usage as u64,
+                        0
+                    );
+                }
+
                 METRIC_MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64());
             }
         }));
@@ -221,6 +237,7 @@
             schema: self.schema.clone(),
             stream,
             output_ordering: None,
+            metrics: Default::default(),
         }))
     }
diff --git a/src/servers/src/grpc/flight/stream.rs b/src/servers/src/grpc/flight/stream.rs
index 3024f5437aba..472b287c13ee 100644
--- a/src/servers/src/grpc/flight/stream.rs
+++ b/src/servers/src/grpc/flight/stream.rs
@@ -83,6 +83,11 @@
                 }
             }
         }
+        // use the last message to pass the metrics
+        if let Some(m) = recordbatches.metrics() {
+            let metrics = FlightMessage::Metrics(m);
+            let _ = tx.send(Ok(metrics)).await;
+        }
     }
 }
diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs
index e2843891cba9..3bc6ec77ee93 100644
--- a/src/table/src/table/scan.rs
+++ b/src/table/src/table/scan.rs
@@ -144,6 +144,10 @@ impl RecordBatchStream for StreamWithMetricWrapper {
     fn schema(&self) -> SchemaRef {
         self.stream.schema()
     }
+
+    fn metrics(&self) -> Option<String> {
+        self.stream.metrics()
+    }
 }
 
 #[cfg(test)]
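
Editor's note (not part of the patch): the flow this patch implements is that the datanode aggregates elapsed_compute and memory_usage from the finished DataFusion plan tree into RecordBatchMetrics, serializes it to a JSON string, ships it as a trailing FlightMessage::Metrics frame, and the client publishes the string through the Arc<ArcSwapOption<String>> slot exposed by RecordBatchStream::metrics. The sketch below reproduces that round trip in isolation; it is a minimal illustration, assuming arc-swap, serde (with the derive feature), and serde_json as dependencies, and it redeclares RecordBatchMetrics locally only to stay self-contained.

use std::sync::Arc;

use arc_swap::ArcSwapOption;
use serde::{Deserialize, Serialize};

// Local stand-in for common_recordbatch::adapter::RecordBatchMetrics.
#[derive(Serialize, Deserialize, Default, Debug)]
struct RecordBatchMetrics {
    elapsed_compute: usize, // CPU time in nanoseconds
    memory_usage: usize,    // bytes
}

fn main() {
    // Producer side: aggregate plan metrics and encode them as JSON,
    // as collect_metrics + serde_json::to_string do in adapter.rs.
    let metrics = RecordBatchMetrics {
        elapsed_compute: 1_500_000,
        memory_usage: 4096,
    };
    let encoded = serde_json::to_string(&metrics).unwrap();

    // Shared slot: stays None while record batches flow, then the metrics
    // string is swapped in, mirroring ref_str.swap(...) in region.rs.
    let slot: Arc<ArcSwapOption<String>> = Arc::new(ArcSwapOption::from(None));
    slot.swap(Some(Arc::new(encoded)));

    // Consumer side: load and decode the JSON, as MergeScanExec does
    // before feeding the numbers to read_meter!.
    if let Some(json) = slot.load().as_ref() {
        let decoded: RecordBatchMetrics = serde_json::from_str(json).unwrap();
        assert_eq!(decoded.elapsed_compute, 1_500_000);
        assert_eq!(decoded.memory_usage, 4096);
    }
}

Using ArcSwapOption rather than a Mutex lets the stream's hot path stay lock-free: the producer performs a single atomic swap when the stream ends, and readers never block while polling for the metrics string.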