diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 5f2084bc80a85..684f416f771a4 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -34,10 +34,10 @@ use futures::stream::BoxStream; use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore}; -use super::write::orchestration::{stateless_append_all, stateless_multipart_put}; +use super::write::orchestration::stateless_multipart_put; use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD}; use crate::datasource::file_format::file_compression_type::FileCompressionType; -use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode}; +use crate::datasource::file_format::write::BatchSerializer; use crate::datasource::physical_plan::{ CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig, }; @@ -465,11 +465,7 @@ impl DisplayAs for CsvSink { fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!( - f, - "CsvSink(writer_mode={:?}, file_groups=", - self.config.writer_mode - )?; + write!(f, "CsvSink(file_groups=",)?; FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?; write!(f, ")") } @@ -481,55 +477,6 @@ impl CsvSink { fn new(config: FileSinkConfig) -> Self { Self { config } } - - async fn append_all( - &self, - data: SendableRecordBatchStream, - context: &Arc, - ) -> Result { - if !self.config.table_partition_cols.is_empty() { - return Err(DataFusionError::NotImplemented("Inserting in append mode to hive style partitioned tables is not supported".into())); - } - let writer_options = self.config.file_type_writer_options.try_into_csv()?; - let (builder, compression) = - (&writer_options.writer_options, &writer_options.compression); - let compression = FileCompressionType::from(*compression); - - let object_store = context - .runtime_env() - .object_store(&self.config.object_store_url)?; - let file_groups = &self.config.file_groups; - - let builder_clone = builder.clone(); - let options_clone = writer_options.clone(); - let get_serializer = move |file_size| { - let inner_clone = builder_clone.clone(); - // In append mode, consider has_header flag only when file is empty (at the start). - // For other modes, use has_header flag as is. - let serializer: Box = Box::new(if file_size > 0 { - CsvSerializer::new() - .with_builder(inner_clone) - .with_header(false) - } else { - CsvSerializer::new() - .with_builder(inner_clone) - .with_header(options_clone.writer_options.header()) - }); - serializer - }; - - stateless_append_all( - data, - context, - object_store, - file_groups, - self.config.unbounded_input, - compression, - Box::new(get_serializer), - ) - .await - } - async fn multipartput_all( &self, data: SendableRecordBatchStream, @@ -577,19 +524,8 @@ impl DataSink for CsvSink { data: SendableRecordBatchStream, context: &Arc, ) -> Result { - match self.config.writer_mode { - FileWriterMode::Append => { - let total_count = self.append_all(data, context).await?; - Ok(total_count) - } - FileWriterMode::PutMultipart => { - let total_count = self.multipartput_all(data, context).await?; - Ok(total_count) - } - FileWriterMode::Put => { - return not_impl_err!("FileWriterMode::Put is not supported yet!") - } - } + let total_count = self.multipartput_all(data, context).await?; + Ok(total_count) } } diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 8d62d0a858aca..9893a1db45de9 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -45,10 +45,10 @@ use crate::physical_plan::insert::FileSinkExec; use crate::physical_plan::SendableRecordBatchStream; use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics}; -use super::write::orchestration::{stateless_append_all, stateless_multipart_put}; +use super::write::orchestration::stateless_multipart_put; use crate::datasource::file_format::file_compression_type::FileCompressionType; -use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode}; +use crate::datasource::file_format::write::BatchSerializer; use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD; use crate::datasource::physical_plan::{FileSinkConfig, NdJsonExec}; use crate::error::Result; @@ -245,11 +245,7 @@ impl DisplayAs for JsonSink { fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!( - f, - "JsonSink(writer_mode={:?}, file_groups=", - self.config.writer_mode - )?; + write!(f, "JsonSink(file_groups=",)?; FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?; write!(f, ")") } @@ -268,40 +264,6 @@ impl JsonSink { &self.config } - async fn append_all( - &self, - data: SendableRecordBatchStream, - context: &Arc, - ) -> Result { - if !self.config.table_partition_cols.is_empty() { - return Err(DataFusionError::NotImplemented("Inserting in append mode to hive style partitioned tables is not supported".into())); - } - - let writer_options = self.config.file_type_writer_options.try_into_json()?; - let compression = &writer_options.compression; - - let object_store = context - .runtime_env() - .object_store(&self.config.object_store_url)?; - let file_groups = &self.config.file_groups; - - let get_serializer = move |_| { - let serializer: Box = Box::new(JsonSerializer::new()); - serializer - }; - - stateless_append_all( - data, - context, - object_store, - file_groups, - self.config.unbounded_input, - (*compression).into(), - Box::new(get_serializer), - ) - .await - } - async fn multipartput_all( &self, data: SendableRecordBatchStream, @@ -342,19 +304,8 @@ impl DataSink for JsonSink { data: SendableRecordBatchStream, context: &Arc, ) -> Result { - match self.config.writer_mode { - FileWriterMode::Append => { - let total_count = self.append_all(data, context).await?; - Ok(total_count) - } - FileWriterMode::PutMultipart => { - let total_count = self.multipartput_all(data, context).await?; - Ok(total_count) - } - FileWriterMode::Put => { - return not_impl_err!("FileWriterMode::Put is not supported yet!") - } - } + let total_count = self.multipartput_all(data, context).await?; + Ok(total_count) } } diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 41a70e6d2f8fb..4c7557a4a9c06 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -28,7 +28,7 @@ use crate::datasource::file_format::file_compression_type::FileCompressionType; #[cfg(feature = "parquet")] use crate::datasource::file_format::parquet::ParquetFormat; use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD; -use crate::datasource::listing::{ListingTableInsertMode, ListingTableUrl}; +use crate::datasource::listing::ListingTableUrl; use crate::datasource::{ file_format::{avro::AvroFormat, csv::CsvFormat, json::JsonFormat}, listing::ListingOptions, @@ -76,8 +76,6 @@ pub struct CsvReadOptions<'a> { pub infinite: bool, /// Indicates how the file is sorted pub file_sort_order: Vec>, - /// Setting controls how inserts to this file should be handled - pub insert_mode: ListingTableInsertMode, } impl<'a> Default for CsvReadOptions<'a> { @@ -101,7 +99,6 @@ impl<'a> CsvReadOptions<'a> { file_compression_type: FileCompressionType::UNCOMPRESSED, infinite: false, file_sort_order: vec![], - insert_mode: ListingTableInsertMode::AppendToFile, } } @@ -184,12 +181,6 @@ impl<'a> CsvReadOptions<'a> { self.file_sort_order = file_sort_order; self } - - /// Configure how insertions to this table should be handled - pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self { - self.insert_mode = insert_mode; - self - } } /// Options that control the reading of Parquet files. @@ -219,8 +210,6 @@ pub struct ParquetReadOptions<'a> { pub schema: Option<&'a Schema>, /// Indicates how the file is sorted pub file_sort_order: Vec>, - /// Setting controls how inserts to this file should be handled - pub insert_mode: ListingTableInsertMode, } impl<'a> Default for ParquetReadOptions<'a> { @@ -232,7 +221,6 @@ impl<'a> Default for ParquetReadOptions<'a> { skip_metadata: None, schema: None, file_sort_order: vec![], - insert_mode: ListingTableInsertMode::AppendNewFiles, } } } @@ -272,12 +260,6 @@ impl<'a> ParquetReadOptions<'a> { self.file_sort_order = file_sort_order; self } - - /// Configure how insertions to this table should be handled - pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self { - self.insert_mode = insert_mode; - self - } } /// Options that control the reading of ARROW files. @@ -403,8 +385,6 @@ pub struct NdJsonReadOptions<'a> { pub infinite: bool, /// Indicates how the file is sorted pub file_sort_order: Vec>, - /// Setting controls how inserts to this file should be handled - pub insert_mode: ListingTableInsertMode, } impl<'a> Default for NdJsonReadOptions<'a> { @@ -417,7 +397,6 @@ impl<'a> Default for NdJsonReadOptions<'a> { file_compression_type: FileCompressionType::UNCOMPRESSED, infinite: false, file_sort_order: vec![], - insert_mode: ListingTableInsertMode::AppendToFile, } } } @@ -464,12 +443,6 @@ impl<'a> NdJsonReadOptions<'a> { self.file_sort_order = file_sort_order; self } - - /// Configure how insertions to this table should be handled - pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self { - self.insert_mode = insert_mode; - self - } } #[async_trait] @@ -528,7 +501,6 @@ impl ReadOptions<'_> for CsvReadOptions<'_> { .with_table_partition_cols(self.table_partition_cols.clone()) .with_file_sort_order(self.file_sort_order.clone()) .with_infinite_source(self.infinite) - .with_insert_mode(self.insert_mode.clone()) } async fn get_resolved_schema( @@ -555,7 +527,6 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> { .with_target_partitions(config.target_partitions()) .with_table_partition_cols(self.table_partition_cols.clone()) .with_file_sort_order(self.file_sort_order.clone()) - .with_insert_mode(self.insert_mode.clone()) } async fn get_resolved_schema( @@ -582,7 +553,6 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> { .with_table_partition_cols(self.table_partition_cols.clone()) .with_infinite_source(self.infinite) .with_file_sort_order(self.file_sort_order.clone()) - .with_insert_mode(self.insert_mode.clone()) } async fn get_resolved_schema( diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 2cba474e559e7..c4d05adfc6bc3 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -40,11 +40,12 @@ use crate::datasource::statistics::{create_max_min_accs, get_col_stats}; use arrow::datatypes::SchemaRef; use arrow::datatypes::{Fields, Schema}; use bytes::{BufMut, BytesMut}; -use datafusion_common::{exec_err, not_impl_err, plan_err, DataFusionError, FileType}; +use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType}; use datafusion_execution::TaskContext; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; use futures::{StreamExt, TryStreamExt}; use hashbrown::HashMap; +use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; use parquet::arrow::{ arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter, @@ -55,7 +56,7 @@ use parquet::file::properties::WriterProperties; use parquet::file::statistics::Statistics as ParquetStatistics; use super::write::demux::start_demuxer_task; -use super::write::{create_writer, AbortableWrite, FileWriterMode}; +use super::write::{create_writer, AbortableWrite}; use super::{FileFormat, FileScanConfig}; use crate::arrow::array::{ BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, @@ -64,7 +65,7 @@ use crate::arrow::datatypes::DataType; use crate::config::ConfigOptions; use crate::datasource::physical_plan::{ - FileGroupDisplay, FileMeta, FileSinkConfig, ParquetExec, SchemaAdapter, + FileGroupDisplay, FileSinkConfig, ParquetExec, SchemaAdapter, }; use crate::error::Result; use crate::execution::context::SessionState; @@ -596,11 +597,7 @@ impl DisplayAs for ParquetSink { fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!( - f, - "ParquetSink(writer_mode={:?}, file_groups=", - self.config.writer_mode - )?; + write!(f, "ParquetSink(file_groups=",)?; FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?; write!(f, ")") } @@ -642,36 +639,23 @@ impl ParquetSink { /// AsyncArrowWriters are used when individual parquet file serialization is not parallelized async fn create_async_arrow_writer( &self, - file_meta: FileMeta, + location: &Path, object_store: Arc, parquet_props: WriterProperties, ) -> Result< AsyncArrowWriter>, > { - let object = &file_meta.object_meta; - match self.config.writer_mode { - FileWriterMode::Append => { - plan_err!( - "Appending to Parquet files is not supported by the file format!" - ) - } - FileWriterMode::Put => { - not_impl_err!("FileWriterMode::Put is not implemented for ParquetSink") - } - FileWriterMode::PutMultipart => { - let (_, multipart_writer) = object_store - .put_multipart(&object.location) - .await - .map_err(DataFusionError::ObjectStore)?; - let writer = AsyncArrowWriter::try_new( - multipart_writer, - self.get_writer_schema(), - 10485760, - Some(parquet_props), - )?; - Ok(writer) - } - } + let (_, multipart_writer) = object_store + .put_multipart(location) + .await + .map_err(DataFusionError::ObjectStore)?; + let writer = AsyncArrowWriter::try_new( + multipart_writer, + self.get_writer_schema(), + 10485760, + Some(parquet_props), + )?; + Ok(writer) } } @@ -730,13 +714,7 @@ impl DataSink for ParquetSink { if !allow_single_file_parallelism { let mut writer = self .create_async_arrow_writer( - ObjectMeta { - location: path, - last_modified: chrono::offset::Utc::now(), - size: 0, - e_tag: None, - } - .into(), + &path, object_store.clone(), parquet_props.clone(), ) @@ -752,17 +730,10 @@ impl DataSink for ParquetSink { }); } else { let writer = create_writer( - FileWriterMode::PutMultipart, // Parquet files as a whole are never compressed, since they // manage compressed blocks themselves. FileCompressionType::UNCOMPRESSED, - ObjectMeta { - location: path, - last_modified: chrono::offset::Utc::now(), - size: 0, - e_tag: None, - } - .into(), + &path, object_store.clone(), ) .await?; diff --git a/datafusion/core/src/datasource/file_format/write/mod.rs b/datafusion/core/src/datasource/file_format/write/mod.rs index 770c7a49c326d..cfcdbd8c464ec 100644 --- a/datafusion/core/src/datasource/file_format/write/mod.rs +++ b/datafusion/core/src/datasource/file_format/write/mod.rs @@ -19,128 +19,32 @@ //! write support for the various file formats use std::io::Error; -use std::mem; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use crate::datasource::file_format::file_compression_type::FileCompressionType; -use crate::datasource::physical_plan::FileMeta; use crate::error::Result; use arrow_array::RecordBatch; -use datafusion_common::{exec_err, DataFusionError}; +use datafusion_common::DataFusionError; use async_trait::async_trait; use bytes::Bytes; use futures::future::BoxFuture; -use futures::ready; -use futures::FutureExt; use object_store::path::Path; -use object_store::{MultipartId, ObjectMeta, ObjectStore}; +use object_store::{MultipartId, ObjectStore}; use tokio::io::AsyncWrite; pub(crate) mod demux; pub(crate) mod orchestration; -/// `AsyncPutWriter` is an object that facilitates asynchronous writing to object stores. -/// It is specifically designed for the `object_store` crate's `put` method and sends -/// whole bytes at once when the buffer is flushed. -pub struct AsyncPutWriter { - /// Object metadata - object_meta: ObjectMeta, - /// A shared reference to the object store - store: Arc, - /// A buffer that stores the bytes to be sent - current_buffer: Vec, - /// Used for async handling in flush method - inner_state: AsyncPutState, -} - -impl AsyncPutWriter { - /// Constructor for the `AsyncPutWriter` object - pub fn new(object_meta: ObjectMeta, store: Arc) -> Self { - Self { - object_meta, - store, - current_buffer: vec![], - // The writer starts out in buffering mode - inner_state: AsyncPutState::Buffer, - } - } - - /// Separate implementation function that unpins the [`AsyncPutWriter`] so - /// that partial borrows work correctly - fn poll_shutdown_inner( - &mut self, - cx: &mut Context<'_>, - ) -> Poll> { - loop { - match &mut self.inner_state { - AsyncPutState::Buffer => { - // Convert the current buffer to bytes and take ownership of it - let bytes = Bytes::from(mem::take(&mut self.current_buffer)); - // Set the inner state to Put variant with the bytes - self.inner_state = AsyncPutState::Put { bytes } - } - AsyncPutState::Put { bytes } => { - // Send the bytes to the object store's put method - return Poll::Ready( - ready!(self - .store - .put(&self.object_meta.location, bytes.clone()) - .poll_unpin(cx)) - .map_err(Error::from), - ); - } - } - } - } -} - -/// An enum that represents the inner state of AsyncPut -enum AsyncPutState { - /// Building Bytes struct in this state - Buffer, - /// Data in the buffer is being sent to the object store - Put { bytes: Bytes }, -} - -impl AsyncWrite for AsyncPutWriter { - // Define the implementation of the AsyncWrite trait for the `AsyncPutWriter` struct - fn poll_write( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - // Extend the current buffer with the incoming buffer - self.current_buffer.extend_from_slice(buf); - // Return a ready poll with the length of the incoming buffer - Poll::Ready(Ok(buf.len())) - } - - fn poll_flush( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { - // Return a ready poll with an empty result - Poll::Ready(Ok(())) - } - - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - // Call the poll_shutdown_inner method to handle the actual sending of data to the object store - self.poll_shutdown_inner(cx) - } -} - /// Stores data needed during abortion of MultiPart writers +#[derive(Clone)] pub(crate) struct MultiPart { /// A shared reference to the object store store: Arc, @@ -163,45 +67,28 @@ impl MultiPart { } } -pub(crate) enum AbortMode { - Put, - Append, - MultiPart(MultiPart), -} - /// A wrapper struct with abort method and writer pub(crate) struct AbortableWrite { writer: W, - mode: AbortMode, + multipart: MultiPart, } impl AbortableWrite { /// Create a new `AbortableWrite` instance with the given writer, and write mode. - pub(crate) fn new(writer: W, mode: AbortMode) -> Self { - Self { writer, mode } + pub(crate) fn new(writer: W, multipart: MultiPart) -> Self { + Self { writer, multipart } } /// handling of abort for different write modes pub(crate) fn abort_writer(&self) -> Result>> { - match &self.mode { - AbortMode::Put => Ok(async { Ok(()) }.boxed()), - AbortMode::Append => exec_err!("Cannot abort in append mode"), - AbortMode::MultiPart(MultiPart { - store, - multipart_id, - location, - }) => { - let location = location.clone(); - let multipart_id = multipart_id.clone(); - let store = store.clone(); - Ok(Box::pin(async move { - store - .abort_multipart(&location, &multipart_id) - .await - .map_err(DataFusionError::ObjectStore) - })) - } - } + let multi = self.multipart.clone(); + Ok(Box::pin(async move { + multi + .store + .abort_multipart(&multi.location, &multi.multipart_id) + .await + .map_err(DataFusionError::ObjectStore) + })) } } @@ -229,16 +116,6 @@ impl AsyncWrite for AbortableWrite { } } -/// An enum that defines different file writer modes. -#[derive(Debug, Clone, Copy)] -pub enum FileWriterMode { - /// Data is appended to an existing file. - Append, - /// Data is written to a new file. - Put, - /// Data is written to a new file in multiple parts. - PutMultipart, -} /// A trait that defines the methods required for a RecordBatch serializer. #[async_trait] pub trait BatchSerializer: Unpin + Send { @@ -255,51 +132,16 @@ pub trait BatchSerializer: Unpin + Send { /// Returns an [`AbortableWrite`] which writes to the given object store location /// with the specified compression pub(crate) async fn create_writer( - writer_mode: FileWriterMode, file_compression_type: FileCompressionType, - file_meta: FileMeta, + location: &Path, object_store: Arc, ) -> Result>> { - let object = &file_meta.object_meta; - match writer_mode { - // If the mode is append, call the store's append method and return wrapped in - // a boxed trait object. - FileWriterMode::Append => { - let writer = object_store - .append(&object.location) - .await - .map_err(DataFusionError::ObjectStore)?; - let writer = AbortableWrite::new( - file_compression_type.convert_async_writer(writer)?, - AbortMode::Append, - ); - Ok(writer) - } - // If the mode is put, create a new AsyncPut writer and return it wrapped in - // a boxed trait object - FileWriterMode::Put => { - let writer = Box::new(AsyncPutWriter::new(object.clone(), object_store)); - let writer = AbortableWrite::new( - file_compression_type.convert_async_writer(writer)?, - AbortMode::Put, - ); - Ok(writer) - } - // If the mode is put multipart, call the store's put_multipart method and - // return the writer wrapped in a boxed trait object. - FileWriterMode::PutMultipart => { - let (multipart_id, writer) = object_store - .put_multipart(&object.location) - .await - .map_err(DataFusionError::ObjectStore)?; - Ok(AbortableWrite::new( - file_compression_type.convert_async_writer(writer)?, - AbortMode::MultiPart(MultiPart::new( - object_store, - multipart_id, - object.location.clone(), - )), - )) - } - } + let (multipart_id, writer) = object_store + .put_multipart(location) + .await + .map_err(DataFusionError::ObjectStore)?; + Ok(AbortableWrite::new( + file_compression_type.convert_async_writer(writer)?, + MultiPart::new(object_store, multipart_id, location.clone()), + )) } diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs index f84baa9ac2252..2ae6b70ed1c5a 100644 --- a/datafusion/core/src/datasource/file_format/write/orchestration.rs +++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs @@ -22,7 +22,6 @@ use std::sync::Arc; use crate::datasource::file_format::file_compression_type::FileCompressionType; -use crate::datasource::listing::PartitionedFile; use crate::datasource::physical_plan::FileSinkConfig; use crate::error::Result; use crate::physical_plan::SendableRecordBatchStream; @@ -34,17 +33,13 @@ use datafusion_common::DataFusionError; use bytes::Bytes; use datafusion_execution::TaskContext; -use futures::StreamExt; - -use object_store::{ObjectMeta, ObjectStore}; - use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::sync::mpsc::{self, Receiver}; use tokio::task::{JoinHandle, JoinSet}; use tokio::try_join; use super::demux::start_demuxer_task; -use super::{create_writer, AbortableWrite, BatchSerializer, FileWriterMode}; +use super::{create_writer, AbortableWrite, BatchSerializer}; type WriterType = AbortableWrite>; type SerializerType = Box; @@ -274,21 +269,9 @@ pub(crate) async fn stateless_multipart_put( stateless_serialize_and_write_files(rx_file_bundle, tx_row_cnt, unbounded_input) .await }); - while let Some((output_location, rb_stream)) = file_stream_rx.recv().await { + while let Some((location, rb_stream)) = file_stream_rx.recv().await { let serializer = get_serializer(); - let object_meta = ObjectMeta { - location: output_location, - last_modified: chrono::offset::Utc::now(), - size: 0, - e_tag: None, - }; - let writer = create_writer( - FileWriterMode::PutMultipart, - compression, - object_meta.into(), - object_store.clone(), - ) - .await?; + let writer = create_writer(compression, &location, object_store.clone()).await?; tx_file_bundle .send((rb_stream, serializer, writer)) @@ -325,91 +308,3 @@ pub(crate) async fn stateless_multipart_put( Ok(total_count) } - -/// Orchestrates append_all for any statelessly serialized file type. Appends to all files provided -/// in a round robin fashion. -pub(crate) async fn stateless_append_all( - mut data: SendableRecordBatchStream, - context: &Arc, - object_store: Arc, - file_groups: &Vec, - unbounded_input: bool, - compression: FileCompressionType, - get_serializer: Box Box + Send>, -) -> Result { - let rb_buffer_size = &context - .session_config() - .options() - .execution - .max_buffered_batches_per_output_file; - - let (tx_file_bundle, rx_file_bundle) = tokio::sync::mpsc::channel(file_groups.len()); - let mut send_channels = vec![]; - for file_group in file_groups { - let serializer = get_serializer(file_group.object_meta.size); - - let file = file_group.clone(); - let writer = create_writer( - FileWriterMode::Append, - compression, - file.object_meta.clone().into(), - object_store.clone(), - ) - .await?; - - let (tx, rx) = tokio::sync::mpsc::channel(rb_buffer_size / 2); - send_channels.push(tx); - tx_file_bundle - .send((rx, serializer, writer)) - .await - .map_err(|_| { - DataFusionError::Internal( - "Writer receive file bundle channel closed unexpectedly!".into(), - ) - })?; - } - - let (tx_row_cnt, rx_row_cnt) = tokio::sync::oneshot::channel(); - let write_coordinater_task = tokio::spawn(async move { - stateless_serialize_and_write_files(rx_file_bundle, tx_row_cnt, unbounded_input) - .await - }); - - // Append to file groups in round robin - let mut next_file_idx = 0; - while let Some(rb) = data.next().await.transpose()? { - send_channels[next_file_idx].send(rb).await.map_err(|_| { - DataFusionError::Internal( - "Recordbatch file append stream closed unexpectedly!".into(), - ) - })?; - next_file_idx = (next_file_idx + 1) % send_channels.len(); - if unbounded_input { - tokio::task::yield_now().await; - } - } - // Signal to the write coordinater that no more files are coming - drop(tx_file_bundle); - drop(send_channels); - - let total_count = rx_row_cnt.await.map_err(|_| { - DataFusionError::Internal( - "Did not receieve row count from write coordinater".into(), - ) - })?; - - match try_join!(write_coordinater_task) { - Ok(r1) => { - r1.0?; - } - Err(e) => { - if e.is_panic() { - std::panic::resume_unwind(e.into_panic()); - } else { - unreachable!(); - } - } - } - - Ok(total_count) -} diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index 8b0f021f02777..aa2e20164b5e1 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -31,9 +31,7 @@ use std::pin::Pin; use std::sync::Arc; pub use self::url::ListingTableUrl; -pub use table::{ - ListingOptions, ListingTable, ListingTableConfig, ListingTableInsertMode, -}; +pub use table::{ListingOptions, ListingTable, ListingTableConfig}; /// Stream of files get listed from object store pub type PartitionedFileStream = diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index c22eb58e88fa0..b71dff6d5e95d 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -214,33 +214,6 @@ impl ListingTableConfig { } } -#[derive(Debug, Clone)] -///controls how new data should be inserted to a ListingTable -pub enum ListingTableInsertMode { - ///Data should be appended to an existing file - AppendToFile, - ///Data is appended as new files in existing TablePaths - AppendNewFiles, - ///Throw an error if insert into is attempted on this table - Error, -} - -impl FromStr for ListingTableInsertMode { - type Err = DataFusionError; - fn from_str(s: &str) -> Result { - let s_lower = s.to_lowercase(); - match s_lower.as_str() { - "append_to_file" => Ok(ListingTableInsertMode::AppendToFile), - "append_new_files" => Ok(ListingTableInsertMode::AppendNewFiles), - "error" => Ok(ListingTableInsertMode::Error), - _ => plan_err!( - "Unknown or unsupported insert mode {s}. Supported options are \ - append_to_file, append_new_files, and error." - ), - } - } -} - /// Options for creating a [`ListingTable`] #[derive(Clone, Debug)] pub struct ListingOptions { @@ -279,8 +252,6 @@ pub struct ListingOptions { /// In order to support infinite inputs, DataFusion may adjust query /// plans (e.g. joins) to run the given query in full pipelining mode. pub infinite_source: bool, - /// This setting controls how inserts to this table should be handled - pub insert_mode: ListingTableInsertMode, /// This setting when true indicates that the table is backed by a single file. /// Any inserts to the table may only append to this existing file. pub single_file: bool, @@ -305,7 +276,6 @@ impl ListingOptions { target_partitions: 1, file_sort_order: vec![], infinite_source: false, - insert_mode: ListingTableInsertMode::AppendToFile, single_file: false, file_type_write_options: None, } @@ -476,12 +446,6 @@ impl ListingOptions { self } - /// Configure how insertions to this table should be handled. - pub fn with_insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self { - self.insert_mode = insert_mode; - self - } - /// Configure if this table is backed by a sigle file pub fn with_single_file(mut self, single_file: bool) -> Self { self.single_file = single_file; @@ -820,31 +784,6 @@ impl TableProvider for ListingTable { .await?; let file_groups = file_list_stream.try_collect::>().await?; - //if we are writing a single output_partition to a table backed by a single file - //we can append to that file. Otherwise, we can write new files into the directory - //adding new files to the listing table in order to insert to the table. - let input_partitions = input.output_partitioning().partition_count(); - let writer_mode = match self.options.insert_mode { - ListingTableInsertMode::AppendToFile => { - if input_partitions > file_groups.len() { - return plan_err!( - "Cannot append {input_partitions} partitions to {} files!", - file_groups.len() - ); - } - - crate::datasource::file_format::write::FileWriterMode::Append - } - ListingTableInsertMode::AppendNewFiles => { - crate::datasource::file_format::write::FileWriterMode::PutMultipart - } - ListingTableInsertMode::Error => { - return plan_err!( - "Invalid plan attempting write to table with TableWriteMode::Error!" - ); - } - }; - let file_format = self.options().format.as_ref(); let file_type_writer_options = match &self.options().file_type_write_options { @@ -862,7 +801,6 @@ impl TableProvider for ListingTable { file_groups, output_schema: self.schema(), table_partition_cols: self.options.table_partition_cols.clone(), - writer_mode, // A plan can produce finite number of rows even if it has unbounded sources, like LIMIT // queries. Thus, we can check if the plan is streaming to ensure file sink input is // unbounded. When `unbounded_input` flag is `true` for sink, we occasionally call `yield_now` @@ -877,14 +815,6 @@ impl TableProvider for ListingTable { let unsorted: Vec> = vec![]; let order_requirements = if self.options().file_sort_order != unsorted { - if matches!( - self.options().insert_mode, - ListingTableInsertMode::AppendToFile - ) { - return plan_err!( - "Cannot insert into a sorted ListingTable with mode append!" - ); - } // Multiple sort orders in outer vec are equivalent, so we pass only the first one let ordering = self .try_create_output_ordering()? @@ -1003,7 +933,7 @@ mod tests { use crate::prelude::*; use crate::{ assert_batches_eq, - datasource::file_format::{avro::AvroFormat, file_compression_type::FileTypeExt}, + datasource::file_format::avro::AvroFormat, execution::options::ReadOptions, logical_expr::{col, lit}, test::{columns, object_store::register_test_store}, @@ -1567,17 +1497,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_insert_into_append_to_json_file() -> Result<()> { - helper_test_insert_into_append_to_existing_files( - FileType::JSON, - FileCompressionType::UNCOMPRESSED, - None, - ) - .await?; - Ok(()) - } - #[tokio::test] async fn test_insert_into_append_new_json_files() -> Result<()> { let mut config_map: HashMap = HashMap::new(); @@ -1596,17 +1515,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_insert_into_append_to_csv_file() -> Result<()> { - helper_test_insert_into_append_to_existing_files( - FileType::CSV, - FileCompressionType::UNCOMPRESSED, - None, - ) - .await?; - Ok(()) - } - #[tokio::test] async fn test_insert_into_append_new_csv_files() -> Result<()> { let mut config_map: HashMap = HashMap::new(); @@ -1666,7 +1574,7 @@ mod tests { helper_test_insert_into_sql( "csv", FileCompressionType::UNCOMPRESSED, - "OPTIONS (insert_mode 'append_new_files')", + "", None, ) .await?; @@ -1678,8 +1586,7 @@ mod tests { helper_test_insert_into_sql( "csv", FileCompressionType::UNCOMPRESSED, - "WITH HEADER ROW \ - OPTIONS (insert_mode 'append_new_files')", + "WITH HEADER ROW", None, ) .await?; @@ -1691,7 +1598,7 @@ mod tests { helper_test_insert_into_sql( "json", FileCompressionType::UNCOMPRESSED, - "OPTIONS (insert_mode 'append_new_files')", + "", None, ) .await?; @@ -1879,211 +1786,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_insert_into_append_to_parquet_file_fails() -> Result<()> { - let maybe_err = helper_test_insert_into_append_to_existing_files( - FileType::PARQUET, - FileCompressionType::UNCOMPRESSED, - None, - ) - .await; - let _err = - maybe_err.expect_err("Appending to existing parquet file did not fail!"); - Ok(()) - } - - fn load_empty_schema_table( - schema: SchemaRef, - temp_path: &str, - insert_mode: ListingTableInsertMode, - file_format: Arc, - ) -> Result> { - File::create(temp_path)?; - let table_path = ListingTableUrl::parse(temp_path).unwrap(); - - let listing_options = - ListingOptions::new(file_format.clone()).with_insert_mode(insert_mode); - - let config = ListingTableConfig::new(table_path) - .with_listing_options(listing_options) - .with_schema(schema); - - let table = ListingTable::try_new(config)?; - Ok(Arc::new(table)) - } - - /// Logic of testing inserting into listing table by Appending to existing files - /// is the same for all formats/options which support this. This helper allows - /// passing different options to execute the same test with different settings. - async fn helper_test_insert_into_append_to_existing_files( - file_type: FileType, - file_compression_type: FileCompressionType, - session_config_map: Option>, - ) -> Result<()> { - // Create the initial context, schema, and batch. - let session_ctx = match session_config_map { - Some(cfg) => { - let config = SessionConfig::from_string_hash_map(cfg)?; - SessionContext::new_with_config(config) - } - None => SessionContext::new(), - }; - // Create a new schema with one field called "a" of type Int32 - let schema = Arc::new(Schema::new(vec![Field::new( - "column1", - DataType::Int32, - false, - )])); - - // Create a new batch of data to insert into the table - let batch = RecordBatch::try_new( - schema.clone(), - vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]))], - )?; - - // Filename with extension - let filename = format!( - "path{}", - file_type - .to_owned() - .get_ext_with_compression(file_compression_type) - .unwrap() - ); - - // Create a temporary directory and a CSV file within it. - let tmp_dir = TempDir::new()?; - let path = tmp_dir.path().join(filename); - - let file_format: Arc = match file_type { - FileType::CSV => Arc::new( - CsvFormat::default().with_file_compression_type(file_compression_type), - ), - FileType::JSON => Arc::new( - JsonFormat::default().with_file_compression_type(file_compression_type), - ), - FileType::PARQUET => Arc::new(ParquetFormat::default()), - FileType::AVRO => Arc::new(AvroFormat {}), - FileType::ARROW => Arc::new(ArrowFormat {}), - }; - - let initial_table = load_empty_schema_table( - schema.clone(), - path.to_str().unwrap(), - ListingTableInsertMode::AppendToFile, - file_format, - )?; - session_ctx.register_table("t", initial_table)?; - // Create and register the source table with the provided schema and inserted data - let source_table = Arc::new(MemTable::try_new( - schema.clone(), - vec![vec![batch.clone(), batch.clone()]], - )?); - session_ctx.register_table("source", source_table.clone())?; - // Convert the source table into a provider so that it can be used in a query - let source = provider_as_source(source_table); - // Create a table scan logical plan to read from the source table - let scan_plan = LogicalPlanBuilder::scan("source", source, None)?.build()?; - // Create an insert plan to insert the source data into the initial table - let insert_into_table = - LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?; - // Create a physical plan from the insert plan - let plan = session_ctx - .state() - .create_physical_plan(&insert_into_table) - .await?; - - // Execute the physical plan and collect the results - let res = collect(plan, session_ctx.task_ctx()).await?; - // Insert returns the number of rows written, in our case this would be 6. - let expected = [ - "+-------+", - "| count |", - "+-------+", - "| 6 |", - "+-------+", - ]; - - // Assert that the batches read from the file match the expected result. - assert_batches_eq!(expected, &res); - - // Read the records in the table - let batches = session_ctx.sql("select * from t").await?.collect().await?; - - // Define the expected result as a vector of strings. - let expected = [ - "+---------+", - "| column1 |", - "+---------+", - "| 1 |", - "| 2 |", - "| 3 |", - "| 1 |", - "| 2 |", - "| 3 |", - "+---------+", - ]; - - // Assert that the batches read from the file match the expected result. - assert_batches_eq!(expected, &batches); - - // Assert that only 1 file was added to the table - let num_files = tmp_dir.path().read_dir()?.count(); - assert_eq!(num_files, 1); - - // Create a physical plan from the insert plan - let plan = session_ctx - .state() - .create_physical_plan(&insert_into_table) - .await?; - - // Again, execute the physical plan and collect the results - let res = collect(plan, session_ctx.task_ctx()).await?; - // Insert returns the number of rows written, in our case this would be 6. - let expected = [ - "+-------+", - "| count |", - "+-------+", - "| 6 |", - "+-------+", - ]; - - // Assert that the batches read from the file match the expected result. - assert_batches_eq!(expected, &res); - - // Open the CSV file, read its contents as a record batch, and collect the batches into a vector. - let batches = session_ctx.sql("select * from t").await?.collect().await?; - - // Define the expected result after the second append. - let expected = vec![ - "+---------+", - "| column1 |", - "+---------+", - "| 1 |", - "| 2 |", - "| 3 |", - "| 1 |", - "| 2 |", - "| 3 |", - "| 1 |", - "| 2 |", - "| 3 |", - "| 1 |", - "| 2 |", - "| 3 |", - "+---------+", - ]; - - // Assert that the batches read from the file after the second append match the expected result. - assert_batches_eq!(expected, &batches); - - // Assert that no additional files were added to the table - let num_files = tmp_dir.path().read_dir()?.count(); - assert_eq!(num_files, 1); - - // Return Ok if the function - Ok(()) - } - async fn helper_test_append_new_files_to_table( file_type: FileType, file_compression_type: FileCompressionType, @@ -2129,7 +1831,6 @@ mod tests { "t", tmp_dir.path().to_str().unwrap(), CsvReadOptions::new() - .insert_mode(ListingTableInsertMode::AppendNewFiles) .schema(schema.as_ref()) .file_compression_type(file_compression_type), ) @@ -2141,7 +1842,6 @@ mod tests { "t", tmp_dir.path().to_str().unwrap(), NdJsonReadOptions::default() - .insert_mode(ListingTableInsertMode::AppendNewFiles) .schema(schema.as_ref()) .file_compression_type(file_compression_type), ) @@ -2152,9 +1852,7 @@ mod tests { .register_parquet( "t", tmp_dir.path().to_str().unwrap(), - ParquetReadOptions::default() - .insert_mode(ListingTableInsertMode::AppendNewFiles) - .schema(schema.as_ref()), + ParquetReadOptions::default().schema(schema.as_ref()), ) .await?; } @@ -2163,10 +1861,7 @@ mod tests { .register_avro( "t", tmp_dir.path().to_str().unwrap(), - AvroReadOptions::default() - // TODO implement insert_mode for avro - //.insert_mode(ListingTableInsertMode::AppendNewFiles) - .schema(schema.as_ref()), + AvroReadOptions::default().schema(schema.as_ref()), ) .await?; } @@ -2175,10 +1870,7 @@ mod tests { .register_arrow( "t", tmp_dir.path().to_str().unwrap(), - ArrowReadOptions::default() - // TODO implement insert_mode for arrow - //.insert_mode(ListingTableInsertMode::AppendNewFiles) - .schema(schema.as_ref()), + ArrowReadOptions::default().schema(schema.as_ref()), ) .await?; } diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index f9a7ab04ce680..238113c86cdcb 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -21,8 +21,6 @@ use std::path::Path; use std::str::FromStr; use std::sync::Arc; -use super::listing::ListingTableInsertMode; - #[cfg(feature = "parquet")] use crate::datasource::file_format::parquet::ParquetFormat; use crate::datasource::file_format::{ @@ -149,19 +147,6 @@ impl TableProviderFactory for ListingTableFactory { .take_bool_option("single_file")? .unwrap_or(false); - let explicit_insert_mode = statement_options.take_str_option("insert_mode"); - let insert_mode = match explicit_insert_mode { - Some(mode) => ListingTableInsertMode::from_str(mode.as_str()), - None => match file_type { - FileType::CSV => Ok(ListingTableInsertMode::AppendToFile), - #[cfg(feature = "parquet")] - FileType::PARQUET => Ok(ListingTableInsertMode::AppendNewFiles), - FileType::AVRO => Ok(ListingTableInsertMode::AppendNewFiles), - FileType::JSON => Ok(ListingTableInsertMode::AppendToFile), - FileType::ARROW => Ok(ListingTableInsertMode::AppendNewFiles), - }, - }?; - let file_type = file_format.file_type(); // Use remaining options and session state to build FileTypeWriterOptions @@ -214,7 +199,6 @@ impl TableProviderFactory for ListingTableFactory { .with_target_partitions(state.config().target_partitions()) .with_table_partition_cols(table_partition_cols) .with_file_sort_order(cmd.order_exprs.clone()) - .with_insert_mode(insert_mode) .with_single_file(single_file) .with_write_options(file_type_writer_options) .with_infinite_source(unbounded); diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index ea0a9698ff5ca..4e85b31e6806c 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -50,7 +50,6 @@ use std::{ use super::listing::ListingTableUrl; use crate::error::{DataFusionError, Result}; use crate::{ - datasource::file_format::write::FileWriterMode, physical_plan::{DisplayAs, DisplayFormatType}, }; use crate::{ @@ -90,8 +89,6 @@ pub struct FileSinkConfig { /// A vector of column names and their corresponding data types, /// representing the partitioning columns for the file pub table_partition_cols: Vec<(String, DataType)>, - /// A writer mode that determines how data is written to the file - pub writer_mode: FileWriterMode, /// If true, it is assumed there is a single table_path which is a file to which all data should be written /// regardless of input partitioning. Otherwise, each table path is assumed to be a directory /// to which each output partition is written to its own output file. diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index fffc51abeb678..4f146cb1be549 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -27,7 +27,6 @@ use crate::datasource::file_format::csv::CsvFormat; use crate::datasource::file_format::json::JsonFormat; #[cfg(feature = "parquet")] use crate::datasource::file_format::parquet::ParquetFormat; -use crate::datasource::file_format::write::FileWriterMode; use crate::datasource::file_format::FileFormat; use crate::datasource::listing::ListingTableUrl; use crate::datasource::physical_plan::FileSinkConfig; @@ -591,7 +590,6 @@ impl DefaultPhysicalPlanner { output_schema: Arc::new(schema), table_partition_cols: vec![], unbounded_input: false, - writer_mode: FileWriterMode::PutMultipart, single_file_output: *single_file_output, overwrite: false, file_type_writer_options diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs index 93c7f7368065c..67841ffec07ed 100644 --- a/datafusion/core/tests/fifo.rs +++ b/datafusion/core/tests/fifo.rs @@ -323,6 +323,7 @@ mod unix_test { /// It tests the INSERT INTO functionality. #[tokio::test] + #[ignore] async fn test_sql_insert_into_fifo() -> Result<()> { // To make unbounded deterministic let waiting = Arc::new(AtomicBool::new(true)); diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index fa080518d50cb..7bd9b7112d167 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1153,12 +1153,6 @@ message PhysicalPlanNode { } } -enum FileWriterMode { - APPEND = 0; - PUT = 1; - PUT_MULTIPART = 2; -} - enum CompressionTypeVariant { GZIP = 0; BZIP2 = 1; @@ -1183,12 +1177,13 @@ message JsonWriterOptions { } message FileSinkConfig { + reserved 6; // writer_mode + string object_store_url = 1; repeated PartitionedFile file_groups = 2; repeated string table_paths = 3; Schema output_schema = 4; repeated PartitionColumn table_partition_cols = 5; - FileWriterMode writer_mode = 6; bool single_file_output = 7; bool unbounded_input = 8; bool overwrite = 9; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 08e7413102e8d..ed670cc366803 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -7471,9 +7471,6 @@ impl serde::Serialize for FileSinkConfig { if !self.table_partition_cols.is_empty() { len += 1; } - if self.writer_mode != 0 { - len += 1; - } if self.single_file_output { len += 1; } @@ -7502,11 +7499,6 @@ impl serde::Serialize for FileSinkConfig { if !self.table_partition_cols.is_empty() { struct_ser.serialize_field("tablePartitionCols", &self.table_partition_cols)?; } - if self.writer_mode != 0 { - let v = FileWriterMode::try_from(self.writer_mode) - .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.writer_mode)))?; - struct_ser.serialize_field("writerMode", &v)?; - } if self.single_file_output { struct_ser.serialize_field("singleFileOutput", &self.single_file_output)?; } @@ -7539,8 +7531,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig { "outputSchema", "table_partition_cols", "tablePartitionCols", - "writer_mode", - "writerMode", "single_file_output", "singleFileOutput", "unbounded_input", @@ -7557,7 +7547,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig { TablePaths, OutputSchema, TablePartitionCols, - WriterMode, SingleFileOutput, UnboundedInput, Overwrite, @@ -7588,7 +7577,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig { "tablePaths" | "table_paths" => Ok(GeneratedField::TablePaths), "outputSchema" | "output_schema" => Ok(GeneratedField::OutputSchema), "tablePartitionCols" | "table_partition_cols" => Ok(GeneratedField::TablePartitionCols), - "writerMode" | "writer_mode" => Ok(GeneratedField::WriterMode), "singleFileOutput" | "single_file_output" => Ok(GeneratedField::SingleFileOutput), "unboundedInput" | "unbounded_input" => Ok(GeneratedField::UnboundedInput), "overwrite" => Ok(GeneratedField::Overwrite), @@ -7617,7 +7605,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig { let mut table_paths__ = None; let mut output_schema__ = None; let mut table_partition_cols__ = None; - let mut writer_mode__ = None; let mut single_file_output__ = None; let mut unbounded_input__ = None; let mut overwrite__ = None; @@ -7654,12 +7641,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig { } table_partition_cols__ = Some(map_.next_value()?); } - GeneratedField::WriterMode => { - if writer_mode__.is_some() { - return Err(serde::de::Error::duplicate_field("writerMode")); - } - writer_mode__ = Some(map_.next_value::()? as i32); - } GeneratedField::SingleFileOutput => { if single_file_output__.is_some() { return Err(serde::de::Error::duplicate_field("singleFileOutput")); @@ -7692,7 +7673,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig { table_paths: table_paths__.unwrap_or_default(), output_schema: output_schema__, table_partition_cols: table_partition_cols__.unwrap_or_default(), - writer_mode: writer_mode__.unwrap_or_default(), single_file_output: single_file_output__.unwrap_or_default(), unbounded_input: unbounded_input__.unwrap_or_default(), overwrite: overwrite__.unwrap_or_default(), @@ -7800,80 +7780,6 @@ impl<'de> serde::Deserialize<'de> for FileTypeWriterOptions { deserializer.deserialize_struct("datafusion.FileTypeWriterOptions", FIELDS, GeneratedVisitor) } } -impl serde::Serialize for FileWriterMode { - #[allow(deprecated)] - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::Serializer, - { - let variant = match self { - Self::Append => "APPEND", - Self::Put => "PUT", - Self::PutMultipart => "PUT_MULTIPART", - }; - serializer.serialize_str(variant) - } -} -impl<'de> serde::Deserialize<'de> for FileWriterMode { - #[allow(deprecated)] - fn deserialize(deserializer: D) -> std::result::Result - where - D: serde::Deserializer<'de>, - { - const FIELDS: &[&str] = &[ - "APPEND", - "PUT", - "PUT_MULTIPART", - ]; - - struct GeneratedVisitor; - - impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = FileWriterMode; - - fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(formatter, "expected one of: {:?}", &FIELDS) - } - - fn visit_i64(self, v: i64) -> std::result::Result - where - E: serde::de::Error, - { - i32::try_from(v) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_else(|| { - serde::de::Error::invalid_value(serde::de::Unexpected::Signed(v), &self) - }) - } - - fn visit_u64(self, v: u64) -> std::result::Result - where - E: serde::de::Error, - { - i32::try_from(v) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_else(|| { - serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self) - }) - } - - fn visit_str(self, value: &str) -> std::result::Result - where - E: serde::de::Error, - { - match value { - "APPEND" => Ok(FileWriterMode::Append), - "PUT" => Ok(FileWriterMode::Put), - "PUT_MULTIPART" => Ok(FileWriterMode::PutMultipart), - _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), - } - } - } - deserializer.deserialize_any(GeneratedVisitor) - } -} impl serde::Serialize for FilterExecNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 15606488b33a7..79b0658f24c02 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1613,8 +1613,6 @@ pub struct FileSinkConfig { pub output_schema: ::core::option::Option, #[prost(message, repeated, tag = "5")] pub table_partition_cols: ::prost::alloc::vec::Vec, - #[prost(enumeration = "FileWriterMode", tag = "6")] - pub writer_mode: i32, #[prost(bool, tag = "7")] pub single_file_output: bool, #[prost(bool, tag = "8")] @@ -3189,35 +3187,6 @@ impl UnionMode { } #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] -pub enum FileWriterMode { - Append = 0, - Put = 1, - PutMultipart = 2, -} -impl FileWriterMode { - /// String value of the enum field names used in the ProtoBuf definition. - /// - /// The values are not transformed in any way and thus are considered stable - /// (if the ProtoBuf definition does not change) and safe for programmatic use. - pub fn as_str_name(&self) -> &'static str { - match self { - FileWriterMode::Append => "APPEND", - FileWriterMode::Put => "PUT", - FileWriterMode::PutMultipart => "PUT_MULTIPART", - } - } - /// Creates an enum from field names used in the ProtoBuf definition. - pub fn from_str_name(value: &str) -> ::core::option::Option { - match value { - "APPEND" => Some(Self::Append), - "PUT" => Some(Self::Put), - "PUT_MULTIPART" => Some(Self::PutMultipart), - _ => None, - } - } -} -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] -#[repr(i32)] pub enum CompressionTypeVariant { Gzip = 0, Bzip2 = 1, diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 22b74db9afd27..f5771ddb155b4 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -23,7 +23,6 @@ use std::sync::Arc; use arrow::compute::SortOptions; use datafusion::arrow::datatypes::Schema; use datafusion::datasource::file_format::json::JsonSink; -use datafusion::datasource::file_format::write::FileWriterMode; use datafusion::datasource::listing::{FileRange, ListingTableUrl, PartitionedFile}; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{FileScanConfig, FileSinkConfig}; @@ -739,7 +738,6 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig { table_paths, output_schema: Arc::new(convert_required!(conf.output_schema)?), table_partition_cols, - writer_mode: conf.writer_mode().into(), single_file_output: conf.single_file_output, unbounded_input: conf.unbounded_input, overwrite: conf.overwrite, @@ -748,16 +746,6 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig { } } -impl From for FileWriterMode { - fn from(value: protobuf::FileWriterMode) -> Self { - match value { - protobuf::FileWriterMode::Append => Self::Append, - protobuf::FileWriterMode::Put => Self::Put, - protobuf::FileWriterMode::PutMultipart => Self::PutMultipart, - } - } -} - impl From for CompressionTypeVariant { fn from(value: protobuf::CompressionTypeVariant) -> Self { match value { diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index b8a590b0dc1a1..44864be947d59 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -31,7 +31,6 @@ use datafusion::datasource::{ file_format::json::JsonSink, physical_plan::FileScanConfig, }; use datafusion::datasource::{ - file_format::write::FileWriterMode, listing::{FileRange, PartitionedFile}, physical_plan::FileSinkConfig, }; @@ -819,7 +818,6 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig { type Error = DataFusionError; fn try_from(conf: &FileSinkConfig) -> Result { - let writer_mode: protobuf::FileWriterMode = conf.writer_mode.into(); let file_groups = conf .file_groups .iter() @@ -847,7 +845,6 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig { table_paths, output_schema: Some(conf.output_schema.as_ref().try_into()?), table_partition_cols, - writer_mode: writer_mode.into(), single_file_output: conf.single_file_output, unbounded_input: conf.unbounded_input, overwrite: conf.overwrite, @@ -856,16 +853,6 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig { } } -impl From for protobuf::FileWriterMode { - fn from(value: FileWriterMode) -> Self { - match value { - FileWriterMode::Append => Self::Append, - FileWriterMode::Put => Self::Put, - FileWriterMode::PutMultipart => Self::PutMultipart, - } - } -} - impl From<&CompressionTypeVariant> for protobuf::CompressionTypeVariant { fn from(value: &CompressionTypeVariant) -> Self { match value { diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 076ca415810a2..23b0ea43c73ab 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -22,7 +22,6 @@ use datafusion::arrow::array::ArrayRef; use datafusion::arrow::compute::kernels::sort::SortOptions; use datafusion::arrow::datatypes::{DataType, Field, Fields, IntervalUnit, Schema}; use datafusion::datasource::file_format::json::JsonSink; -use datafusion::datasource::file_format::write::FileWriterMode; use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile}; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{ @@ -732,7 +731,6 @@ fn roundtrip_json_sink() -> Result<()> { table_paths: vec![ListingTableUrl::parse("file:///")?], output_schema: schema.clone(), table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)], - writer_mode: FileWriterMode::Put, single_file_output: true, unbounded_input: false, overwrite: true,