From fba5cc0b9062297e38cbe388d7f1b13debe8ba92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Berkay=20=C5=9Eahin?= <124376117+berkaysynnada@users.noreply.github.com> Date: Thu, 28 Dec 2023 15:27:21 +0300 Subject: [PATCH] Streaming CLI support (#8651) * Streaming CLI support * Update Cargo.toml * Remove duplications * Clean up * Stream test will be added * Update print_format.rs * Address feedback * Final fix --------- Co-authored-by: Mehmet Ozan Kabak --- Cargo.toml | 2 +- datafusion-cli/Cargo.lock | 1 + datafusion-cli/Cargo.toml | 1 + datafusion-cli/src/exec.rs | 66 +++-- datafusion-cli/src/main.rs | 19 +- datafusion-cli/src/print_format.rs | 278 +++++++++++------- datafusion-cli/src/print_options.rs | 74 ++++- .../core/src/datasource/physical_plan/mod.rs | 15 + 8 files changed, 295 insertions(+), 161 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a698fbf471f9..4ee29ea6298c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,7 @@ arrow = { version = "49.0.0", features = ["prettyprint"] } arrow-array = { version = "49.0.0", default-features = false, features = ["chrono-tz"] } arrow-buffer = { version = "49.0.0", default-features = false } arrow-flight = { version = "49.0.0", features = ["flight-sql-experimental"] } -arrow-ipc = { version = "49.0.0", default-features = false, features=["lz4"] } +arrow-ipc = { version = "49.0.0", default-features = false, features = ["lz4"] } arrow-ord = { version = "49.0.0", default-features = false } arrow-schema = { version = "49.0.0", default-features = false } async-trait = "0.1.73" diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 9f75013c86dc..8e9bbd8a0dfd 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1160,6 +1160,7 @@ dependencies = [ "datafusion-common", "dirs", "env_logger", + "futures", "mimalloc", "object_store", "parking_lot", diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index f57097683698..e1ddba4cad1a 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -38,6 +38,7 @@ datafusion = { path = "../datafusion/core", version = "34.0.0", features = ["avr datafusion-common = { path = "../datafusion/common" } dirs = "4.0.0" env_logger = "0.9" +futures = "0.3" mimalloc = { version = "0.1", default-features = false } object_store = { version = "0.8.0", features = ["aws", "gcp"] } parking_lot = { version = "0.12" } diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index 8af534cd1375..ba9aa2e69aa6 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -17,6 +17,12 @@ //! Execution functions +use std::io::prelude::*; +use std::io::BufReader; +use std::time::Instant; +use std::{fs::File, sync::Arc}; + +use crate::print_format::PrintFormat; use crate::{ command::{Command, OutputFormat}, helper::{unescape_input, CliHelper}, @@ -26,21 +32,19 @@ use crate::{ }, print_options::{MaxRows, PrintOptions}, }; -use datafusion::common::plan_datafusion_err; + +use datafusion::common::{exec_datafusion_err, plan_datafusion_err}; +use datafusion::datasource::listing::ListingTableUrl; +use datafusion::datasource::physical_plan::is_plan_streaming; +use datafusion::error::{DataFusionError, Result}; +use datafusion::logical_expr::{CreateExternalTable, DdlStatement, LogicalPlan}; +use datafusion::physical_plan::{collect, execute_stream}; +use datafusion::prelude::SessionContext; use datafusion::sql::{parser::DFParser, sqlparser::dialect::dialect_from_str}; -use datafusion::{ - datasource::listing::ListingTableUrl, - error::{DataFusionError, Result}, - logical_expr::{CreateExternalTable, DdlStatement}, -}; -use datafusion::{logical_expr::LogicalPlan, prelude::SessionContext}; + use object_store::ObjectStore; use rustyline::error::ReadlineError; use rustyline::Editor; -use std::io::prelude::*; -use std::io::BufReader; -use std::time::Instant; -use std::{fs::File, sync::Arc}; use url::Url; /// run and execute SQL statements and commands, against a context with the given print options @@ -125,8 +129,6 @@ pub async fn exec_from_repl( ))); rl.load_history(".history").ok(); - let mut print_options = print_options.clone(); - loop { match rl.readline("❯ ") { Ok(line) if line.starts_with('\\') => { @@ -138,9 +140,7 @@ pub async fn exec_from_repl( Command::OutputFormat(subcommand) => { if let Some(subcommand) = subcommand { if let Ok(command) = subcommand.parse::() { - if let Err(e) = - command.execute(&mut print_options).await - { + if let Err(e) = command.execute(print_options).await { eprintln!("{e}") } } else { @@ -154,7 +154,7 @@ pub async fn exec_from_repl( } } _ => { - if let Err(e) = cmd.execute(ctx, &mut print_options).await { + if let Err(e) = cmd.execute(ctx, print_options).await { eprintln!("{e}") } } @@ -165,7 +165,7 @@ pub async fn exec_from_repl( } Ok(line) => { rl.add_history_entry(line.trim_end())?; - match exec_and_print(ctx, &print_options, line).await { + match exec_and_print(ctx, print_options, line).await { Ok(_) => {} Err(err) => eprintln!("{err}"), } @@ -198,7 +198,6 @@ async fn exec_and_print( sql: String, ) -> Result<()> { let now = Instant::now(); - let sql = unescape_input(&sql)?; let task_ctx = ctx.task_ctx(); let dialect = &task_ctx.session_config().options().sql_parser.dialect; @@ -227,18 +226,24 @@ async fn exec_and_print( if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan { create_external_table(ctx, cmd).await?; } + let df = ctx.execute_logical_plan(plan).await?; - let results = df.collect().await?; + let physical_plan = df.create_physical_plan().await?; - let print_options = if should_ignore_maxrows { - PrintOptions { - maxrows: MaxRows::Unlimited, - ..print_options.clone() - } + if is_plan_streaming(&physical_plan)? { + let stream = execute_stream(physical_plan, task_ctx.clone())?; + print_options.print_stream(stream, now).await?; } else { - print_options.clone() - }; - print_options.print_batches(&results, now)?; + let mut print_options = print_options.clone(); + if should_ignore_maxrows { + print_options.maxrows = MaxRows::Unlimited; + } + if print_options.format == PrintFormat::Automatic { + print_options.format = PrintFormat::Table; + } + let results = collect(physical_plan, task_ctx.clone()).await?; + print_options.print_batches(&results, now)?; + } } Ok(()) @@ -272,10 +277,7 @@ async fn create_external_table( .object_store_registry .get_store(url) .map_err(|_| { - DataFusionError::Execution(format!( - "Unsupported object store scheme: {}", - scheme - )) + exec_datafusion_err!("Unsupported object store scheme: {}", scheme) })? } }; diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 8b74a797b57b..563d172f2c95 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -15,7 +15,12 @@ // specific language governing permissions and limitations // under the License. -use clap::Parser; +use std::collections::HashMap; +use std::env; +use std::path::Path; +use std::str::FromStr; +use std::sync::{Arc, OnceLock}; + use datafusion::error::{DataFusionError, Result}; use datafusion::execution::context::SessionConfig; use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool}; @@ -29,12 +34,9 @@ use datafusion_cli::{ print_options::{MaxRows, PrintOptions}, DATAFUSION_CLI_VERSION, }; + +use clap::Parser; use mimalloc::MiMalloc; -use std::collections::HashMap; -use std::env; -use std::path::Path; -use std::str::FromStr; -use std::sync::{Arc, OnceLock}; #[global_allocator] static GLOBAL: MiMalloc = MiMalloc; @@ -111,7 +113,7 @@ struct Args { )] rc: Option>, - #[clap(long, arg_enum, default_value_t = PrintFormat::Table)] + #[clap(long, arg_enum, default_value_t = PrintFormat::Automatic)] format: PrintFormat, #[clap( @@ -331,9 +333,8 @@ fn extract_memory_pool_size(size: &str) -> Result { #[cfg(test)] mod tests { - use datafusion::assert_batches_eq; - use super::*; + use datafusion::assert_batches_eq; fn assert_conversion(input: &str, expected: Result) { let result = extract_memory_pool_size(input); diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs index 0738bf6f9b47..ea418562495d 100644 --- a/datafusion-cli/src/print_format.rs +++ b/datafusion-cli/src/print_format.rs @@ -16,23 +16,27 @@ // under the License. //! Print format variants + +use std::str::FromStr; + use crate::print_options::MaxRows; + use arrow::csv::writer::WriterBuilder; use arrow::json::{ArrayWriter, LineDelimitedWriter}; +use arrow::record_batch::RecordBatch; use arrow::util::pretty::pretty_format_batches_with_options; -use datafusion::arrow::record_batch::RecordBatch; use datafusion::common::format::DEFAULT_FORMAT_OPTIONS; -use datafusion::error::{DataFusionError, Result}; -use std::str::FromStr; +use datafusion::error::Result; /// Allow records to be printed in different formats -#[derive(Debug, PartialEq, Eq, clap::ArgEnum, Clone)] +#[derive(Debug, PartialEq, Eq, clap::ArgEnum, Clone, Copy)] pub enum PrintFormat { Csv, Tsv, Table, Json, NdJson, + Automatic, } impl FromStr for PrintFormat { @@ -44,31 +48,44 @@ impl FromStr for PrintFormat { } macro_rules! batches_to_json { - ($WRITER: ident, $batches: expr) => {{ - let mut bytes = vec![]; + ($WRITER: ident, $writer: expr, $batches: expr) => {{ { - let mut writer = $WRITER::new(&mut bytes); - $batches.iter().try_for_each(|batch| writer.write(batch))?; - writer.finish()?; + if !$batches.is_empty() { + let mut json_writer = $WRITER::new(&mut *$writer); + for batch in $batches { + json_writer.write(batch)?; + } + json_writer.finish()?; + json_finish!($WRITER, $writer); + } } - String::from_utf8(bytes).map_err(|e| DataFusionError::External(Box::new(e)))? + Ok(()) as Result<()> }}; } -fn print_batches_with_sep(batches: &[RecordBatch], delimiter: u8) -> Result { - let mut bytes = vec![]; - { - let builder = WriterBuilder::new() - .with_header(true) - .with_delimiter(delimiter); - let mut writer = builder.build(&mut bytes); - for batch in batches { - writer.write(batch)?; - } +macro_rules! json_finish { + (ArrayWriter, $writer: expr) => {{ + writeln!($writer)?; + }}; + (LineDelimitedWriter, $writer: expr) => {{}}; +} + +fn print_batches_with_sep( + writer: &mut W, + batches: &[RecordBatch], + delimiter: u8, + with_header: bool, +) -> Result<()> { + let builder = WriterBuilder::new() + .with_header(with_header) + .with_delimiter(delimiter); + let mut csv_writer = builder.build(writer); + + for batch in batches { + csv_writer.write(batch)?; } - let formatted = - String::from_utf8(bytes).map_err(|e| DataFusionError::External(Box::new(e)))?; - Ok(formatted) + + Ok(()) } fn keep_only_maxrows(s: &str, maxrows: usize) -> String { @@ -88,97 +105,118 @@ fn keep_only_maxrows(s: &str, maxrows: usize) -> String { result.join("\n") } -fn format_batches_with_maxrows( +fn format_batches_with_maxrows( + writer: &mut W, batches: &[RecordBatch], maxrows: MaxRows, -) -> Result { +) -> Result<()> { match maxrows { MaxRows::Limited(maxrows) => { - // Only format enough batches for maxrows + // Filter batches to meet the maxrows condition let mut filtered_batches = Vec::new(); - let mut batches = batches; - let row_count: usize = batches.iter().map(|b| b.num_rows()).sum(); - if row_count > maxrows { - let mut accumulated_rows = 0; - - for batch in batches { + let mut row_count: usize = 0; + let mut over_limit = false; + for batch in batches { + if row_count + batch.num_rows() > maxrows { + // If adding this batch exceeds maxrows, slice the batch + let limit = maxrows - row_count; + let sliced_batch = batch.slice(0, limit); + filtered_batches.push(sliced_batch); + over_limit = true; + break; + } else { filtered_batches.push(batch.clone()); - if accumulated_rows + batch.num_rows() > maxrows { - break; - } - accumulated_rows += batch.num_rows(); + row_count += batch.num_rows(); } - - batches = &filtered_batches; } - let mut formatted = format!( - "{}", - pretty_format_batches_with_options(batches, &DEFAULT_FORMAT_OPTIONS)?, - ); - - if row_count > maxrows { - formatted = keep_only_maxrows(&formatted, maxrows); + let formatted = pretty_format_batches_with_options( + &filtered_batches, + &DEFAULT_FORMAT_OPTIONS, + )?; + if over_limit { + let mut formatted_str = format!("{}", formatted); + formatted_str = keep_only_maxrows(&formatted_str, maxrows); + writeln!(writer, "{}", formatted_str)?; + } else { + writeln!(writer, "{}", formatted)?; } - - Ok(formatted) } MaxRows::Unlimited => { - // maxrows not specified, print all rows - Ok(format!( - "{}", - pretty_format_batches_with_options(batches, &DEFAULT_FORMAT_OPTIONS)?, - )) + let formatted = + pretty_format_batches_with_options(batches, &DEFAULT_FORMAT_OPTIONS)?; + writeln!(writer, "{}", formatted)?; } } + + Ok(()) } impl PrintFormat { - /// print the batches to stdout using the specified format - /// `maxrows` option is only used for `Table` format: - /// If `maxrows` is Some(n), then at most n rows will be displayed - /// If `maxrows` is None, then every row will be displayed - pub fn print_batches(&self, batches: &[RecordBatch], maxrows: MaxRows) -> Result<()> { - if batches.is_empty() { + /// Print the batches to a writer using the specified format + pub fn print_batches( + &self, + writer: &mut W, + batches: &[RecordBatch], + maxrows: MaxRows, + with_header: bool, + ) -> Result<()> { + if batches.is_empty() || batches[0].num_rows() == 0 { return Ok(()); } match self { - Self::Csv => println!("{}", print_batches_with_sep(batches, b',')?), - Self::Tsv => println!("{}", print_batches_with_sep(batches, b'\t')?), + Self::Csv | Self::Automatic => { + print_batches_with_sep(writer, batches, b',', with_header) + } + Self::Tsv => print_batches_with_sep(writer, batches, b'\t', with_header), Self::Table => { if maxrows == MaxRows::Limited(0) { return Ok(()); } - println!("{}", format_batches_with_maxrows(batches, maxrows)?,) - } - Self::Json => println!("{}", batches_to_json!(ArrayWriter, batches)), - Self::NdJson => { - println!("{}", batches_to_json!(LineDelimitedWriter, batches)) + format_batches_with_maxrows(writer, batches, maxrows) } + Self::Json => batches_to_json!(ArrayWriter, writer, batches), + Self::NdJson => batches_to_json!(LineDelimitedWriter, writer, batches), } - Ok(()) } } #[cfg(test)] mod tests { + use std::io::{Cursor, Read, Write}; + use std::sync::Arc; + use super::*; + use arrow::array::Int32Array; use arrow::datatypes::{DataType, Field, Schema}; - use std::sync::Arc; + use datafusion::error::Result; + + fn run_test(batches: &[RecordBatch], test_fn: F) -> Result + where + F: Fn(&mut Cursor>, &[RecordBatch]) -> Result<()>, + { + let mut buffer = Cursor::new(Vec::new()); + test_fn(&mut buffer, batches)?; + buffer.set_position(0); + let mut contents = String::new(); + buffer.read_to_string(&mut contents)?; + Ok(contents) + } #[test] - fn test_print_batches_with_sep() { - let batches = vec![]; - assert_eq!("", print_batches_with_sep(&batches, b',').unwrap()); + fn test_print_batches_with_sep() -> Result<()> { + let contents = run_test(&[], |buffer, batches| { + print_batches_with_sep(buffer, batches, b',', true) + })?; + assert_eq!(contents, ""); let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, false), Field::new("b", DataType::Int32, false), Field::new("c", DataType::Int32, false), ])); - let batch = RecordBatch::try_new( schema, vec![ @@ -186,29 +224,33 @@ mod tests { Arc::new(Int32Array::from(vec![4, 5, 6])), Arc::new(Int32Array::from(vec![7, 8, 9])), ], - ) - .unwrap(); + )?; - let batches = vec![batch]; - let r = print_batches_with_sep(&batches, b',').unwrap(); - assert_eq!("a,b,c\n1,4,7\n2,5,8\n3,6,9\n", r); + let contents = run_test(&[batch], |buffer, batches| { + print_batches_with_sep(buffer, batches, b',', true) + })?; + assert_eq!(contents, "a,b,c\n1,4,7\n2,5,8\n3,6,9\n"); + + Ok(()) } #[test] fn test_print_batches_to_json_empty() -> Result<()> { - let batches = vec![]; - let r = batches_to_json!(ArrayWriter, &batches); - assert_eq!("", r); + let contents = run_test(&[], |buffer, batches| { + batches_to_json!(ArrayWriter, buffer, batches) + })?; + assert_eq!(contents, ""); - let r = batches_to_json!(LineDelimitedWriter, &batches); - assert_eq!("", r); + let contents = run_test(&[], |buffer, batches| { + batches_to_json!(LineDelimitedWriter, buffer, batches) + })?; + assert_eq!(contents, ""); let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, false), Field::new("b", DataType::Int32, false), Field::new("c", DataType::Int32, false), ])); - let batch = RecordBatch::try_new( schema, vec![ @@ -216,25 +258,29 @@ mod tests { Arc::new(Int32Array::from(vec![4, 5, 6])), Arc::new(Int32Array::from(vec![7, 8, 9])), ], - ) - .unwrap(); - + )?; let batches = vec![batch]; - let r = batches_to_json!(ArrayWriter, &batches); - assert_eq!("[{\"a\":1,\"b\":4,\"c\":7},{\"a\":2,\"b\":5,\"c\":8},{\"a\":3,\"b\":6,\"c\":9}]", r); - let r = batches_to_json!(LineDelimitedWriter, &batches); - assert_eq!("{\"a\":1,\"b\":4,\"c\":7}\n{\"a\":2,\"b\":5,\"c\":8}\n{\"a\":3,\"b\":6,\"c\":9}\n", r); + let contents = run_test(&batches, |buffer, batches| { + batches_to_json!(ArrayWriter, buffer, batches) + })?; + assert_eq!(contents, "[{\"a\":1,\"b\":4,\"c\":7},{\"a\":2,\"b\":5,\"c\":8},{\"a\":3,\"b\":6,\"c\":9}]\n"); + + let contents = run_test(&batches, |buffer, batches| { + batches_to_json!(LineDelimitedWriter, buffer, batches) + })?; + assert_eq!(contents, "{\"a\":1,\"b\":4,\"c\":7}\n{\"a\":2,\"b\":5,\"c\":8}\n{\"a\":3,\"b\":6,\"c\":9}\n"); + Ok(()) } #[test] fn test_format_batches_with_maxrows() -> Result<()> { let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); - - let batch = - RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]) - .unwrap(); + let batch = RecordBatch::try_new( + schema, + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + )?; #[rustfmt::skip] let all_rows_expected = [ @@ -244,7 +290,7 @@ mod tests { "| 1 |", "| 2 |", "| 3 |", - "+---+", + "+---+\n", ].join("\n"); #[rustfmt::skip] @@ -256,7 +302,7 @@ mod tests { "| . |", "| . |", "| . |", - "+---+", + "+---+\n", ].join("\n"); #[rustfmt::skip] @@ -272,26 +318,36 @@ mod tests { "| . |", "| . |", "| . |", - "+---+", + "+---+\n", ].join("\n"); - let no_limit = format_batches_with_maxrows(&[batch.clone()], MaxRows::Unlimited)?; - assert_eq!(all_rows_expected, no_limit); - - let maxrows_less_than_actual = - format_batches_with_maxrows(&[batch.clone()], MaxRows::Limited(1))?; - assert_eq!(one_row_expected, maxrows_less_than_actual); - let maxrows_more_than_actual = - format_batches_with_maxrows(&[batch.clone()], MaxRows::Limited(5))?; - assert_eq!(all_rows_expected, maxrows_more_than_actual); - let maxrows_equals_actual = - format_batches_with_maxrows(&[batch.clone()], MaxRows::Limited(3))?; - assert_eq!(all_rows_expected, maxrows_equals_actual); - let multi_batches = format_batches_with_maxrows( + let no_limit = run_test(&[batch.clone()], |buffer, batches| { + format_batches_with_maxrows(buffer, batches, MaxRows::Unlimited) + })?; + assert_eq!(no_limit, all_rows_expected); + + let maxrows_less_than_actual = run_test(&[batch.clone()], |buffer, batches| { + format_batches_with_maxrows(buffer, batches, MaxRows::Limited(1)) + })?; + assert_eq!(maxrows_less_than_actual, one_row_expected); + + let maxrows_more_than_actual = run_test(&[batch.clone()], |buffer, batches| { + format_batches_with_maxrows(buffer, batches, MaxRows::Limited(5)) + })?; + assert_eq!(maxrows_more_than_actual, all_rows_expected); + + let maxrows_equals_actual = run_test(&[batch.clone()], |buffer, batches| { + format_batches_with_maxrows(buffer, batches, MaxRows::Limited(3)) + })?; + assert_eq!(maxrows_equals_actual, all_rows_expected); + + let multi_batches = run_test( &[batch.clone(), batch.clone(), batch.clone()], - MaxRows::Limited(5), + |buffer, batches| { + format_batches_with_maxrows(buffer, batches, MaxRows::Limited(5)) + }, )?; - assert_eq!(multi_batches_expected, multi_batches); + assert_eq!(multi_batches, multi_batches_expected); Ok(()) } diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs index 0a6c8d4c36fc..b8594352b585 100644 --- a/datafusion-cli/src/print_options.rs +++ b/datafusion-cli/src/print_options.rs @@ -15,13 +15,21 @@ // specific language governing permissions and limitations // under the License. -use crate::print_format::PrintFormat; -use datafusion::arrow::record_batch::RecordBatch; -use datafusion::error::Result; use std::fmt::{Display, Formatter}; +use std::io::Write; +use std::pin::Pin; use std::str::FromStr; use std::time::Instant; +use crate::print_format::PrintFormat; + +use arrow::record_batch::RecordBatch; +use datafusion::common::DataFusionError; +use datafusion::error::Result; +use datafusion::physical_plan::RecordBatchStream; + +use futures::StreamExt; + #[derive(Debug, Clone, PartialEq, Copy)] pub enum MaxRows { /// show all rows in the output @@ -85,20 +93,70 @@ fn get_timing_info_str( } impl PrintOptions { - /// print the batches to stdout using the specified format + /// Print the batches to stdout using the specified format pub fn print_batches( &self, batches: &[RecordBatch], query_start_time: Instant, ) -> Result<()> { + let stdout = std::io::stdout(); + let mut writer = stdout.lock(); + + self.format + .print_batches(&mut writer, batches, self.maxrows, true)?; + let row_count: usize = batches.iter().map(|b| b.num_rows()).sum(); - // Elapsed time should not count time for printing batches - let timing_info = get_timing_info_str(row_count, self.maxrows, query_start_time); + let timing_info = get_timing_info_str( + row_count, + if self.format == PrintFormat::Table { + self.maxrows + } else { + MaxRows::Unlimited + }, + query_start_time, + ); + + if !self.quiet { + writeln!(writer, "{timing_info}")?; + } + + Ok(()) + } + + /// Print the stream to stdout using the specified format + pub async fn print_stream( + &self, + mut stream: Pin>, + query_start_time: Instant, + ) -> Result<()> { + if self.format == PrintFormat::Table { + return Err(DataFusionError::External( + "PrintFormat::Table is not implemented".to_string().into(), + )); + }; + + let stdout = std::io::stdout(); + let mut writer = stdout.lock(); + + let mut row_count = 0_usize; + let mut with_header = true; + + while let Some(Ok(batch)) = stream.next().await { + row_count += batch.num_rows(); + self.format.print_batches( + &mut writer, + &[batch], + MaxRows::Unlimited, + with_header, + )?; + with_header = false; + } - self.format.print_batches(batches, self.maxrows)?; + let timing_info = + get_timing_info_str(row_count, MaxRows::Unlimited, query_start_time); if !self.quiet { - println!("{timing_info}"); + writeln!(writer, "{timing_info}")?; } Ok(()) diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 4a6ebeab09e1..5583991355c6 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -69,6 +69,7 @@ use arrow::{ use datafusion_common::{file_options::FileTypeWriterOptions, plan_err}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::PhysicalSortExpr; +use datafusion_physical_plan::ExecutionPlan; use log::debug; use object_store::path::Path; @@ -507,6 +508,20 @@ fn get_projected_output_ordering( all_orderings } +/// Get output (un)boundedness information for the given `plan`. +pub fn is_plan_streaming(plan: &Arc) -> Result { + if plan.children().is_empty() { + plan.unbounded_output(&[]) + } else { + let children_unbounded_output = plan + .children() + .iter() + .map(is_plan_streaming) + .collect::>>(); + plan.unbounded_output(&children_unbounded_output?) + } +} + #[cfg(test)] mod tests { use arrow_array::cast::AsArray;