From 16f59056a4920e3f7cdfbde5c7faf0f05139c1d4 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 10 Oct 2023 09:51:44 +0200 Subject: [PATCH] feat: log headers/trailers in flight CLI (+ minor fixes) (#4898) * feat: improve CLI logging setup * refactor: flight SQL DoGet should be a high-level interface * feat: log headers/trailers in SQL CLI * fix: replace explicit panics in CLI --- arrow-flight/Cargo.toml | 2 +- arrow-flight/examples/flight_sql_server.rs | 4 +- arrow-flight/src/bin/flight_sql_client.rs | 94 ++++++++++++++++++---- arrow-flight/src/sql/client.rs | 16 +++- 4 files changed, 93 insertions(+), 23 deletions(-) diff --git a/arrow-flight/Cargo.toml b/arrow-flight/Cargo.toml index 54c5cdf5e2c7..edaa7129dc9a 100644 --- a/arrow-flight/Cargo.toml +++ b/arrow-flight/Cargo.toml @@ -52,7 +52,7 @@ tonic = { version = "0.10.0", default-features = false, features = ["transport", anyhow = { version = "1.0", optional = true } clap = { version = "4.1", default-features = false, features = ["std", "derive", "env", "help", "error-context", "usage"], optional = true } tracing-log = { version = "0.1", optional = true } -tracing-subscriber = { version = "0.3.1", default-features = false, features = ["ansi", "fmt"], optional = true } +tracing-subscriber = { version = "0.3.1", default-features = false, features = ["ansi", "env-filter", "fmt"], optional = true } [package.metadata.docs.rs] all-features = true diff --git a/arrow-flight/examples/flight_sql_server.rs b/arrow-flight/examples/flight_sql_server.rs index d1aeae6f0a6c..013f7e7788f8 100644 --- a/arrow-flight/examples/flight_sql_server.rs +++ b/arrow-flight/examples/flight_sql_server.rs @@ -789,7 +789,6 @@ mod tests { use arrow_cast::pretty::pretty_format_batches; use arrow_flight::sql::client::FlightSqlServiceClient; - use arrow_flight::utils::flight_data_to_batches; use tonic::transport::server::TcpIncoming; use tonic::transport::{Certificate, Endpoint}; use tower::service_fn; @@ -955,8 +954,7 @@ mod tests { let ticket = flight_info.endpoint[0].ticket.as_ref().unwrap().clone(); let flight_data = client.do_get(ticket).await.unwrap(); - let flight_data: Vec = flight_data.try_collect().await.unwrap(); - let batches = flight_data_to_batches(&flight_data).unwrap(); + let batches: Vec<_> = flight_data.try_collect().await.unwrap(); let res = pretty_format_batches(batches.as_slice()).unwrap(); let expected = r#" diff --git a/arrow-flight/src/bin/flight_sql_client.rs b/arrow-flight/src/bin/flight_sql_client.rs index c6aaccf376eb..df51530b3c8f 100644 --- a/arrow-flight/src/bin/flight_sql_client.rs +++ b/arrow-flight/src/bin/flight_sql_client.rs @@ -17,17 +17,17 @@ use std::{error::Error, sync::Arc, time::Duration}; -use anyhow::{Context, Result}; +use anyhow::{bail, Context, Result}; use arrow_array::{ArrayRef, Datum, RecordBatch, StringArray}; use arrow_cast::{cast_with_options, pretty::pretty_format_batches, CastOptions}; -use arrow_flight::{ - sql::client::FlightSqlServiceClient, utils::flight_data_to_batches, FlightData, - FlightInfo, -}; +use arrow_flight::{sql::client::FlightSqlServiceClient, FlightInfo}; use arrow_schema::Schema; use clap::{Parser, Subcommand}; use futures::TryStreamExt; -use tonic::transport::{Channel, ClientTlsConfig, Endpoint}; +use tonic::{ + metadata::MetadataMap, + transport::{Channel, ClientTlsConfig, Endpoint}, +}; use tracing_log::log::info; /// A ':' separated key value pair @@ -61,6 +61,22 @@ where } } +/// Logging CLI config. +#[derive(Debug, Parser)] +pub struct LoggingArgs { + /// Log verbosity. + /// + /// Use `-v for warn, `-vv for info, -vvv for debug, -vvvv for trace. + /// + /// Note you can also set logging level using `RUST_LOG` environment variable: `RUST_LOG=debug` + #[clap( + short = 'v', + long = "verbose", + action = clap::ArgAction::Count, + )] + log_verbose_count: u8, +} + #[derive(Debug, Parser)] struct ClientArgs { /// Additional headers. @@ -96,6 +112,10 @@ struct ClientArgs { #[derive(Debug, Parser)] struct Args { + /// Logging args. + #[clap(flatten)] + logging_args: LoggingArgs, + /// Client args. #[clap(flatten)] client_args: ClientArgs, @@ -119,7 +139,7 @@ enum Command { #[tokio::main] async fn main() -> Result<()> { let args = Args::parse(); - setup_logging()?; + setup_logging(args.logging_args)?; let mut client = setup_client(args.client_args) .await .context("setup client")?; @@ -177,16 +197,21 @@ async fn execute_flight( for endpoint in info.endpoint { let Some(ticket) = &endpoint.ticket else { - panic!("did not get ticket"); + bail!("did not get ticket"); }; - let flight_data = client.do_get(ticket.clone()).await.context("do get")?; - let flight_data: Vec = flight_data + + let mut flight_data = client.do_get(ticket.clone()).await.context("do get")?; + log_metadata(flight_data.headers(), "header"); + + let mut endpoint_batches: Vec<_> = (&mut flight_data) .try_collect() .await .context("collect data stream")?; - let mut endpoint_batches = flight_data_to_batches(&flight_data) - .context("convert flight data to record batches")?; batches.append(&mut endpoint_batches); + + if let Some(trailers) = flight_data.trailers() { + log_metadata(&trailers, "trailer"); + } } info!("received data"); @@ -213,9 +238,22 @@ fn construct_record_batch_from_params( Ok(RecordBatch::try_from_iter(items)?) } -fn setup_logging() -> Result<()> { +fn setup_logging(args: LoggingArgs) -> Result<()> { + use tracing_subscriber::{util::SubscriberInitExt, EnvFilter, FmtSubscriber}; + tracing_log::LogTracer::init().context("tracing log init")?; - tracing_subscriber::fmt::init(); + + let filter = match args.log_verbose_count { + 0 => "warn", + 1 => "info", + 2 => "debug", + _ => "trace", + }; + let filter = EnvFilter::try_new(filter).context("set up log env filter")?; + + let subscriber = FmtSubscriber::builder().with_env_filter(filter).finish(); + subscriber.try_init().context("init logging subscriber")?; + Ok(()) } @@ -265,10 +303,10 @@ async fn setup_client(args: ClientArgs) -> Result { - panic!("when username is set, you also need to set a password") + bail!("when username is set, you also need to set a password") } (None, Some(_)) => { - panic!("when password is set, you also need to set a username") + bail!("when password is set, you also need to set a username") } } @@ -284,3 +322,27 @@ fn parse_key_val( .ok_or_else(|| format!("invalid KEY=value: no `=` found in `{s}`"))?; Ok((s[..pos].parse()?, s[pos + 1..].parse()?)) } + +/// Log headers/trailers. +fn log_metadata(map: &MetadataMap, what: &'static str) { + for k_v in map.iter() { + match k_v { + tonic::metadata::KeyAndValueRef::Ascii(k, v) => { + info!( + "{}: {}={}", + what, + k.as_str(), + v.to_str().unwrap_or(""), + ); + } + tonic::metadata::KeyAndValueRef::Binary(k, v) => { + info!( + "{}: {}={}", + what, + k.as_str(), + String::from_utf8_lossy(v.as_ref()), + ); + } + } + } +} diff --git a/arrow-flight/src/sql/client.rs b/arrow-flight/src/sql/client.rs index 2d382cf2ca20..7685813ff844 100644 --- a/arrow-flight/src/sql/client.rs +++ b/arrow-flight/src/sql/client.rs @@ -24,6 +24,7 @@ use std::collections::HashMap; use std::str::FromStr; use tonic::metadata::AsciiMetadataKey; +use crate::decode::FlightRecordBatchStream; use crate::encode::FlightDataEncoderBuilder; use crate::error::FlightError; use crate::flight_service_client::FlightServiceClient; @@ -37,6 +38,7 @@ use crate::sql::{ CommandPreparedStatementQuery, CommandPreparedStatementUpdate, CommandStatementQuery, CommandStatementUpdate, DoPutUpdateResult, ProstMessageExt, SqlInfo, }; +use crate::trailers::extract_lazy_trailers; use crate::{ Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, IpcMessage, PutResult, Ticket, @@ -231,14 +233,22 @@ impl FlightSqlServiceClient { pub async fn do_get( &mut self, ticket: impl IntoRequest, - ) -> Result, ArrowError> { + ) -> Result { let req = self.set_request_headers(ticket.into_request())?; - Ok(self + + let (md, response_stream, _ext) = self .flight_client .do_get(req) .await .map_err(status_to_arrow_error)? - .into_inner()) + .into_parts(); + let (response_stream, trailers) = extract_lazy_trailers(response_stream); + + Ok(FlightRecordBatchStream::new_from_flight_data( + response_stream.map_err(FlightError::Tonic), + ) + .with_headers(md) + .with_trailers(trailers)) } /// Push a stream to the flight service associated with a particular flight stream.