Skip to content

Commit

Permalink
Streaming CLI support (#8651)
Browse files: browse the repository at this point in the history.
* Streaming CLI support

* Update Cargo.toml

* Remove duplications

* Clean up

* Stream test will be added

* Update print_format.rs

* Address feedback

* Final fix

---------

Co-authored-by: Mehmet Ozan Kabak <[email protected]>
  • Loading branch information
berkaysynnada and ozankabak authored Dec 28, 2023
1 parent 1737d49 commit fba5cc0
Show file tree
Hide file tree
Showing 8 changed files with 295 additions and 161 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ arrow = { version = "49.0.0", features = ["prettyprint"] }
arrow-array = { version = "49.0.0", default-features = false, features = ["chrono-tz"] }
arrow-buffer = { version = "49.0.0", default-features = false }
arrow-flight = { version = "49.0.0", features = ["flight-sql-experimental"] }
arrow-ipc = { version = "49.0.0", default-features = false, features=["lz4"] }
arrow-ipc = { version = "49.0.0", default-features = false, features = ["lz4"] }
arrow-ord = { version = "49.0.0", default-features = false }
arrow-schema = { version = "49.0.0", default-features = false }
async-trait = "0.1.73"
Expand Down
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ datafusion = { path = "../datafusion/core", version = "34.0.0", features = ["avr
datafusion-common = { path = "../datafusion/common" }
dirs = "4.0.0"
env_logger = "0.9"
futures = "0.3"
mimalloc = { version = "0.1", default-features = false }
object_store = { version = "0.8.0", features = ["aws", "gcp"] }
parking_lot = { version = "0.12" }
Expand Down
66 changes: 34 additions & 32 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@

//! Execution functions
use std::io::prelude::*;
use std::io::BufReader;
use std::time::Instant;
use std::{fs::File, sync::Arc};

use crate::print_format::PrintFormat;
use crate::{
command::{Command, OutputFormat},
helper::{unescape_input, CliHelper},
Expand All @@ -26,21 +32,19 @@ use crate::{
},
print_options::{MaxRows, PrintOptions},
};
use datafusion::common::plan_datafusion_err;

use datafusion::common::{exec_datafusion_err, plan_datafusion_err};
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::physical_plan::is_plan_streaming;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{CreateExternalTable, DdlStatement, LogicalPlan};
use datafusion::physical_plan::{collect, execute_stream};
use datafusion::prelude::SessionContext;
use datafusion::sql::{parser::DFParser, sqlparser::dialect::dialect_from_str};
use datafusion::{
datasource::listing::ListingTableUrl,
error::{DataFusionError, Result},
logical_expr::{CreateExternalTable, DdlStatement},
};
use datafusion::{logical_expr::LogicalPlan, prelude::SessionContext};

use object_store::ObjectStore;
use rustyline::error::ReadlineError;
use rustyline::Editor;
use std::io::prelude::*;
use std::io::BufReader;
use std::time::Instant;
use std::{fs::File, sync::Arc};
use url::Url;

/// Run and execute SQL statements and commands against a context with the given print options.
Expand Down Expand Up @@ -125,8 +129,6 @@ pub async fn exec_from_repl(
)));
rl.load_history(".history").ok();

let mut print_options = print_options.clone();

loop {
match rl.readline("❯ ") {
Ok(line) if line.starts_with('\\') => {
Expand All @@ -138,9 +140,7 @@ pub async fn exec_from_repl(
Command::OutputFormat(subcommand) => {
if let Some(subcommand) = subcommand {
if let Ok(command) = subcommand.parse::<OutputFormat>() {
if let Err(e) =
command.execute(&mut print_options).await
{
if let Err(e) = command.execute(print_options).await {
eprintln!("{e}")
}
} else {
Expand All @@ -154,7 +154,7 @@ pub async fn exec_from_repl(
}
}
_ => {
if let Err(e) = cmd.execute(ctx, &mut print_options).await {
if let Err(e) = cmd.execute(ctx, print_options).await {
eprintln!("{e}")
}
}
Expand All @@ -165,7 +165,7 @@ pub async fn exec_from_repl(
}
Ok(line) => {
rl.add_history_entry(line.trim_end())?;
match exec_and_print(ctx, &print_options, line).await {
match exec_and_print(ctx, print_options, line).await {
Ok(_) => {}
Err(err) => eprintln!("{err}"),
}
Expand Down Expand Up @@ -198,7 +198,6 @@ async fn exec_and_print(
sql: String,
) -> Result<()> {
let now = Instant::now();

let sql = unescape_input(&sql)?;
let task_ctx = ctx.task_ctx();
let dialect = &task_ctx.session_config().options().sql_parser.dialect;
Expand Down Expand Up @@ -227,18 +226,24 @@ async fn exec_and_print(
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
create_external_table(ctx, cmd).await?;
}

let df = ctx.execute_logical_plan(plan).await?;
let results = df.collect().await?;
let physical_plan = df.create_physical_plan().await?;

let print_options = if should_ignore_maxrows {
PrintOptions {
maxrows: MaxRows::Unlimited,
..print_options.clone()
}
if is_plan_streaming(&physical_plan)? {
let stream = execute_stream(physical_plan, task_ctx.clone())?;
print_options.print_stream(stream, now).await?;
} else {
print_options.clone()
};
print_options.print_batches(&results, now)?;
let mut print_options = print_options.clone();
if should_ignore_maxrows {
print_options.maxrows = MaxRows::Unlimited;
}
if print_options.format == PrintFormat::Automatic {
print_options.format = PrintFormat::Table;
}
let results = collect(physical_plan, task_ctx.clone()).await?;
print_options.print_batches(&results, now)?;
}
}

Ok(())
Expand Down Expand Up @@ -272,10 +277,7 @@ async fn create_external_table(
.object_store_registry
.get_store(url)
.map_err(|_| {
DataFusionError::Execution(format!(
"Unsupported object store scheme: {}",
scheme
))
exec_datafusion_err!("Unsupported object store scheme: {}", scheme)
})?
}
};
Expand Down
19 changes: 10 additions & 9 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
// specific language governing permissions and limitations
// under the License.

use clap::Parser;
use std::collections::HashMap;
use std::env;
use std::path::Path;
use std::str::FromStr;
use std::sync::{Arc, OnceLock};

use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionConfig;
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool};
Expand All @@ -29,12 +34,9 @@ use datafusion_cli::{
print_options::{MaxRows, PrintOptions},
DATAFUSION_CLI_VERSION,
};

use clap::Parser;
use mimalloc::MiMalloc;
use std::collections::HashMap;
use std::env;
use std::path::Path;
use std::str::FromStr;
use std::sync::{Arc, OnceLock};

#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
Expand Down Expand Up @@ -111,7 +113,7 @@ struct Args {
)]
rc: Option<Vec<String>>,

#[clap(long, arg_enum, default_value_t = PrintFormat::Table)]
#[clap(long, arg_enum, default_value_t = PrintFormat::Automatic)]
format: PrintFormat,

#[clap(
Expand Down Expand Up @@ -331,9 +333,8 @@ fn extract_memory_pool_size(size: &str) -> Result<usize, String> {

#[cfg(test)]
mod tests {
use datafusion::assert_batches_eq;

use super::*;
use datafusion::assert_batches_eq;

fn assert_conversion(input: &str, expected: Result<usize, String>) {
let result = extract_memory_pool_size(input);
Expand Down
Loading

0 comments on commit fba5cc0

Please sign in to comment.