Skip to content

Commit

Permalink
Remove FileWriterMode Support (#7994)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Nov 15, 2023
1 parent 020b8fc commit 1e86301
Show file tree
Hide file tree
Showing 30 changed files with 96 additions and 1,028 deletions.
74 changes: 5 additions & 69 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};

use super::write::orchestration::{stateless_append_all, stateless_multipart_put};
use super::write::orchestration::stateless_multipart_put;
use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode};
use crate::datasource::file_format::write::BatchSerializer;
use crate::datasource::physical_plan::{
CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig,
};
Expand Down Expand Up @@ -465,11 +465,7 @@ impl DisplayAs for CsvSink {
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"CsvSink(writer_mode={:?}, file_groups=",
self.config.writer_mode
)?;
write!(f, "CsvSink(file_groups=",)?;
FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
write!(f, ")")
}
Expand All @@ -481,55 +477,6 @@ impl CsvSink {
fn new(config: FileSinkConfig) -> Self {
Self { config }
}

async fn append_all(
&self,
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
if !self.config.table_partition_cols.is_empty() {
return Err(DataFusionError::NotImplemented("Inserting in append mode to hive style partitioned tables is not supported".into()));
}
let writer_options = self.config.file_type_writer_options.try_into_csv()?;
let (builder, compression) =
(&writer_options.writer_options, &writer_options.compression);
let compression = FileCompressionType::from(*compression);

let object_store = context
.runtime_env()
.object_store(&self.config.object_store_url)?;
let file_groups = &self.config.file_groups;

let builder_clone = builder.clone();
let options_clone = writer_options.clone();
let get_serializer = move |file_size| {
let inner_clone = builder_clone.clone();
// In append mode, consider has_header flag only when file is empty (at the start).
// For other modes, use has_header flag as is.
let serializer: Box<dyn BatchSerializer> = Box::new(if file_size > 0 {
CsvSerializer::new()
.with_builder(inner_clone)
.with_header(false)
} else {
CsvSerializer::new()
.with_builder(inner_clone)
.with_header(options_clone.writer_options.header())
});
serializer
};

stateless_append_all(
data,
context,
object_store,
file_groups,
self.config.unbounded_input,
compression,
Box::new(get_serializer),
)
.await
}

async fn multipartput_all(
&self,
data: SendableRecordBatchStream,
Expand Down Expand Up @@ -577,19 +524,8 @@ impl DataSink for CsvSink {
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
match self.config.writer_mode {
FileWriterMode::Append => {
let total_count = self.append_all(data, context).await?;
Ok(total_count)
}
FileWriterMode::PutMultipart => {
let total_count = self.multipartput_all(data, context).await?;
Ok(total_count)
}
FileWriterMode::Put => {
return not_impl_err!("FileWriterMode::Put is not supported yet!")
}
}
let total_count = self.multipartput_all(data, context).await?;
Ok(total_count)
}
}

Expand Down
59 changes: 5 additions & 54 deletions datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ use crate::physical_plan::insert::FileSinkExec;
use crate::physical_plan::SendableRecordBatchStream;
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};

use super::write::orchestration::{stateless_append_all, stateless_multipart_put};
use super::write::orchestration::stateless_multipart_put;

use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode};
use crate::datasource::file_format::write::BatchSerializer;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::physical_plan::{FileSinkConfig, NdJsonExec};
use crate::error::Result;
Expand Down Expand Up @@ -245,11 +245,7 @@ impl DisplayAs for JsonSink {
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"JsonSink(writer_mode={:?}, file_groups=",
self.config.writer_mode
)?;
write!(f, "JsonSink(file_groups=",)?;
FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
write!(f, ")")
}
Expand All @@ -268,40 +264,6 @@ impl JsonSink {
&self.config
}

async fn append_all(
&self,
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
if !self.config.table_partition_cols.is_empty() {
return Err(DataFusionError::NotImplemented("Inserting in append mode to hive style partitioned tables is not supported".into()));
}

let writer_options = self.config.file_type_writer_options.try_into_json()?;
let compression = &writer_options.compression;

let object_store = context
.runtime_env()
.object_store(&self.config.object_store_url)?;
let file_groups = &self.config.file_groups;

let get_serializer = move |_| {
let serializer: Box<dyn BatchSerializer> = Box::new(JsonSerializer::new());
serializer
};

stateless_append_all(
data,
context,
object_store,
file_groups,
self.config.unbounded_input,
(*compression).into(),
Box::new(get_serializer),
)
.await
}

async fn multipartput_all(
&self,
data: SendableRecordBatchStream,
Expand Down Expand Up @@ -342,19 +304,8 @@ impl DataSink for JsonSink {
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
match self.config.writer_mode {
FileWriterMode::Append => {
let total_count = self.append_all(data, context).await?;
Ok(total_count)
}
FileWriterMode::PutMultipart => {
let total_count = self.multipartput_all(data, context).await?;
Ok(total_count)
}
FileWriterMode::Put => {
return not_impl_err!("FileWriterMode::Put is not supported yet!")
}
}
let total_count = self.multipartput_all(data, context).await?;
Ok(total_count)
}
}

Expand Down
32 changes: 1 addition & 31 deletions datafusion/core/src/datasource/file_format/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::datasource::file_format::file_compression_type::FileCompressionType;
#[cfg(feature = "parquet")]
use crate::datasource::file_format::parquet::ParquetFormat;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::listing::{ListingTableInsertMode, ListingTableUrl};
use crate::datasource::listing::ListingTableUrl;
use crate::datasource::{
file_format::{avro::AvroFormat, csv::CsvFormat, json::JsonFormat},
listing::ListingOptions,
Expand Down Expand Up @@ -76,8 +76,6 @@ pub struct CsvReadOptions<'a> {
pub infinite: bool,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
/// Setting controls how inserts to this file should be handled
pub insert_mode: ListingTableInsertMode,
}

impl<'a> Default for CsvReadOptions<'a> {
Expand All @@ -101,7 +99,6 @@ impl<'a> CsvReadOptions<'a> {
file_compression_type: FileCompressionType::UNCOMPRESSED,
infinite: false,
file_sort_order: vec![],
insert_mode: ListingTableInsertMode::AppendToFile,
}
}

Expand Down Expand Up @@ -184,12 +181,6 @@ impl<'a> CsvReadOptions<'a> {
self.file_sort_order = file_sort_order;
self
}

/// Configure how insertions to this table should be handled
pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
self.insert_mode = insert_mode;
self
}
}

/// Options that control the reading of Parquet files.
Expand Down Expand Up @@ -219,8 +210,6 @@ pub struct ParquetReadOptions<'a> {
pub schema: Option<&'a Schema>,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
/// Setting controls how inserts to this file should be handled
pub insert_mode: ListingTableInsertMode,
}

impl<'a> Default for ParquetReadOptions<'a> {
Expand All @@ -232,7 +221,6 @@ impl<'a> Default for ParquetReadOptions<'a> {
skip_metadata: None,
schema: None,
file_sort_order: vec![],
insert_mode: ListingTableInsertMode::AppendNewFiles,
}
}
}
Expand Down Expand Up @@ -272,12 +260,6 @@ impl<'a> ParquetReadOptions<'a> {
self.file_sort_order = file_sort_order;
self
}

/// Configure how insertions to this table should be handled
pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
self.insert_mode = insert_mode;
self
}
}

/// Options that control the reading of ARROW files.
Expand Down Expand Up @@ -403,8 +385,6 @@ pub struct NdJsonReadOptions<'a> {
pub infinite: bool,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
/// Setting controls how inserts to this file should be handled
pub insert_mode: ListingTableInsertMode,
}

impl<'a> Default for NdJsonReadOptions<'a> {
Expand All @@ -417,7 +397,6 @@ impl<'a> Default for NdJsonReadOptions<'a> {
file_compression_type: FileCompressionType::UNCOMPRESSED,
infinite: false,
file_sort_order: vec![],
insert_mode: ListingTableInsertMode::AppendToFile,
}
}
}
Expand Down Expand Up @@ -464,12 +443,6 @@ impl<'a> NdJsonReadOptions<'a> {
self.file_sort_order = file_sort_order;
self
}

/// Configure how insertions to this table should be handled
pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
self.insert_mode = insert_mode;
self
}
}

#[async_trait]
Expand Down Expand Up @@ -528,7 +501,6 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
.with_table_partition_cols(self.table_partition_cols.clone())
.with_file_sort_order(self.file_sort_order.clone())
.with_infinite_source(self.infinite)
.with_insert_mode(self.insert_mode.clone())
}

async fn get_resolved_schema(
Expand All @@ -555,7 +527,6 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
.with_target_partitions(config.target_partitions())
.with_table_partition_cols(self.table_partition_cols.clone())
.with_file_sort_order(self.file_sort_order.clone())
.with_insert_mode(self.insert_mode.clone())
}

async fn get_resolved_schema(
Expand All @@ -582,7 +553,6 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
.with_table_partition_cols(self.table_partition_cols.clone())
.with_infinite_source(self.infinite)
.with_file_sort_order(self.file_sort_order.clone())
.with_insert_mode(self.insert_mode.clone())
}

async fn get_resolved_schema(
Expand Down
Loading

0 comments on commit 1e86301

Please sign in to comment.