feat: log headers/trailers in flight CLI (+ minor fixes) (#4898)

* feat: improve CLI logging setup

* refactor: flight SQL DoGet should be a high-level interface

* feat: log headers/trailers in SQL CLI

* fix: replace explicit panics in CLI
crepererum authored Oct 10, 2023
1 parent 2af5163 commit 16f5905
Showing 4 changed files with 93 additions and 23 deletions.
2 changes: 1 addition & 1 deletion arrow-flight/Cargo.toml
@@ -52,7 +52,7 @@ tonic = { version = "0.10.0", default-features = false, features = ["transport",
anyhow = { version = "1.0", optional = true }
clap = { version = "4.1", default-features = false, features = ["std", "derive", "env", "help", "error-context", "usage"], optional = true }
tracing-log = { version = "0.1", optional = true }
tracing-subscriber = { version = "0.3.1", default-features = false, features = ["ansi", "fmt"], optional = true }
tracing-subscriber = { version = "0.3.1", default-features = false, features = ["ansi", "env-filter", "fmt"], optional = true }

[package.metadata.docs.rs]
all-features = true
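The new `env-filter` Cargo feature is what makes `tracing_subscriber::EnvFilter` available to the CLI below. As a rough illustration (the directive string here is an example, not taken from this commit), `EnvFilter` accepts the same syntax as `RUST_LOG`, including per-crate levels:

use tracing_subscriber::filter::ParseError;
use tracing_subscriber::EnvFilter;

// Illustrative only: parse a RUST_LOG-style directive string into a filter,
// e.g. "warn" globally but "debug" for the arrow_flight crate.
fn example_filter() -> Result<EnvFilter, ParseError> {
    EnvFilter::try_new("warn,arrow_flight=debug")
}
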
4 changes: 1 addition & 3 deletions arrow-flight/examples/flight_sql_server.rs
@@ -789,7 +789,6 @@ mod tests {

use arrow_cast::pretty::pretty_format_batches;
use arrow_flight::sql::client::FlightSqlServiceClient;
use arrow_flight::utils::flight_data_to_batches;
use tonic::transport::server::TcpIncoming;
use tonic::transport::{Certificate, Endpoint};
use tower::service_fn;
@@ -955,8 +954,7 @@ mod tests {

let ticket = flight_info.endpoint[0].ticket.as_ref().unwrap().clone();
let flight_data = client.do_get(ticket).await.unwrap();
let flight_data: Vec<FlightData> = flight_data.try_collect().await.unwrap();
let batches = flight_data_to_batches(&flight_data).unwrap();
let batches: Vec<_> = flight_data.try_collect().await.unwrap();

let res = pretty_format_batches(batches.as_slice()).unwrap();
let expected = r#"
94 changes: 78 additions & 16 deletions arrow-flight/src/bin/flight_sql_client.rs
@@ -17,17 +17,17 @@

use std::{error::Error, sync::Arc, time::Duration};

use anyhow::{Context, Result};
use anyhow::{bail, Context, Result};
use arrow_array::{ArrayRef, Datum, RecordBatch, StringArray};
use arrow_cast::{cast_with_options, pretty::pretty_format_batches, CastOptions};
use arrow_flight::{
sql::client::FlightSqlServiceClient, utils::flight_data_to_batches, FlightData,
FlightInfo,
};
use arrow_flight::{sql::client::FlightSqlServiceClient, FlightInfo};
use arrow_schema::Schema;
use clap::{Parser, Subcommand};
use futures::TryStreamExt;
use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
use tonic::{
metadata::MetadataMap,
transport::{Channel, ClientTlsConfig, Endpoint},
};
use tracing_log::log::info;

/// A ':' separated key value pair
@@ -61,6 +61,22 @@ where
}
}

/// Logging CLI config.
#[derive(Debug, Parser)]
pub struct LoggingArgs {
/// Log verbosity.
///
/// Use `-v` for warn, `-vv` for info, `-vvv` for debug, `-vvvv` for trace.
///
/// Note that you can also set the logging level using the `RUST_LOG` environment variable: `RUST_LOG=debug`
#[clap(
short = 'v',
long = "verbose",
action = clap::ArgAction::Count,
)]
log_verbose_count: u8,
}
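With `clap::ArgAction::Count`, every repetition of the flag bumps the counter, so `-vvv` parses to `log_verbose_count == 3`. A self-contained sketch of that behaviour (the struct and program name below are illustrative, not part of this commit):

use clap::Parser;

// Minimal stand-in for the verbosity flag defined above.
#[derive(Debug, Parser)]
struct Verbosity {
    #[clap(short = 'v', long = "verbose", action = clap::ArgAction::Count)]
    log_verbose_count: u8,
}

fn main() {
    // Repeated flags accumulate: "-vvv" yields a count of 3.
    let v = Verbosity::parse_from(["flight_sql_client", "-vvv"]);
    assert_eq!(v.log_verbose_count, 3);
}
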

#[derive(Debug, Parser)]
struct ClientArgs {
/// Additional headers.
@@ -96,6 +112,10 @@ struct ClientArgs {

#[derive(Debug, Parser)]
struct Args {
/// Logging args.
#[clap(flatten)]
logging_args: LoggingArgs,

/// Client args.
#[clap(flatten)]
client_args: ClientArgs,
@@ -119,7 +139,7 @@ enum Command {
#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
setup_logging()?;
setup_logging(args.logging_args)?;
let mut client = setup_client(args.client_args)
.await
.context("setup client")?;
@@ -177,16 +197,21 @@ async fn execute_flight(

for endpoint in info.endpoint {
let Some(ticket) = &endpoint.ticket else {
panic!("did not get ticket");
bail!("did not get ticket");
};
let flight_data = client.do_get(ticket.clone()).await.context("do get")?;
let flight_data: Vec<FlightData> = flight_data

let mut flight_data = client.do_get(ticket.clone()).await.context("do get")?;
log_metadata(flight_data.headers(), "header");

let mut endpoint_batches: Vec<_> = (&mut flight_data)
.try_collect()
.await
.context("collect data stream")?;
let mut endpoint_batches = flight_data_to_batches(&flight_data)
.context("convert flight data to record batches")?;
batches.append(&mut endpoint_batches);

if let Some(trailers) = flight_data.trailers() {
log_metadata(&trailers, "trailer");
}
}
info!("received data");

@@ -213,9 +238,22 @@ fn construct_record_batch_from_params(
Ok(RecordBatch::try_from_iter(items)?)
}

fn setup_logging() -> Result<()> {
fn setup_logging(args: LoggingArgs) -> Result<()> {
use tracing_subscriber::{util::SubscriberInitExt, EnvFilter, FmtSubscriber};

tracing_log::LogTracer::init().context("tracing log init")?;
tracing_subscriber::fmt::init();

let filter = match args.log_verbose_count {
0 => "warn",
1 => "info",
2 => "debug",
_ => "trace",
};
let filter = EnvFilter::try_new(filter).context("set up log env filter")?;

let subscriber = FmtSubscriber::builder().with_env_filter(filter).finish();
subscriber.try_init().context("init logging subscriber")?;

Ok(())
}

@@ -265,10 +303,10 @@ async fn setup_client(args: ClientArgs) -> Result<FlightSqlServiceClient<Channel
info!("performed handshake");
}
(Some(_), None) => {
panic!("when username is set, you also need to set a password")
bail!("when username is set, you also need to set a password")
}
(None, Some(_)) => {
panic!("when password is set, you also need to set a username")
bail!("when password is set, you also need to set a username")
}
}

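Swapping `panic!` for `anyhow::bail!` turns these cases into ordinary `Err` returns, so the CLI can report the problem and exit cleanly instead of aborting with a backtrace. A minimal sketch of the pattern (the helper name and signature are made up for illustration):

use anyhow::{bail, Result};

// Illustrative helper: reject half-specified credentials without panicking.
fn check_credentials(username: Option<&str>, password: Option<&str>) -> Result<()> {
    match (username, password) {
        (Some(_), None) => bail!("when username is set, you also need to set a password"),
        (None, Some(_)) => bail!("when password is set, you also need to set a username"),
        _ => Ok(()),
    }
}
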
@@ -284,3 +322,27 @@ fn parse_key_val(
.ok_or_else(|| format!("invalid KEY=value: no `=` found in `{s}`"))?;
Ok((s[..pos].parse()?, s[pos + 1..].parse()?))
}

/// Log headers/trailers.
fn log_metadata(map: &MetadataMap, what: &'static str) {
for k_v in map.iter() {
match k_v {
tonic::metadata::KeyAndValueRef::Ascii(k, v) => {
info!(
"{}: {}={}",
what,
k.as_str(),
v.to_str().unwrap_or("<invalid>"),
);
}
tonic::metadata::KeyAndValueRef::Binary(k, v) => {
info!(
"{}: {}={}",
what,
k.as_str(),
String::from_utf8_lossy(v.as_ref()),
);
}
}
}
}
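`log_metadata` walks the map with `MetadataMap::iter`, which yields `KeyAndValueRef::Ascii` for text entries and `KeyAndValueRef::Binary` for keys ending in `-bin`. A hedged sketch of building a map it could be fed (the keys and values are invented for illustration):

use tonic::metadata::{MetadataMap, MetadataValue};

// Illustrative only: one ASCII entry and one binary entry.
fn example_metadata() -> MetadataMap {
    let mut map = MetadataMap::new();
    // Logged via the KeyAndValueRef::Ascii arm.
    map.insert("content-type", MetadataValue::from_static("application/grpc"));
    // Binary keys must end in "-bin"; logged via the KeyAndValueRef::Binary arm.
    map.insert_bin("trace-ctx-bin", MetadataValue::from_bytes(b"example"));
    map
}
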
16 changes: 13 additions & 3 deletions arrow-flight/src/sql/client.rs
@@ -24,6 +24,7 @@ use std::collections::HashMap;
use std::str::FromStr;
use tonic::metadata::AsciiMetadataKey;

use crate::decode::FlightRecordBatchStream;
use crate::encode::FlightDataEncoderBuilder;
use crate::error::FlightError;
use crate::flight_service_client::FlightServiceClient;
@@ -37,6 +38,7 @@ use crate::sql::{
CommandPreparedStatementQuery, CommandPreparedStatementUpdate, CommandStatementQuery,
CommandStatementUpdate, DoPutUpdateResult, ProstMessageExt, SqlInfo,
};
use crate::trailers::extract_lazy_trailers;
use crate::{
Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
HandshakeResponse, IpcMessage, PutResult, Ticket,
@@ -231,14 +233,22 @@
pub async fn do_get(
&mut self,
ticket: impl IntoRequest<Ticket>,
) -> Result<Streaming<FlightData>, ArrowError> {
) -> Result<FlightRecordBatchStream, ArrowError> {
let req = self.set_request_headers(ticket.into_request())?;
Ok(self

let (md, response_stream, _ext) = self
.flight_client
.do_get(req)
.await
.map_err(status_to_arrow_error)?
.into_inner())
.into_parts();
let (response_stream, trailers) = extract_lazy_trailers(response_stream);

Ok(FlightRecordBatchStream::new_from_flight_data(
response_stream.map_err(FlightError::Tonic),
)
.with_headers(md)
.with_trailers(trailers))
}

/// Push a stream to the flight service associated with a particular flight stream.
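From the caller's side, the reworked `do_get` bundles decoded batches with request metadata: `headers()` is available as soon as the call returns, while `trailers()` is populated lazily and generally only yields something once the stream has been drained, which is why the CLI above collects the batches first. A hedged usage sketch in the same `anyhow` style as the CLI (the function name is illustrative):

use anyhow::{Context, Result};
use arrow_array::RecordBatch;
use arrow_flight::{sql::client::FlightSqlServiceClient, Ticket};
use futures::TryStreamExt;
use tonic::transport::Channel;

// Illustrative: fetch one ticket, inspecting headers and trailers along the way.
async fn get_with_metadata(
    client: &mut FlightSqlServiceClient<Channel>,
    ticket: Ticket,
) -> Result<Vec<RecordBatch>> {
    let mut stream = client.do_get(ticket).await.context("do get")?;
    println!("headers: {:?}", stream.headers());

    // Borrow the stream mutably so we can still ask it for trailers afterwards.
    let batches: Vec<RecordBatch> = (&mut stream).try_collect().await.context("collect")?;

    if let Some(trailers) = stream.trailers() {
        println!("trailers: {trailers:?}");
    }
    Ok(batches)
}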
