diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs
index 7d9ea97f7b5b..bc5b53c68ad0 100644
--- a/datafusion/core/tests/fifo.rs
+++ b/datafusion/core/tests/fifo.rs
@@ -17,32 +17,208 @@
 //! This test demonstrates the DataFusion FIFO capabilities.
 //!
 
-#[cfg(not(target_os = "windows"))]
+#[cfg(target_family = "unix")]
 #[cfg(test)]
 mod unix_test {
     use arrow::array::Array;
-    use arrow::csv::ReaderBuilder;
+    use arrow::csv::{ReaderBuilder, WriterBuilder};
     use arrow::datatypes::{DataType, Field, Schema};
-    use datafusion::test_util::register_unbounded_file_with_ordering;
+    use arrow_array::RecordBatch;
+    use arrow_schema::{SchemaRef, SortOptions};
+    use async_trait::async_trait;
+    use datafusion::datasource::provider::TableProviderFactory;
+    use datafusion::datasource::TableProvider;
+    use datafusion::execution::context::SessionState;
     use datafusion::{
+        physical_plan,
         prelude::{CsvReadOptions, SessionConfig, SessionContext},
         test_util::{aggr_test_schema, arrow_test_data},
     };
     use datafusion_common::{exec_err, DataFusionError, Result};
+    use datafusion_execution::runtime_env::RuntimeEnv;
+    use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+    use datafusion_expr::{CreateExternalTable, Expr, TableType};
+    use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
+    use datafusion_physical_plan::common::AbortOnDropSingle;
+    use datafusion_physical_plan::insert::{DataSink, FileSinkExec};
+    use datafusion_physical_plan::metrics::MetricsSet;
+    use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
+    use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
+    use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
     use futures::StreamExt;
-    use itertools::enumerate;
     use nix::sys::stat;
     use nix::unistd;
-    use rstest::*;
+    use std::any::Any;
+    use std::collections::HashMap;
+    use std::fmt::Formatter;
     use std::fs::{File, OpenOptions};
     use std::io::Write;
     use std::path::PathBuf;
-    use std::sync::atomic::{AtomicBool, Ordering};
     use std::sync::Arc;
-    use std::thread;
-    use std::thread::JoinHandle;
-    use std::time::{Duration, Instant};
     use tempfile::TempDir;
+    use tokio::task::{spawn_blocking, JoinHandle};
+
+    #[derive(Default)]
+    struct FifoFactory {}
+
+    #[async_trait]
+    impl TableProviderFactory for FifoFactory {
+        async fn create(
+            &self,
+            _state: &SessionState,
+            cmd: &CreateExternalTable,
+        ) -> Result<Arc<dyn TableProvider>> {
+            let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into());
+            let location = cmd.location.clone();
+            Ok(fifo_table(schema, location, None))
+        }
+    }
+
+    #[derive(Debug)]
+    struct FifoConfig {
+        schema: SchemaRef,
+        location: PathBuf,
+        sort: Option<LexOrdering>,
+    }
+
+    struct FifoTable(Arc<FifoConfig>);
+
+    #[async_trait]
+    impl TableProvider for FifoTable {
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+
+        fn schema(&self) -> SchemaRef {
+            self.0.schema.clone()
+        }
+
+        fn table_type(&self) -> TableType {
+            TableType::Temporary
+        }
+
+        async fn scan(
+            &self,
+            _state: &SessionState,
+            projection: Option<&Vec<usize>>,
+            _filters: &[Expr],
+            _limit: Option<usize>,
+        ) -> Result<Arc<dyn ExecutionPlan>> {
+            Ok(Arc::new(StreamingTableExec::try_new(
+                self.0.schema.clone(),
+                vec![Arc::new(FifoRead(self.0.clone())) as _],
+                projection,
+                self.0.sort.clone(),
+                true,
+            )?))
+        }
+
+        async fn insert_into(
+            &self,
+            _state: &SessionState,
+            input: Arc<dyn ExecutionPlan>,
+            _overwrite: bool,
+        ) -> Result<Arc<dyn ExecutionPlan>> {
+            let ordering = match &self.0.sort {
+                Some(order) => Some(order.iter().map(|e| e.clone().into()).collect()),
+                None => None,
+            };
+
+            Ok(Arc::new(FileSinkExec::new(
+                input,
+                Arc::new(FifoWrite(self.0.clone())),
+                self.0.schema.clone(),
+                ordering,
+            )))
+        }
+    }
+
+    struct FifoRead(Arc<FifoConfig>);
+
+    impl PartitionStream for FifoRead {
+        fn schema(&self) -> &SchemaRef {
+            &self.0.schema
+        }
+
+        fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
+            let config = self.0.clone();
+            let schema = self.0.schema.clone();
+            let mut builder = RecordBatchReceiverStreamBuilder::new(schema, 2);
+            let tx = builder.tx();
+            builder.spawn_blocking(move || {
+                let file = File::open(&config.location)?;
+                let reader = ReaderBuilder::new(config.schema.clone()).build(file)?;
+                for b in reader {
+                    if tx.blocking_send(b.map_err(Into::into)).is_err() {
+                        break;
+                    }
+                }
+                Ok(())
+            });
+            builder.build()
+        }
+    }
+
+    #[derive(Debug)]
+    struct FifoWrite(Arc<FifoConfig>);
+
+    impl DisplayAs for FifoWrite {
+        fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+            write!(f, "{self:?}")
+        }
+    }
+
+    #[async_trait]
+    impl DataSink for FifoWrite {
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+
+        fn metrics(&self) -> Option<MetricsSet> {
+            None
+        }
+
+        async fn write_all(
+            &self,
+            mut data: SendableRecordBatchStream,
+            _context: &Arc<TaskContext>,
+        ) -> Result<u64> {
+            let config = self.0.clone();
+            let (sender, mut receiver) = tokio::sync::mpsc::channel::<RecordBatch>(2);
+            // Note: FIFO Files support poll so this could use AsyncFd
+            let write = AbortOnDropSingle::new(spawn_blocking(move || {
+                let file = OpenOptions::new().write(true).open(&config.location)?;
+                let mut count = 0_u64;
+                let mut writer = WriterBuilder::new().with_header(false).build(file);
+                while let Some(batch) = receiver.blocking_recv() {
+                    count += batch.num_rows() as u64;
+                    writer.write(&batch)?;
+                }
+                Ok(count)
+            }));
+
+            while let Some(b) = data.next().await.transpose()? {
+                if sender.send(b).await.is_err() {
+                    break;
+                }
+            }
+            drop(sender);
+            write.await.unwrap()
+        }
+    }
+
+    /// Makes a TableProvider for a fifo file
+    fn fifo_table(
+        schema: SchemaRef,
+        path: impl Into<PathBuf>,
+        sort: Option<LexOrdering>,
+    ) -> Arc<dyn TableProvider> {
+        Arc::new(FifoTable(Arc::new(FifoConfig {
+            schema,
+            sort,
+            location: path.into(),
+        })))
+    }
 
     // ! For the sake of the test, do not alter the numbers. !
     // Session batch size
@@ -64,48 +240,20 @@ mod unix_test {
         }
     }
 
-    fn write_to_fifo(
-        mut file: &File,
-        line: &str,
-        ref_time: Instant,
-        broken_pipe_timeout: Duration,
-    ) -> Result<()> {
-        // We need to handle broken pipe error until the reader is ready. This
-        // is why we use a timeout to limit the wait duration for the reader.
-        // If the error is different than broken pipe, we fail immediately.
-        while let Err(e) = file.write_all(line.as_bytes()) {
-            if e.raw_os_error().unwrap() == 32 {
-                let interval = Instant::now().duration_since(ref_time);
-                if interval < broken_pipe_timeout {
-                    thread::sleep(Duration::from_millis(100));
-                    continue;
-                }
-            }
-            return exec_err!("{}", e);
-        }
-        Ok(())
-    }
-
     // This test provides a relatively realistic end-to-end scenario where
     // we swap join sides to accommodate a FIFO source.
-    #[rstest]
-    #[timeout(std::time::Duration::from_secs(30))]
-    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
-    async fn unbounded_file_with_swapped_join(
-        #[values(true, false)] unbounded_file: bool,
-    ) -> Result<()> {
+    #[tokio::test]
+    async fn unbounded_file_with_swapped_join() -> Result<()> {
         // Create session context
         let config = SessionConfig::new()
             .with_batch_size(TEST_BATCH_SIZE)
             .with_collect_statistics(false)
             .with_target_partitions(1);
+
         let ctx = SessionContext::new_with_config(config);
-        // To make unbounded deterministic
-        let waiting = Arc::new(AtomicBool::new(unbounded_file));
         // Create a new temporary FIFO file
         let tmp_dir = TempDir::new()?;
-        let fifo_path =
-            create_fifo_file(&tmp_dir, &format!("fifo_{unbounded_file:?}.csv"))?;
+        let fifo_path = create_fifo_file(&tmp_dir, "fifo_file.csv")?;
         // Execution can calculated at least one RecordBatch after the number of
         // "joinable_lines_length" lines are read.
         let joinable_lines_length =
@@ -124,28 +272,18 @@ mod unix_test {
             .map(|(a1, a2)| format!("{a1},{a2}\n"))
             .collect::<Vec<_>>();
         // Create writing threads for the left and right FIFO files
-        let task = create_writing_thread(
-            fifo_path.clone(),
-            "a1,a2\n".to_owned(),
-            lines,
-            waiting.clone(),
-            joinable_lines_length,
-        );
+        let task = create_writing_thread(fifo_path.clone(), lines);
         // Data Schema
         let schema = Arc::new(Schema::new(vec![
             Field::new("a1", DataType::Utf8, false),
             Field::new("a2", DataType::UInt32, false),
         ]));
+
         // Create a file with bounded or unbounded flag.
-        ctx.register_csv(
-            "left",
-            fifo_path.as_os_str().to_str().unwrap(),
-            CsvReadOptions::new()
-                .schema(schema.as_ref())
-                .mark_infinite(unbounded_file),
-        )
-        .await?;
+        let provider = fifo_table(schema, fifo_path, None);
+        ctx.register_table("left", provider).unwrap();
+
         // Register right table
         let schema = aggr_test_schema();
         let test_data = arrow_test_data();
@@ -158,10 +296,8 @@ mod unix_test {
         // Execute the query
         let df = ctx.sql("SELECT t1.a2, t2.c1, t2.c4, t2.c5 FROM left as t1 JOIN right as t2 ON t1.a1 = t2.c1").await?;
         let mut stream = df.execute_stream().await?;
-        while (stream.next().await).is_some() {
-            waiting.store(false, Ordering::SeqCst);
-        }
-        task.join().unwrap();
+        while (stream.next().await).is_some() {}
+        task.await.unwrap();
         Ok(())
     }
@@ -172,39 +308,20 @@
         Equal,
     }
 
-    fn create_writing_thread(
-        file_path: PathBuf,
-        header: String,
-        lines: Vec<String>,
-        waiting_lock: Arc<AtomicBool>,
-        wait_until: usize,
-    ) -> JoinHandle<()> {
-        // Timeout for a long period of BrokenPipe error
-        let broken_pipe_timeout = Duration::from_secs(10);
-        // Spawn a new thread to write to the FIFO file
-        thread::spawn(move || {
-            let file = OpenOptions::new().write(true).open(file_path).unwrap();
-            // Reference time to use when deciding to fail the test
-            let execution_start = Instant::now();
-            write_to_fifo(&file, &header, execution_start, broken_pipe_timeout).unwrap();
-            for (cnt, line) in enumerate(lines) {
-                while waiting_lock.load(Ordering::SeqCst) && cnt > wait_until {
-                    thread::sleep(Duration::from_millis(50));
-                }
-                write_to_fifo(&file, &line, execution_start, broken_pipe_timeout)
-                    .unwrap();
+    fn create_writing_thread(file_path: PathBuf, lines: Vec<String>) -> JoinHandle<()> {
+        spawn_blocking(move || {
+            let mut file = OpenOptions::new().write(true).open(file_path).unwrap();
+            for line in &lines {
+                file.write_all(line.as_bytes()).unwrap()
             }
-            drop(file);
+            file.flush().unwrap();
        })
     }
 
     // This test provides a relatively realistic end-to-end scenario where
     // we change the join into a [SymmetricHashJoin] to accommodate two
     // unbounded (FIFO) sources.
-    #[rstest]
-    #[timeout(std::time::Duration::from_secs(30))]
-    #[tokio::test(flavor = "multi_thread")]
-    #[ignore]
+    #[tokio::test]
     async fn unbounded_file_with_symmetric_join() -> Result<()> {
         // Create session context
         let config = SessionConfig::new()
@@ -212,8 +329,6 @@
             .set_bool("datafusion.execution.coalesce_batches", false)
             .with_target_partitions(1);
         let ctx = SessionContext::new_with_config(config);
-        // Tasks
-        let mut tasks: Vec<JoinHandle<()>> = vec![];
 
         // Join filter
         let a1_iter = 0..TEST_DATA_SIZE;
@@ -230,78 +345,54 @@
         let left_fifo = create_fifo_file(&tmp_dir, "left.csv")?;
         // Create a FIFO file for the right input source.
         let right_fifo = create_fifo_file(&tmp_dir, "right.csv")?;
-        // Create a mutex for tracking if the right input source is waiting for data.
-        let waiting = Arc::new(AtomicBool::new(true));
         // Create writing threads for the left and right FIFO files
-        tasks.push(create_writing_thread(
-            left_fifo.clone(),
-            "a1,a2\n".to_owned(),
-            lines.clone(),
-            waiting.clone(),
-            TEST_BATCH_SIZE,
-        ));
-        tasks.push(create_writing_thread(
-            right_fifo.clone(),
-            "a1,a2\n".to_owned(),
-            lines.clone(),
-            waiting.clone(),
-            TEST_BATCH_SIZE,
-        ));
+        let tasks = vec![
+            create_writing_thread(left_fifo.clone(), lines.clone()),
+            create_writing_thread(right_fifo.clone(), lines.clone()),
+        ];
 
         // Create schema
         let schema = Arc::new(Schema::new(vec![
             Field::new("a1", DataType::UInt32, false),
             Field::new("a2", DataType::UInt32, false),
         ]));
+
         // Specify the ordering:
-        let file_sort_order = vec![[datafusion_expr::col("a1")]
-            .into_iter()
-            .map(|e| {
-                let ascending = true;
-                let nulls_first = false;
-                e.sort(ascending, nulls_first)
-            })
-            .collect::<Vec<_>>()];
+        let order = Some(vec![PhysicalSortExpr {
+            expr: physical_plan::expressions::col("a1", schema.as_ref())?,
+            options: SortOptions {
+                descending: false,
+                nulls_first: false,
+            },
+        }]);
+
         // Set unbounded sorted files read configuration
-        register_unbounded_file_with_ordering(
-            &ctx,
-            schema.clone(),
-            &left_fifo,
-            "left",
-            file_sort_order.clone(),
-            true,
-        )
-        .await?;
-        register_unbounded_file_with_ordering(
-            &ctx,
-            schema,
-            &right_fifo,
-            "right",
-            file_sort_order,
-            true,
-        )
-        .await?;
+        let provider = fifo_table(schema.clone(), left_fifo, order.clone());
+        ctx.register_table("left", provider)?;
+
+        let provider = fifo_table(schema.clone(), right_fifo, order);
+        ctx.register_table("right", provider)?;
+
         // Execute the query, with no matching rows. (since key is modulus 10)
         let df = ctx
             .sql(
                 "SELECT
-                  t1.a1,
-                  t1.a2,
-                  t2.a1,
-                  t2.a2
-                  FROM
-                  left as t1 FULL
-                  JOIN right as t2 ON t1.a2 = t2.a2
-                  AND t1.a1 > t2.a1 + 4
-                  AND t1.a1 < t2.a1 + 9",
+                    t1.a1,
+                    t1.a2,
+                    t2.a1,
+                    t2.a2
+                FROM
+                    left as t1 FULL
+                    JOIN right as t2 ON t1.a2 = t2.a2
+                    AND t1.a1 > t2.a1 + 4
+                    AND t1.a1 < t2.a1 + 9",
             )
             .await?;
         let mut stream = df.execute_stream().await?;
         let mut operations = vec![];
         // Partial.
         while let Some(Ok(batch)) = stream.next().await {
-            waiting.store(false, Ordering::SeqCst);
             let left_unmatched = batch.column(2).null_count();
             let right_unmatched = batch.column(0).null_count();
             let op = if left_unmatched == 0 && right_unmatched == 0 {
@@ -313,7 +404,8 @@ mod unix_test {
             };
             operations.push(op);
         }
-        tasks.into_iter().for_each(|jh| jh.join().unwrap());
+        futures::future::try_join_all(tasks).await.unwrap();
+
         // The SymmetricHashJoin executor produces FULL join results at every
         // pruning, which happens before it reaches the end of input and more
         // than once. In this test, we feed partially joinable data to both
@@ -337,12 +429,15 @@
     /// It tests the INSERT INTO functionality.
     #[tokio::test]
     async fn test_sql_insert_into_fifo() -> Result<()> {
-        // To make unbounded deterministic
-        let waiting = Arc::new(AtomicBool::new(true));
-        let waiting_thread = waiting.clone();
         // create local execution context
+        let runtime = Arc::new(RuntimeEnv::default());
         let config = SessionConfig::new().with_batch_size(TEST_BATCH_SIZE);
-        let ctx = SessionContext::new_with_config(config);
+        let mut state = SessionState::new_with_config_rt(config, runtime);
+        let mut factories = HashMap::with_capacity(1);
+        factories.insert("CSV".to_string(), Arc::new(FifoFactory::default()) as _);
+        *state.table_factories_mut() = factories;
+        let ctx = SessionContext::new_with_state(state);
+
         // Create a new temporary FIFO file
         let tmp_dir = TempDir::new()?;
         let source_fifo_path = create_fifo_file(&tmp_dir, "source.csv")?;
@@ -356,20 +451,18 @@
         // thread. This approach ensures that the pipeline remains unbroken.
         tasks.push(create_writing_thread(
             source_fifo_path_thread,
-            "a1,a2\n".to_owned(),
             (0..TEST_DATA_SIZE)
                 .map(|_| "a,1\n".to_string())
                 .collect::<Vec<_>>(),
-            waiting,
-            TEST_BATCH_SIZE,
         ));
         // Create a new temporary FIFO file
         let sink_fifo_path = create_fifo_file(&tmp_dir, "sink.csv")?;
         // Prevent move
         let (sink_fifo_path_thread, sink_display_fifo_path) =
             (sink_fifo_path.clone(), sink_fifo_path.display());
+
         // Spawn a new thread to read sink EXTERNAL TABLE.
-        tasks.push(thread::spawn(move || {
+        tasks.push(spawn_blocking(move || {
             let file = File::open(sink_fifo_path_thread).unwrap();
             let schema = Arc::new(Schema::new(vec![
                 Field::new("a1", DataType::Utf8, false),
@@ -377,14 +470,18 @@
             ]));
 
             let mut reader = ReaderBuilder::new(schema)
-                .with_header(true)
                 .with_batch_size(TEST_BATCH_SIZE)
                 .build(file)
                 .map_err(|e| DataFusionError::Internal(e.to_string()))
                 .unwrap();
 
-            while let Some(Ok(_)) = reader.next() {
-                waiting_thread.store(false, Ordering::SeqCst);
+            let mut remaining = TEST_DATA_SIZE;
+
+            while let Some(Ok(b)) = reader.next() {
+                remaining = remaining.checked_sub(b.num_rows()).unwrap();
+                if remaining == 0 {
+                    break;
+                }
             }
         }));
         // register second csv file with the SQL (create an empty file if not found)
@@ -394,8 +491,6 @@
                 a2 INT NOT NULL
             )
             STORED AS CSV
-            WITH HEADER ROW
-            OPTIONS ('UNBOUNDED' 'TRUE')
            LOCATION '{source_display_fifo_path}'"
         ))
         .await?;
@@ -407,20 +502,17 @@
                 a2 INT NOT NULL
             )
             STORED AS CSV
-            WITH HEADER ROW
-            OPTIONS ('UNBOUNDED' 'TRUE')
            LOCATION '{sink_display_fifo_path}'"
         ))
         .await?;
 
         let df = ctx
-            .sql(
-                "INSERT INTO sink_table
-                    SELECT a1, a2 FROM source_table",
-            )
+            .sql("INSERT INTO sink_table SELECT a1, a2 FROM source_table")
             .await?;
-        df.collect().await?;
-        tasks.into_iter().for_each(|jh| jh.join().unwrap());
+
+        // Start execution
+        let _ = df.collect().await.unwrap();
+        futures::future::try_join_all(tasks).await.unwrap();
         Ok(())
     }
 }