Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: carry metrics in flight metadata from datanode to frontend #3113

Merged
merged 7 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a31ea166fc015ea7ff111ac94e26c3a5d64364d2" }
greptime-proto = { git = "https://github.com/shuiyisong/greptime-proto.git", rev = "89ac89603c9f69ec9360eaaded96cb32c59465d2" }
shuiyisong marked this conversation as resolved.
Show resolved Hide resolved
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
Expand Down
1 change: 1 addition & 0 deletions src/catalog/src/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ impl DataSource for InformationTableDataSource {
schema: projected_schema,
stream: Box::pin(stream),
output_ordering: None,
metrics: Default::default(),
};

Ok(Box::pin(stream))
Expand Down
1 change: 1 addition & 0 deletions src/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ testing = []

[dependencies]
api.workspace = true
arc-swap = "1.6"
arrow-flight.workspace = true
async-stream.workspace = true
async-trait.workspace = true
Expand Down
22 changes: 14 additions & 8 deletions src/client/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,30 +295,36 @@ impl Database {
);
Ok(Output::AffectedRows(rows))
}
FlightMessage::Recordbatch(_) => IllegalFlightMessagesSnafu {
reason: "The first flight message cannot be a RecordBatch message",
FlightMessage::Recordbatch(_) | FlightMessage::Metrics(_) => {
IllegalFlightMessagesSnafu {
reason: "The first flight message cannot be a RecordBatch or Metrics message",
}
.fail()
}
.fail(),
FlightMessage::Schema(schema) => {
let stream = Box::pin(stream!({
while let Some(flight_message) = flight_message_stream.next().await {
let flight_message = flight_message
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let FlightMessage::Recordbatch(record_batch) = flight_message else {
yield IllegalFlightMessagesSnafu {reason: "A Schema message must be succeeded exclusively by a set of RecordBatch messages"}
match flight_message {
FlightMessage::Recordbatch(record_batch) => yield Ok(record_batch),
FlightMessage::Metrics(_) => {}
FlightMessage::AffectedRows(_) | FlightMessage::Schema(_) => {
yield IllegalFlightMessagesSnafu {reason: format!("A Schema message must be succeeded exclusively by a set of RecordBatch messages, flight_message: {:?}", flight_message)}
.fail()
.map_err(BoxedError::new)
.context(ExternalSnafu);
break;
};
yield Ok(record_batch);
break;
}
}
}
}));
let record_batch_stream = RecordBatchStreamWrapper {
schema,
stream,
output_ordering: None,
metrics: Default::default(),
};
Ok(Output::Stream(Box::pin(record_batch_stream)))
}
Expand Down
24 changes: 19 additions & 5 deletions src/client/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use api::v1::region::{QueryRequest, RegionRequest, RegionResponse};
use api::v1::ResponseHeader;
use arc_swap::ArcSwapOption;
use arrow_flight::Ticket;
use async_stream::stream;
use async_trait::async_trait;
Expand Down Expand Up @@ -119,27 +122,38 @@ impl RegionRequester {
.fail();
};

let metrics_str = Arc::new(ArcSwapOption::from(None));
let ref_str = metrics_str.clone();

let stream = Box::pin(stream!({
while let Some(flight_message) = flight_message_stream.next().await {
let flight_message = flight_message
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let FlightMessage::Recordbatch(record_batch) = flight_message else {
yield IllegalFlightMessagesSnafu {

match flight_message {
FlightMessage::Recordbatch(record_batch) => yield Ok(record_batch),
FlightMessage::Metrics(s) => {
ref_str.swap(Some(Arc::new(s)));
break;
}
_ => {
yield IllegalFlightMessagesSnafu {
reason: "A Schema message must be succeeded exclusively by a set of RecordBatch messages"
}
.fail()
.map_err(BoxedError::new)
.context(ExternalSnafu);
break;
};
yield Ok(record_batch);
break;
}
}
}
}));
let record_batch_stream = RecordBatchStreamWrapper {
schema,
stream,
output_ordering: None,
metrics: metrics_str,
};
Ok(Box::pin(record_batch_stream))
}
Expand Down
24 changes: 23 additions & 1 deletion src/common/grpc/src/flight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use api::v1::{AffectedRows, FlightMetadata};
use api::v1::{AffectedRows, FlightMetadata, Metrics};
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::{FlightData, SchemaAsIpc};
use common_base::bytes::Bytes;
Expand All @@ -39,6 +39,7 @@ pub enum FlightMessage {
Schema(SchemaRef),
Recordbatch(RecordBatch),
AffectedRows(usize),
Metrics(String),
}

pub struct FlightEncoder {
Expand Down Expand Up @@ -85,6 +86,22 @@ impl FlightEncoder {
FlightMessage::AffectedRows(rows) => {
let metadata = FlightMetadata {
affected_rows: Some(AffectedRows { value: rows as _ }),
metrics: None,
}
.encode_to_vec();
FlightData {
flight_descriptor: None,
data_header: build_none_flight_msg().into(),
app_metadata: metadata.into(),
data_body: ProstBytes::default(),
}
}
FlightMessage::Metrics(s) => {
let metadata = FlightMetadata {
affected_rows: None,
metrics: Some(Metrics {
metrics: s.as_bytes().to_vec(),
}),
}
.encode_to_vec();
FlightData {
Expand Down Expand Up @@ -119,6 +136,11 @@ impl FlightDecoder {
if let Some(AffectedRows { value }) = metadata.affected_rows {
return Ok(FlightMessage::AffectedRows(value as _));
}
if let Some(Metrics { metrics }) = metadata.metrics {
return Ok(FlightMessage::Metrics(
String::from_utf8_lossy(&metrics).to_string(),
));
}
InvalidFlightDataSnafu {
reason: "Expecting FlightMetadata have some meaningful content.",
}
Expand Down
8 changes: 6 additions & 2 deletions src/common/query/src/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,12 @@ impl PhysicalPlan for PhysicalPlanAdapter {
let stream = df_plan
.execute(partition, context)
.context(error::GeneralDataFusionSnafu)?;
let adapter = RecordBatchStreamAdapter::try_new_with_metrics(stream, baseline_metric)
.context(error::ConvertDfRecordBatchStreamSnafu)?;
let adapter = RecordBatchStreamAdapter::try_new_with_metrics_and_df_plan(
stream,
baseline_metric,
df_plan,
)
.context(error::ConvertDfRecordBatchStreamSnafu)?;

Ok(Box::pin(adapter))
}
Expand Down
3 changes: 2 additions & 1 deletion src/common/recordbatch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ edition.workspace = true
license.workspace = true

[dependencies]
arc-swap = "1.6"
common-error.workspace = true
common-macro.workspace = true
datafusion-common.workspace = true
Expand All @@ -13,8 +14,8 @@ datatypes.workspace = true
futures.workspace = true
paste = "1.0"
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true

[dev-dependencies]
serde_json = "1.0"
tokio.workspace = true
63 changes: 59 additions & 4 deletions src/common/recordbatch/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use datafusion::arrow::compute::cast;
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::error::Result as DfResult;
use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStream};
use datafusion::physical_plan::metrics::BaselineMetrics;
use datafusion::physical_plan::RecordBatchStream as DfRecordBatchStream;
use datafusion::physical_plan::metrics::{BaselineMetrics, MetricValue};
use datafusion::physical_plan::{ExecutionPlan, RecordBatchStream as DfRecordBatchStream};
use datafusion_common::DataFusionError;
use datatypes::schema::{Schema, SchemaRef};
use futures::ready;
Expand Down Expand Up @@ -154,6 +154,13 @@ pub struct RecordBatchStreamAdapter {
schema: SchemaRef,
stream: DfSendableRecordBatchStream,
metrics: Option<BaselineMetrics>,
metrics_2: Metrics,
shuiyisong marked this conversation as resolved.
Show resolved Hide resolved
}

enum Metrics {
shuiyisong marked this conversation as resolved.
Show resolved Hide resolved
Unavailable,
Unresolved(Arc<dyn ExecutionPlan>),
Resolved(String),
}

impl RecordBatchStreamAdapter {
Expand All @@ -164,27 +171,41 @@ impl RecordBatchStreamAdapter {
schema,
stream,
metrics: None,
metrics_2: Metrics::Unavailable,
})
}

pub fn try_new_with_metrics(
pub fn try_new_with_metrics_and_df_plan(
stream: DfSendableRecordBatchStream,
metrics: BaselineMetrics,
df_plan: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
let schema =
Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
Ok(Self {
schema,
stream,
metrics: Some(metrics),
metrics_2: Metrics::Unresolved(df_plan),
})
}

pub fn set_metrics2(&mut self, plan: Arc<dyn ExecutionPlan>) {
self.metrics_2 = Metrics::Unresolved(plan)
}
}

impl RecordBatchStream for RecordBatchStreamAdapter {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn metrics(&self) -> Option<String> {
match &self.metrics_2 {
Metrics::Resolved(metrics) => Some(metrics.clone()),
Metrics::Unavailable | Metrics::Unresolved(_) => None,
}
}
}

impl Stream for RecordBatchStreamAdapter {
Expand All @@ -206,7 +227,17 @@ impl Stream for RecordBatchStreamAdapter {
df_record_batch,
)))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(None) => {
if let Metrics::Unresolved(df_plan) = &self.metrics_2 {
let mut metrics_holder = RecordBatchMetrics::default();
collect_metrics(df_plan, &mut metrics_holder);
if metrics_holder.elapsed_compute != 0 || metrics_holder.memory_usage != 0 {
self.metrics_2 =
Metrics::Resolved(serde_json::to_string(&metrics_holder).unwrap());
}
}
Poll::Ready(None)
}
}
}

Expand All @@ -216,6 +247,30 @@ impl Stream for RecordBatchStreamAdapter {
}
}

fn collect_metrics(df_plan: &Arc<dyn ExecutionPlan>, result: &mut RecordBatchMetrics) {
if let Some(metrics) = df_plan.metrics() {
metrics.iter().for_each(|m| match m.value() {
MetricValue::ElapsedCompute(ec) => result.elapsed_compute += ec.value(),
MetricValue::CurrentMemoryUsage(m) => result.memory_usage += m.value(),
_ => {}
});
}

for child in df_plan.children() {
collect_metrics(&child, result);
}
}

/// [`RecordBatchMetrics`] carrys metrics value
/// from datanode to frontend through gRPC
#[derive(serde::Serialize, serde::Deserialize, Default, Debug)]
pub struct RecordBatchMetrics {
// cpu consumption in nanoseconds
pub elapsed_compute: usize,
// memory used by the plan in bytes
pub memory_usage: usize,
}

enum AsyncRecordBatchStreamAdapterState {
Uninit(FutureStream),
Ready(SendableRecordBatchStream),
Expand Down
Loading
Loading