Remove FileWriterMode and ListingTableInsertMode (#7994) #8017

Merged 4 commits on Nov 17, 2023

Changes from 2 commits
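For context, a minimal sketch of the user-facing API after this change (the table name, file path, and DataFusion ~33.0 prelude imports below are illustrative assumptions, not part of the diff): the `insert_mode` builder is gone from the read options, and writes through the CSV/JSON sinks always take the multipart-put path.

```rust
use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Register a CSV-backed listing table; note there is no `.insert_mode(...)` call anymore.
    ctx.register_csv(
        "example",          // hypothetical table name
        "data/example.csv", // hypothetical path
        CsvReadOptions::new().has_header(true),
    )
    .await?;
    let df = ctx.sql("SELECT * FROM example LIMIT 5").await?;
    df.show().await?;
    Ok(())
}
```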
74 changes: 5 additions & 69 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -34,10 +34,10 @@ use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};

use super::write::orchestration::{stateless_append_all, stateless_multipart_put};
use super::write::orchestration::stateless_multipart_put;
use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode};
use crate::datasource::file_format::write::BatchSerializer;
use crate::datasource::physical_plan::{
CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig,
};
@@ -465,11 +465,7 @@ impl DisplayAs for CsvSink
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"CsvSink(writer_mode={:?}, file_groups=",
self.config.writer_mode
)?;
write!(f, "CsvSink(file_groups=",)?;
FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
write!(f, ")")
}
@@ -481,55 +477,6 @@ impl CsvSink {
fn new(config: FileSinkConfig) -> Self {
Self { config }
}

async fn append_all(
&self,
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
if !self.config.table_partition_cols.is_empty() {
return Err(DataFusionError::NotImplemented("Inserting in append mode to hive style partitioned tables is not supported".into()));
}
let writer_options = self.config.file_type_writer_options.try_into_csv()?;
let (builder, compression) =
(&writer_options.writer_options, &writer_options.compression);
let compression = FileCompressionType::from(*compression);

let object_store = context
.runtime_env()
.object_store(&self.config.object_store_url)?;
let file_groups = &self.config.file_groups;

let builder_clone = builder.clone();
let options_clone = writer_options.clone();
let get_serializer = move |file_size| {
let inner_clone = builder_clone.clone();
// In append mode, consider has_header flag only when file is empty (at the start).
// For other modes, use has_header flag as is.
let serializer: Box<dyn BatchSerializer> = Box::new(if file_size > 0 {
CsvSerializer::new()
.with_builder(inner_clone)
.with_header(false)
} else {
CsvSerializer::new()
.with_builder(inner_clone)
.with_header(options_clone.writer_options.header())
});
serializer
};

stateless_append_all(
data,
context,
object_store,
file_groups,
self.config.unbounded_input,
compression,
Box::new(get_serializer),
)
.await
}

async fn multipartput_all(
&self,
data: SendableRecordBatchStream,
@@ -577,19 +524,8 @@ impl DataSink for CsvSink {
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
match self.config.writer_mode {
FileWriterMode::Append => {
let total_count = self.append_all(data, context).await?;
Ok(total_count)
}
FileWriterMode::PutMultipart => {
let total_count = self.multipartput_all(data, context).await?;
Ok(total_count)
}
FileWriterMode::Put => {
return not_impl_err!("FileWriterMode::Put is not supported yet!")
}
}
let total_count = self.multipartput_all(data, context).await?;
Ok(total_count)
}
}

59 changes: 5 additions & 54 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -45,10 +45,10 @@ use crate::physical_plan::insert::FileSinkExec;
use crate::physical_plan::SendableRecordBatchStream;
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};

use super::write::orchestration::{stateless_append_all, stateless_multipart_put};
use super::write::orchestration::stateless_multipart_put;

use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode};
use crate::datasource::file_format::write::BatchSerializer;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::physical_plan::{FileSinkConfig, NdJsonExec};
use crate::error::Result;
@@ -245,11 +245,7 @@ impl DisplayAs for JsonSink
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"JsonSink(writer_mode={:?}, file_groups=",
self.config.writer_mode
)?;
write!(f, "JsonSink(file_groups=",)?;
FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
write!(f, ")")
}
@@ -268,40 +264,6 @@ impl JsonSink {
&self.config
}

async fn append_all(
&self,
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
if !self.config.table_partition_cols.is_empty() {
return Err(DataFusionError::NotImplemented("Inserting in append mode to hive style partitioned tables is not supported".into()));
}

let writer_options = self.config.file_type_writer_options.try_into_json()?;
let compression = &writer_options.compression;

let object_store = context
.runtime_env()
.object_store(&self.config.object_store_url)?;
let file_groups = &self.config.file_groups;

let get_serializer = move |_| {
let serializer: Box<dyn BatchSerializer> = Box::new(JsonSerializer::new());
serializer
};

stateless_append_all(
data,
context,
object_store,
file_groups,
self.config.unbounded_input,
(*compression).into(),
Box::new(get_serializer),
)
.await
}

async fn multipartput_all(
&self,
data: SendableRecordBatchStream,
@@ -342,19 +304,8 @@ impl DataSink for JsonSink {
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
match self.config.writer_mode {
FileWriterMode::Append => {
let total_count = self.append_all(data, context).await?;
Ok(total_count)
}
FileWriterMode::PutMultipart => {
let total_count = self.multipartput_all(data, context).await?;
Ok(total_count)
}
FileWriterMode::Put => {
return not_impl_err!("FileWriterMode::Put is not supported yet!")
}
}
let total_count = self.multipartput_all(data, context).await?;
Ok(total_count)
}
}

32 changes: 1 addition & 31 deletions datafusion/core/src/datasource/file_format/options.rs
@@ -28,7 +28,7 @@ use crate::datasource::file_format::file_compression_type::FileCompressionType;
#[cfg(feature = "parquet")]
use crate::datasource::file_format::parquet::ParquetFormat;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::listing::{ListingTableInsertMode, ListingTableUrl};
use crate::datasource::listing::ListingTableUrl;
use crate::datasource::{
file_format::{avro::AvroFormat, csv::CsvFormat, json::JsonFormat},
listing::ListingOptions,
@@ -76,8 +76,6 @@ pub struct CsvReadOptions<'a> {
pub infinite: bool,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
/// Setting controls how inserts to this file should be handled
pub insert_mode: ListingTableInsertMode,
}

impl<'a> Default for CsvReadOptions<'a> {
@@ -101,7 +99,6 @@ impl<'a> CsvReadOptions<'a> {
file_compression_type: FileCompressionType::UNCOMPRESSED,
infinite: false,
file_sort_order: vec![],
insert_mode: ListingTableInsertMode::AppendToFile,
}
}

@@ -184,12 +181,6 @@ impl<'a> CsvReadOptions<'a> {
self.file_sort_order = file_sort_order;
self
}

/// Configure how insertions to this table should be handled
pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
self.insert_mode = insert_mode;
self
}
}

/// Options that control the reading of Parquet files.
@@ -219,8 +210,6 @@ pub struct ParquetReadOptions<'a> {
pub schema: Option<&'a Schema>,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
/// Setting controls how inserts to this file should be handled
pub insert_mode: ListingTableInsertMode,
}

impl<'a> Default for ParquetReadOptions<'a> {
@@ -232,7 +221,6 @@ impl<'a> Default for ParquetReadOptions<'a> {
skip_metadata: None,
schema: None,
file_sort_order: vec![],
insert_mode: ListingTableInsertMode::AppendNewFiles,
}
}
}
@@ -272,12 +260,6 @@ impl<'a> ParquetReadOptions<'a> {
self.file_sort_order = file_sort_order;
self
}

/// Configure how insertions to this table should be handled
pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
self.insert_mode = insert_mode;
self
}
}

/// Options that control the reading of ARROW files.
@@ -403,8 +385,6 @@ pub struct NdJsonReadOptions<'a> {
pub infinite: bool,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
/// Setting controls how inserts to this file should be handled
pub insert_mode: ListingTableInsertMode,
}

impl<'a> Default for NdJsonReadOptions<'a> {
@@ -417,7 +397,6 @@ impl<'a> Default for NdJsonReadOptions<'a> {
file_compression_type: FileCompressionType::UNCOMPRESSED,
infinite: false,
file_sort_order: vec![],
insert_mode: ListingTableInsertMode::AppendToFile,
}
}
}
@@ -464,12 +443,6 @@ impl<'a> NdJsonReadOptions<'a> {
self.file_sort_order = file_sort_order;
self
}

/// Configure how insertions to this table should be handled
pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
self.insert_mode = insert_mode;
self
}
}

#[async_trait]
@@ -528,7 +501,6 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
.with_table_partition_cols(self.table_partition_cols.clone())
.with_file_sort_order(self.file_sort_order.clone())
.with_infinite_source(self.infinite)
.with_insert_mode(self.insert_mode.clone())
}

async fn get_resolved_schema(
@@ -555,7 +527,6 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
.with_target_partitions(config.target_partitions())
.with_table_partition_cols(self.table_partition_cols.clone())
.with_file_sort_order(self.file_sort_order.clone())
.with_insert_mode(self.insert_mode.clone())
}

async fn get_resolved_schema(
@@ -582,7 +553,6 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
.with_table_partition_cols(self.table_partition_cols.clone())
.with_infinite_source(self.infinite)
.with_file_sort_order(self.file_sort_order.clone())
.with_insert_mode(self.insert_mode.clone())
}

async fn get_resolved_schema(
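Similarly, a hedged sketch of what building `ListingOptions` for CSV looks like after this PR (the file extension and partition count below are arbitrary examples): the `with_insert_mode` call that `to_listing_options` used to make is simply dropped, and the rest of the builder chain is unchanged.

```rust
use std::sync::Arc;

use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::listing::ListingOptions;

// Sketch only: the same builder chain ReadOptions::to_listing_options uses in this
// diff, minus the removed with_insert_mode step.
fn csv_listing_options() -> ListingOptions {
    ListingOptions::new(Arc::new(CsvFormat::default()))
        .with_file_extension(".csv") // arbitrary example extension
        .with_target_partitions(4)   // arbitrary example partition count
}
```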