
Update arrow 49.0.0 and object_store 0.8.0 #8029

Merged (11 commits) on Nov 18, 2023
3 changes: 3 additions & 0 deletions Cargo.toml
@@ -76,3 +76,6 @@ overflow-checks = false
 panic = 'unwind'
 rpath = false
 
+
+[patch.crates-io]
+object_store = { git = "https://github.com/apache/arrow-rs.git", rev = "ab53d2dd5fb2af2bab69f95a6ddae2226c166500" }
9 changes: 4 additions & 5 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions datafusion-cli/Cargo.toml
@@ -50,3 +50,6 @@ assert_cmd = "2.0"
 ctor = "0.2.0"
 predicates = "3.0"
 rstest = "0.17"
+
+[patch.crates-io]
+object_store = { git = "https://github.com/apache/arrow-rs.git", rev = "ab53d2dd5fb2af2bab69f95a6ddae2226c166500" }
7 changes: 1 addition & 6 deletions datafusion/core/src/catalog/listing_schema.rs
@@ -92,12 +92,7 @@ impl ListingSchemaProvider {
 
     /// Reload table information from ObjectStore
     pub async fn refresh(&self, state: &SessionState) -> datafusion_common::Result<()> {
-        let entries: Vec<_> = self
-            .store
-            .list(Some(&self.path))
-            .await?
-            .try_collect()
-            .await?;
+        let entries: Vec<_> = self.store.list(Some(&self.path)).try_collect().await?;
Review comment (Contributor): that is certainly nicer

         let base = Path::new(self.path.as_ref());
         let mut tables = HashSet::new();
         for file in entries.iter() {
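For context, object_store 0.8 makes `ObjectStore::list` synchronous: it returns the result stream directly rather than a `Future` that resolves to one, which is why the leading `.await?` disappears above. A minimal sketch of the new call pattern (the `LocalFileSystem` store and the `data` prefix are illustrative, not part of this PR):

```rust
use futures::TryStreamExt;
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};

#[tokio::main]
async fn main() -> object_store::Result<()> {
    // Illustrative store rooted at the OS temp dir; any ObjectStore works.
    let store = LocalFileSystem::new_with_prefix(std::env::temp_dir())?;
    let prefix = Path::from("data");
    // object_store 0.8: `list` returns a stream immediately, so the only
    // `.await` happens when the stream is drained.
    let entries: Vec<_> = store.list(Some(&prefix)).try_collect().await?;
    println!("found {} objects", entries.len());
    Ok(())
}
```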
75 changes: 6 additions & 69 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -34,10 +34,10 @@ use futures::stream::BoxStream;
 use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
 use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
 
-use super::write::orchestration::{stateless_append_all, stateless_multipart_put};
+use super::write::orchestration::stateless_multipart_put;
 use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode};
+use crate::datasource::file_format::write::BatchSerializer;
 use crate::datasource::physical_plan::{
     CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig,
 };
@@ -465,11 +465,7 @@ impl DisplayAs for CsvSink {
     fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(
-                    f,
-                    "CsvSink(writer_mode={:?}, file_groups=",
-                    self.config.writer_mode
-                )?;
+                write!(f, "CsvSink(file_groups=",)?;
                 FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
                 write!(f, ")")
             }
@@ -481,55 +477,6 @@ impl CsvSink {
     fn new(config: FileSinkConfig) -> Self {
         Self { config }
     }
-
-    async fn append_all(
-        &self,
-        data: SendableRecordBatchStream,
-        context: &Arc<TaskContext>,
-    ) -> Result<u64> {
-        if !self.config.table_partition_cols.is_empty() {
-            return Err(DataFusionError::NotImplemented("Inserting in append mode to hive style partitioned tables is not supported".into()));
-        }
-        let writer_options = self.config.file_type_writer_options.try_into_csv()?;
-        let (builder, compression) =
-            (&writer_options.writer_options, &writer_options.compression);
-        let compression = FileCompressionType::from(*compression);
-
-        let object_store = context
-            .runtime_env()
-            .object_store(&self.config.object_store_url)?;
-        let file_groups = &self.config.file_groups;
-
-        let builder_clone = builder.clone();
-        let options_clone = writer_options.clone();
-        let get_serializer = move |file_size| {
-            let inner_clone = builder_clone.clone();
-            // In append mode, consider has_header flag only when file is empty (at the start).
-            // For other modes, use has_header flag as is.
-            let serializer: Box<dyn BatchSerializer> = Box::new(if file_size > 0 {
-                CsvSerializer::new()
-                    .with_builder(inner_clone)
-                    .with_header(false)
-            } else {
-                CsvSerializer::new()
-                    .with_builder(inner_clone)
-                    .with_header(options_clone.writer_options.header())
-            });
-            serializer
-        };
-
-        stateless_append_all(
-            data,
-            context,
-            object_store,
-            file_groups,
-            self.config.unbounded_input,
-            compression,
-            Box::new(get_serializer),
-        )
-        .await
-    }
 
     async fn multipartput_all(
         &self,
         data: SendableRecordBatchStream,
@@ -577,19 +524,8 @@ impl DataSink for CsvSink {
         data: SendableRecordBatchStream,
         context: &Arc<TaskContext>,
     ) -> Result<u64> {
-        match self.config.writer_mode {
-            FileWriterMode::Append => {
-                let total_count = self.append_all(data, context).await?;
-                Ok(total_count)
-            }
-            FileWriterMode::PutMultipart => {
-                let total_count = self.multipartput_all(data, context).await?;
-                Ok(total_count)
-            }
-            FileWriterMode::Put => {
-                return not_impl_err!("FileWriterMode::Put is not supported yet!")
-            }
-        }
+        let total_count = self.multipartput_all(data, context).await?;
+        Ok(total_count)
     }
 }
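With `FileWriterMode` gone, every sink now writes through the multipart-put path. Roughly, that path sits on top of `ObjectStore::put_multipart`, which in object_store 0.8 hands back an `AsyncWrite` that uploads parts as data is written. A hedged sketch of that primitive (store and path are illustrative, not DataFusion's actual orchestration code):

```rust
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store = LocalFileSystem::new_with_prefix(std::env::temp_dir())?;
    let location = Path::from("example.csv");
    // object_store 0.8: put_multipart returns an upload id plus an AsyncWrite
    // that buffers and uploads parts as bytes are written.
    let (_id, mut writer) = store.put_multipart(&location).await?;
    writer.write_all(b"a,b\n1,2\n").await?;
    // shutdown() completes the multipart upload and makes the object visible.
    writer.shutdown().await?;
    Ok(())
}
```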

@@ -737,6 +673,7 @@ mod tests {
             last_modified: DateTime::default(),
             size: usize::MAX,
             e_tag: None,
+            version: None,
         };
 
         let num_rows_to_read = 100;
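`ObjectMeta` gains a `version` field in object_store 0.8 (populated by stores that track object versions, `None` elsewhere), so every struct literal needs the extra field, as in the test above. A self-contained illustration with placeholder values:

```rust
use chrono::{DateTime, Utc};
use object_store::{path::Path, ObjectMeta};

fn main() {
    let meta = ObjectMeta {
        location: Path::from("data/file.csv"),
        last_modified: DateTime::<Utc>::default(), // Unix-epoch placeholder
        size: 0,
        e_tag: None,
        // New in object_store 0.8: the store-assigned version, if any.
        version: None,
    };
    println!("{meta:?}");
}
```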
59 changes: 5 additions & 54 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -45,10 +45,10 @@ use crate::physical_plan::insert::FileSinkExec;
 use crate::physical_plan::SendableRecordBatchStream;
 use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
 
-use super::write::orchestration::{stateless_append_all, stateless_multipart_put};
+use super::write::orchestration::stateless_multipart_put;
 
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode};
+use crate::datasource::file_format::write::BatchSerializer;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
 use crate::datasource::physical_plan::{FileSinkConfig, NdJsonExec};
 use crate::error::Result;
@@ -245,11 +245,7 @@ impl DisplayAs for JsonSink {
     fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(
-                    f,
-                    "JsonSink(writer_mode={:?}, file_groups=",
-                    self.config.writer_mode
-                )?;
+                write!(f, "JsonSink(file_groups=",)?;
                 FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
                 write!(f, ")")
             }
@@ -262,40 +258,6 @@ impl JsonSink {
         Self { config }
     }
 
-    async fn append_all(
-        &self,
-        data: SendableRecordBatchStream,
-        context: &Arc<TaskContext>,
-    ) -> Result<u64> {
-        if !self.config.table_partition_cols.is_empty() {
-            return Err(DataFusionError::NotImplemented("Inserting in append mode to hive style partitioned tables is not supported".into()));
-        }
-
-        let writer_options = self.config.file_type_writer_options.try_into_json()?;
-        let compression = &writer_options.compression;
-
-        let object_store = context
-            .runtime_env()
-            .object_store(&self.config.object_store_url)?;
-        let file_groups = &self.config.file_groups;
-
-        let get_serializer = move |_| {
-            let serializer: Box<dyn BatchSerializer> = Box::new(JsonSerializer::new());
-            serializer
-        };
-
-        stateless_append_all(
-            data,
-            context,
-            object_store,
-            file_groups,
-            self.config.unbounded_input,
-            (*compression).into(),
-            Box::new(get_serializer),
-        )
-        .await
-    }
-
     async fn multipartput_all(
         &self,
         data: SendableRecordBatchStream,
@@ -336,19 +298,8 @@ impl DataSink for JsonSink {
         data: SendableRecordBatchStream,
         context: &Arc<TaskContext>,
     ) -> Result<u64> {
-        match self.config.writer_mode {
-            FileWriterMode::Append => {
-                let total_count = self.append_all(data, context).await?;
-                Ok(total_count)
-            }
-            FileWriterMode::PutMultipart => {
-                let total_count = self.multipartput_all(data, context).await?;
-                Ok(total_count)
-            }
-            FileWriterMode::Put => {
-                return not_impl_err!("FileWriterMode::Put is not supported yet!")
-            }
-        }
+        let total_count = self.multipartput_all(data, context).await?;
+        Ok(total_count)
     }
 }

16 changes: 11 additions & 5 deletions datafusion/core/src/datasource/file_format/mod.rs
@@ -124,7 +124,8 @@ pub(crate) mod test_util {
     use object_store::local::LocalFileSystem;
     use object_store::path::Path;
     use object_store::{
-        GetOptions, GetResult, GetResultPayload, ListResult, MultipartId,
+        GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, PutOptions,
+        PutResult,
     };
     use tokio::io::AsyncWrite;

@@ -189,7 +190,12 @@
 
     #[async_trait]
     impl ObjectStore for VariableStream {
-        async fn put(&self, _location: &Path, _bytes: Bytes) -> object_store::Result<()> {
+        async fn put_opts(
+            &self,
+            _location: &Path,
+            _bytes: Bytes,
+            _opts: PutOptions,
+        ) -> object_store::Result<PutResult> {
             unimplemented!()
         }
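object_store 0.8 routes writes through `put_opts`: `put` becomes a thin wrapper that passes default `PutOptions`, and both return a `PutResult` carrying the `e_tag` and `version` the store assigned, which is why this test double now overrides `put_opts`. A small usage sketch (store and payload are illustrative):

```rust
use bytes::Bytes;
use object_store::{local::LocalFileSystem, path::Path, ObjectStore, PutOptions};

#[tokio::main]
async fn main() -> object_store::Result<()> {
    let store = LocalFileSystem::new_with_prefix(std::env::temp_dir())?;
    let location = Path::from("example.txt");
    // `put` forwards to `put_opts` with PutOptions::default().
    let result = store
        .put_opts(&location, Bytes::from("hello"), PutOptions::default())
        .await?;
    // PutResult reports the e_tag/version assigned by the store, if any.
    println!("e_tag: {:?}, version: {:?}", result.e_tag, result.version);
    Ok(())
}
```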

@@ -228,6 +234,7 @@
                     last_modified: Default::default(),
                     size: range.end,
                     e_tag: None,
+                    version: None,
                 },
                 range: Default::default(),
             })
@@ -257,11 +264,10 @@
             unimplemented!()
         }
 
-        async fn list(
+        fn list(
             &self,
             _prefix: Option<&Path>,
-        ) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>>
-        {
+        ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
             unimplemented!()
         }
