chore: carry metrics in flight metadata from datanode to frontend
shuiyisong committed Jan 11, 2024
1 parent fd8fb64 commit e619049
Showing 16 changed files with 168 additions and 23 deletions.
6 changes: 5 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -90,7 +90,7 @@ etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a31ea166fc015ea7ff111ac94e26c3a5d64364d2" }
greptime-proto = { git = "https://github.com/shuiyisong/greptime-proto.git", rev = "89ac89603c9f69ec9360eaaded96cb32c59465d2" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
1 change: 1 addition & 0 deletions src/catalog/src/information_schema.rs
@@ -312,6 +312,7 @@ impl DataSource for InformationTableDataSource {
schema: projected_schema,
stream: Box::pin(stream),
output_ordering: None,
metrics: Default::default(),
};

Ok(Box::pin(stream))
1 change: 1 addition & 0 deletions src/client/Cargo.toml
@@ -9,6 +9,7 @@ testing = []

[dependencies]
api.workspace = true
arc-swap = "1.6"
arrow-flight.workspace = true
async-stream.workspace = true
async-trait.workspace = true
9 changes: 6 additions & 3 deletions src/client/src/database.rs
@@ -295,10 +295,12 @@ impl Database {
);
Ok(Output::AffectedRows(rows))
}
FlightMessage::Recordbatch(_) => IllegalFlightMessagesSnafu {
reason: "The first flight message cannot be a RecordBatch message",
FlightMessage::Recordbatch(_) | FlightMessage::Metrics(_) => {
IllegalFlightMessagesSnafu {
reason: "The first flight message cannot be a RecordBatch or Metrics message",
}
.fail()
}
.fail(),
FlightMessage::Schema(schema) => {
let stream = Box::pin(stream!({
while let Some(flight_message) = flight_message_stream.next().await {
@@ -319,6 +321,7 @@
schema,
stream,
output_ordering: None,
metrics: Default::default(),
};
Ok(Output::Stream(Box::pin(record_batch_stream)))
}
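Taken together, the match above encodes a small protocol for query responses: the first Flight message must be either a Schema (for reads) or an AffectedRows (for DML), and RecordBatch — now joined by the new Metrics variant — is only legal after a Schema. A minimal, self-contained sketch of that first-message rule; the payload types here are placeholders, not GreptimeDB's real ones:

```rust
// Simplified stand-ins for the real FlightMessage variants; the actual
// types live in common_grpc::flight and carry schemas/record batches.
#[derive(Debug)]
enum FlightMessage {
    Schema(String),       // placeholder payload
    Recordbatch(Vec<u8>), // placeholder payload
    AffectedRows(usize),
    Metrics(String), // JSON-encoded plan metrics, added by this commit
}

/// Validate the first message of a Flight response stream.
fn check_first_message(msg: &FlightMessage) -> Result<(), String> {
    match msg {
        FlightMessage::Schema(_) | FlightMessage::AffectedRows(_) => Ok(()),
        // RecordBatch and Metrics may only follow a Schema message.
        other => Err(format!("illegal first flight message: {other:?}")),
    }
}

fn main() {
    assert!(check_first_message(&FlightMessage::Schema("s".into())).is_ok());
    assert!(check_first_message(&FlightMessage::Metrics("{}".into())).is_err());
}
```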
24 changes: 19 additions & 5 deletions src/client/src/region.rs
Expand Up @@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use api::v1::region::{QueryRequest, RegionRequest, RegionResponse};
use api::v1::ResponseHeader;
use arc_swap::ArcSwapOption;
use arrow_flight::Ticket;
use async_stream::stream;
use async_trait::async_trait;
@@ -119,27 +122,38 @@ impl RegionRequester {
.fail();
};

let metrics_str = Arc::new(ArcSwapOption::from(None));
let ref_str = metrics_str.clone();

let stream = Box::pin(stream!({
while let Some(flight_message) = flight_message_stream.next().await {
let flight_message = flight_message
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let FlightMessage::Recordbatch(record_batch) = flight_message else {
yield IllegalFlightMessagesSnafu {

match flight_message {
FlightMessage::Recordbatch(record_batch) => yield Ok(record_batch),
FlightMessage::Metrics(s) => {
ref_str.swap(Some(Arc::new(s)));
break;
}
_ => {
yield IllegalFlightMessagesSnafu {
reason: "A Schema message must be succeeded exclusively by a set of RecordBatch messages"
}
.fail()
.map_err(BoxedError::new)
.context(ExternalSnafu);
break;
};
yield Ok(record_batch);
break;
}
}
}
}));
let record_batch_stream = RecordBatchStreamWrapper {
schema,
stream,
output_ordering: None,
metrics: metrics_str,
};
Ok(Box::pin(record_batch_stream))
}
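The `ArcSwapOption` pair above is a lock-free side channel: the stream closure keeps one clone (`ref_str`) and publishes the metrics string when the trailing Metrics message arrives, while the returned `RecordBatchStreamWrapper` holds the other clone and reads it once the stream is drained. A standalone sketch of that hand-off — only the arc-swap usage mirrors the diff; a thread stands in for the async stream:

```rust
use std::sync::Arc;

use arc_swap::ArcSwapOption;

fn main() {
    // Shared slot, empty until the producer publishes into it.
    let metrics: Arc<ArcSwapOption<String>> = Arc::new(ArcSwapOption::from(None));
    let producer_handle = metrics.clone();

    // Stands in for the stream closure that runs to completion first.
    let worker = std::thread::spawn(move || {
        // ... drain record batches, then publish the trailing metrics ...
        producer_handle.swap(Some(Arc::new(r#"{"elapsed_compute":42}"#.to_string())));
    });
    worker.join().unwrap();

    // The consumer (the stream wrapper) loads whatever was published.
    let loaded = metrics.load();
    assert_eq!(
        loaded.as_ref().map(|s| s.as_str()),
        Some(r#"{"elapsed_compute":42}"#)
    );
}
```

Because `ArcSwapOption::load` never blocks, the wrapper can check for metrics at any time without coordinating with the producer.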
24 changes: 23 additions & 1 deletion src/common/grpc/src/flight.rs
@@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use api::v1::{AffectedRows, FlightMetadata};
use api::v1::{AffectedRows, FlightMetadata, Metrics};
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::{FlightData, SchemaAsIpc};
use common_base::bytes::Bytes;
@@ -39,6 +39,7 @@ pub enum FlightMessage {
Schema(SchemaRef),
Recordbatch(RecordBatch),
AffectedRows(usize),
Metrics(String),
}

pub struct FlightEncoder {
@@ -85,6 +86,22 @@ impl FlightEncoder {
FlightMessage::AffectedRows(rows) => {
let metadata = FlightMetadata {
affected_rows: Some(AffectedRows { value: rows as _ }),
metrics: None,
}
.encode_to_vec();
FlightData {
flight_descriptor: None,
data_header: build_none_flight_msg().into(),
app_metadata: metadata.into(),
data_body: ProstBytes::default(),
}
}
FlightMessage::Metrics(s) => {
let metadata = FlightMetadata {
affected_rows: None,
metrics: Some(Metrics {
metrics: s.as_bytes().to_vec(),
}),
}
.encode_to_vec();
FlightData {
@@ -119,6 +136,11 @@ impl FlightDecoder {
if let Some(AffectedRows { value }) = metadata.affected_rows {
return Ok(FlightMessage::AffectedRows(value as _));
}
if let Some(Metrics { metrics }) = metadata.metrics {
return Ok(FlightMessage::Metrics(
String::from_utf8_lossy(&metrics).to_string(),
));
}
InvalidFlightDataSnafu {
reason: "Expecting FlightMetadata have some meaningful content.",
}
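On the wire, both AffectedRows and the new Metrics ride in `FlightData.app_metadata` as an encoded `FlightMetadata` protobuf, and the decoder picks whichever field is set. A simplified model of that branching, with plain structs standing in for the prost-generated `api::v1` types (the real code round-trips them with `encode_to_vec`/`decode`):

```rust
// Plain stand-ins for the prost-generated api::v1 protobuf types.
struct AffectedRows {
    value: u64,
}
struct Metrics {
    metrics: Vec<u8>,
}
struct FlightMetadata {
    affected_rows: Option<AffectedRows>,
    metrics: Option<Metrics>, // new in this commit
}

enum FlightMessage {
    AffectedRows(usize),
    Metrics(String),
}

fn decode_metadata(meta: FlightMetadata) -> Result<FlightMessage, String> {
    if let Some(AffectedRows { value }) = meta.affected_rows {
        return Ok(FlightMessage::AffectedRows(value as usize));
    }
    if let Some(Metrics { metrics }) = meta.metrics {
        // Metrics travel as UTF-8 bytes; decode leniently, as the real code does.
        return Ok(FlightMessage::Metrics(
            String::from_utf8_lossy(&metrics).to_string(),
        ));
    }
    Err("expecting FlightMetadata to have some meaningful content".to_string())
}

fn main() {
    let meta = FlightMetadata {
        affected_rows: None,
        metrics: Some(Metrics {
            metrics: br#"{"elapsed_compute":1}"#.to_vec(),
        }),
    };
    match decode_metadata(meta) {
        Ok(FlightMessage::Metrics(s)) => println!("metrics: {s}"),
        _ => unreachable!(),
    }
}
```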
8 changes: 6 additions & 2 deletions src/common/query/src/physical_plan.rs
@@ -143,8 +143,12 @@ impl PhysicalPlan for PhysicalPlanAdapter {
let stream = df_plan
.execute(partition, context)
.context(error::GeneralDataFusionSnafu)?;
let adapter = RecordBatchStreamAdapter::try_new_with_metrics(stream, baseline_metric)
.context(error::ConvertDfRecordBatchStreamSnafu)?;
let adapter = RecordBatchStreamAdapter::try_new_with_metrics_and_df_plan(
stream,
baseline_metric,
df_plan,
)
.context(error::ConvertDfRecordBatchStreamSnafu)?;

Ok(Box::pin(adapter))
}
3 changes: 2 additions & 1 deletion src/common/recordbatch/Cargo.toml
@@ -5,6 +5,7 @@ edition.workspace = true
license.workspace = true

[dependencies]
arc-swap = "1.6"
common-error.workspace = true
common-macro.workspace = true
datafusion-common.workspace = true
@@ -13,8 +14,8 @@ datatypes.workspace = true
futures.workspace = true
paste = "1.0"
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true

[dev-dependencies]
serde_json = "1.0"
tokio.workspace = true
63 changes: 59 additions & 4 deletions src/common/recordbatch/src/adapter.rs
@@ -21,8 +21,8 @@ use datafusion::arrow::compute::cast;
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::error::Result as DfResult;
use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStream};
use datafusion::physical_plan::metrics::BaselineMetrics;
use datafusion::physical_plan::RecordBatchStream as DfRecordBatchStream;
use datafusion::physical_plan::metrics::{BaselineMetrics, MetricValue};
use datafusion::physical_plan::{ExecutionPlan, RecordBatchStream as DfRecordBatchStream};
use datafusion_common::DataFusionError;
use datatypes::schema::{Schema, SchemaRef};
use futures::ready;
@@ -154,6 +154,13 @@ pub struct RecordBatchStreamAdapter {
schema: SchemaRef,
stream: DfSendableRecordBatchStream,
metrics: Option<BaselineMetrics>,
metrics_2: Metrics,
}

enum Metrics {
Unavailable,
Unresolved(Arc<dyn ExecutionPlan>),
Resolved(String),
}

impl RecordBatchStreamAdapter {
@@ -164,27 +171,41 @@ impl RecordBatchStreamAdapter {
schema,
stream,
metrics: None,
metrics_2: Metrics::Unavailable,
})
}

pub fn try_new_with_metrics(
pub fn try_new_with_metrics_and_df_plan(
stream: DfSendableRecordBatchStream,
metrics: BaselineMetrics,
df_plan: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
let schema =
Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
Ok(Self {
schema,
stream,
metrics: Some(metrics),
metrics_2: Metrics::Unresolved(df_plan),
})
}

pub fn set_metrics2(&mut self, plan: Arc<dyn ExecutionPlan>) {
self.metrics_2 = Metrics::Unresolved(plan)
}
}

impl RecordBatchStream for RecordBatchStreamAdapter {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn metrics(&self) -> Option<String> {
match &self.metrics_2 {
Metrics::Resolved(metrics) => Some(metrics.clone()),
Metrics::Unavailable | Metrics::Unresolved(_) => None,
}
}
}

impl Stream for RecordBatchStreamAdapter {
@@ -206,7 +227,17 @@ impl Stream for RecordBatchStreamAdapter {
df_record_batch,
)))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(None) => {
if let Metrics::Unresolved(df_plan) = &self.metrics_2 {
let mut metrics_holder = RecordBatchMetrics::default();
collect_metrics(df_plan, &mut metrics_holder);
if metrics_holder.elapsed_compute != 0 || metrics_holder.memory_usage != 0 {
self.metrics_2 =
Metrics::Resolved(serde_json::to_string(&metrics_holder).unwrap());
}
}
Poll::Ready(None)
}
}
}

@@ -216,6 +247,30 @@ impl Stream for RecordBatchStreamAdapter {
}
}

fn collect_metrics(df_plan: &Arc<dyn ExecutionPlan>, result: &mut RecordBatchMetrics) {
if let Some(metrics) = df_plan.metrics() {
metrics.iter().for_each(|m| match m.value() {
MetricValue::ElapsedCompute(ec) => result.elapsed_compute += ec.value(),
MetricValue::CurrentMemoryUsage(m) => result.memory_usage += m.value(),
_ => {}
});
}

for child in df_plan.children() {
collect_metrics(&child, result);
}
}

/// [`RecordBatchMetrics`] carries metric values
/// from datanode to frontend through gRPC
#[derive(serde::Serialize, serde::Deserialize, Default, Debug)]
pub struct RecordBatchMetrics {
// CPU consumption in nanoseconds
pub elapsed_compute: usize,
// memory used by the plan in bytes
pub memory_usage: usize,
}

enum AsyncRecordBatchStreamAdapterState {
Uninit(FutureStream),
Ready(SendableRecordBatchStream),
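`collect_metrics` is a plain pre-order walk over the physical plan: add this node's ElapsedCompute and CurrentMemoryUsage values if it reports any, then recurse into its children; when the stream finishes, the totals are serialized to JSON only if something was actually recorded. A toy model of that traversal — a hand-rolled node type replaces DataFusion's `ExecutionPlan`, and serde/serde_json are the only real dependencies:

```rust
use serde::{Deserialize, Serialize};

/// Mirrors the struct in the diff: totals shipped as JSON over gRPC.
#[derive(Serialize, Deserialize, Default, Debug)]
struct RecordBatchMetrics {
    elapsed_compute: usize, // CPU consumption in nanoseconds
    memory_usage: usize,    // memory used by the plan, in bytes
}

// Toy stand-in for Arc<dyn ExecutionPlan>: per-node metrics plus children.
struct PlanNode {
    elapsed_compute: usize,
    memory_usage: usize,
    children: Vec<PlanNode>,
}

fn collect_metrics(plan: &PlanNode, result: &mut RecordBatchMetrics) {
    // Accumulate this node's metrics, then recurse into its children.
    result.elapsed_compute += plan.elapsed_compute;
    result.memory_usage += plan.memory_usage;
    for child in &plan.children {
        collect_metrics(child, result);
    }
}

fn main() {
    let plan = PlanNode {
        elapsed_compute: 1_000,
        memory_usage: 64,
        children: vec![PlanNode {
            elapsed_compute: 500,
            memory_usage: 32,
            children: vec![],
        }],
    };
    let mut totals = RecordBatchMetrics::default();
    collect_metrics(&plan, &mut totals);
    // Only publish when something was actually recorded, as the adapter does.
    if totals.elapsed_compute != 0 || totals.memory_usage != 0 {
        let json = serde_json::to_string(&totals).unwrap();
        assert_eq!(json, r#"{"elapsed_compute":1500,"memory_usage":96}"#);
    }
}
```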
11 changes: 11 additions & 0 deletions src/common/recordbatch/src/lib.rs
@@ -20,6 +20,7 @@ pub mod util;
use std::pin::Pin;
use std::sync::Arc;

use arc_swap::ArcSwapOption;
use datafusion::physical_plan::memory::MemoryStream;
pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::arrow::compute::SortOptions;
Expand All @@ -39,6 +40,10 @@ pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
fn output_ordering(&self) -> Option<&[OrderOption]> {
None
}

fn metrics(&self) -> Option<String> {
None
}
}

pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
@@ -206,6 +211,7 @@ pub struct RecordBatchStreamWrapper<S> {
pub schema: SchemaRef,
pub stream: S,
pub output_ordering: Option<Vec<OrderOption>>,
pub metrics: Arc<ArcSwapOption<String>>,
}

impl<S> RecordBatchStreamWrapper<S> {
@@ -215,6 +221,7 @@ impl<S> RecordBatchStreamWrapper<S> {
schema,
stream,
output_ordering: None,
metrics: Default::default(),
}
}
}
@@ -229,6 +236,10 @@ impl<S: Stream<Item = Result<RecordBatch>> + Unpin> RecordBatchStream
fn output_ordering(&self) -> Option<&[OrderOption]> {
self.output_ordering.as_deref()
}

fn metrics(&self) -> Option<String> {
self.metrics.load().as_ref().map(|s| s.as_ref().clone())
}
}

impl<S: Stream<Item = Result<RecordBatch>> + Unpin> Stream for RecordBatchStreamWrapper<S> {
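Note the design choice here: `metrics()` is added to the `RecordBatchStream` trait with a default body returning `None`, so every existing stream impl keeps compiling and only metrics-aware streams override it. A compact sketch of the pattern — the trait and types are simplified stand-ins, and only the arc-swap calls mirror the diff:

```rust
use std::sync::Arc;

use arc_swap::ArcSwapOption;

trait RecordBatchStream {
    // Default implementation keeps every existing impl compiling unchanged.
    fn metrics(&self) -> Option<String> {
        None
    }
}

struct PlainStream;
impl RecordBatchStream for PlainStream {}

struct WrappedStream {
    metrics: Arc<ArcSwapOption<String>>,
}

impl RecordBatchStream for WrappedStream {
    fn metrics(&self) -> Option<String> {
        // Clone the String out of the shared slot, as the wrapper does.
        self.metrics.load().as_ref().map(|s| s.as_ref().clone())
    }
}

fn main() {
    assert_eq!(PlainStream.metrics(), None);
    let wrapped = WrappedStream {
        metrics: Arc::new(ArcSwapOption::from(None)),
    };
    wrapped.metrics.store(Some(Arc::new("{}".to_string())));
    assert_eq!(wrapped.metrics(), Some("{}".to_string()));
}
```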
2 changes: 2 additions & 0 deletions src/query/Cargo.toml
@@ -39,6 +39,8 @@ futures-util.workspace = true
greptime-proto.workspace = true
humantime = "2.1"
lazy_static.workspace = true
meter-core.workspace = true
meter-macros.workspace = true
object-store.workspace = true
once_cell.workspace = true
partition.workspace = true
(The remaining changed files in this commit are not rendered here.)
