From 1d7e8bdcd3934d005eb263840b2b584a312fcf2d Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 1 Nov 2023 07:49:33 +0000 Subject: [PATCH 1/8] POC: Remove ListingTable Append Support (#7994) --- .../core/src/datasource/file_format/csv.rs | 74 +--- .../core/src/datasource/file_format/json.rs | 59 +--- .../src/datasource/file_format/options.rs | 32 +- .../src/datasource/file_format/parquet.rs | 67 ++-- .../src/datasource/file_format/write/mod.rs | 106 ++---- .../file_format/write/orchestration.rs | 111 +----- datafusion/core/src/datasource/listing/mod.rs | 4 +- .../core/src/datasource/listing/table.rs | 322 +----------------- .../src/datasource/listing_table_factory.rs | 16 - .../core/src/datasource/physical_plan/mod.rs | 3 - datafusion/core/src/physical_planner.rs | 2 - datafusion/core/tests/fifo.rs | 1 + datafusion/sqllogictest/test_files/copy.slt | 20 +- .../sqllogictest/test_files/explain.slt | 2 +- .../test_files/insert_to_external.slt | 56 +-- .../sqllogictest/test_files/options.slt | 4 +- datafusion/sqllogictest/test_files/order.slt | 2 +- .../sqllogictest/test_files/predicates.slt | 1 + .../sqllogictest/test_files/set_variable.slt | 2 +- datafusion/sqllogictest/test_files/update.slt | 2 +- 20 files changed, 94 insertions(+), 792 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 5f2084bc80a8..684f416f771a 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -34,10 +34,10 @@ use futures::stream::BoxStream; use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore}; -use super::write::orchestration::{stateless_append_all, stateless_multipart_put}; +use super::write::orchestration::stateless_multipart_put; use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD}; use crate::datasource::file_format::file_compression_type::FileCompressionType; -use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode}; +use crate::datasource::file_format::write::BatchSerializer; use crate::datasource::physical_plan::{ CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig, }; @@ -465,11 +465,7 @@ impl DisplayAs for CsvSink { fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!( - f, - "CsvSink(writer_mode={:?}, file_groups=", - self.config.writer_mode - )?; + write!(f, "CsvSink(file_groups=",)?; FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?; write!(f, ")") } @@ -481,55 +477,6 @@ impl CsvSink { fn new(config: FileSinkConfig) -> Self { Self { config } } - - async fn append_all( - &self, - data: SendableRecordBatchStream, - context: &Arc, - ) -> Result { - if !self.config.table_partition_cols.is_empty() { - return Err(DataFusionError::NotImplemented("Inserting in append mode to hive style partitioned tables is not supported".into())); - } - let writer_options = self.config.file_type_writer_options.try_into_csv()?; - let (builder, compression) = - (&writer_options.writer_options, &writer_options.compression); - let compression = FileCompressionType::from(*compression); - - let object_store = context - .runtime_env() - .object_store(&self.config.object_store_url)?; - let file_groups = &self.config.file_groups; - - let builder_clone = builder.clone(); - let options_clone = writer_options.clone(); - let get_serializer = move |file_size| { - let inner_clone = builder_clone.clone(); - // In append mode, consider has_header flag only when file is empty (at the start). - // For other modes, use has_header flag as is. - let serializer: Box = Box::new(if file_size > 0 { - CsvSerializer::new() - .with_builder(inner_clone) - .with_header(false) - } else { - CsvSerializer::new() - .with_builder(inner_clone) - .with_header(options_clone.writer_options.header()) - }); - serializer - }; - - stateless_append_all( - data, - context, - object_store, - file_groups, - self.config.unbounded_input, - compression, - Box::new(get_serializer), - ) - .await - } - async fn multipartput_all( &self, data: SendableRecordBatchStream, @@ -577,19 +524,8 @@ impl DataSink for CsvSink { data: SendableRecordBatchStream, context: &Arc, ) -> Result { - match self.config.writer_mode { - FileWriterMode::Append => { - let total_count = self.append_all(data, context).await?; - Ok(total_count) - } - FileWriterMode::PutMultipart => { - let total_count = self.multipartput_all(data, context).await?; - Ok(total_count) - } - FileWriterMode::Put => { - return not_impl_err!("FileWriterMode::Put is not supported yet!") - } - } + let total_count = self.multipartput_all(data, context).await?; + Ok(total_count) } } diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 70cfd1836efe..1d56dc7268bc 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -45,10 +45,10 @@ use crate::physical_plan::insert::FileSinkExec; use crate::physical_plan::SendableRecordBatchStream; use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics}; -use super::write::orchestration::{stateless_append_all, stateless_multipart_put}; +use super::write::orchestration::stateless_multipart_put; use crate::datasource::file_format::file_compression_type::FileCompressionType; -use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode}; +use crate::datasource::file_format::write::BatchSerializer; use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD; use crate::datasource::physical_plan::{FileSinkConfig, NdJsonExec}; use crate::error::Result; @@ -245,11 +245,7 @@ impl DisplayAs for JsonSink { fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!( - f, - "JsonSink(writer_mode={:?}, file_groups=", - self.config.writer_mode - )?; + write!(f, "JsonSink(file_groups=",)?; FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?; write!(f, ")") } @@ -262,40 +258,6 @@ impl JsonSink { Self { config } } - async fn append_all( - &self, - data: SendableRecordBatchStream, - context: &Arc, - ) -> Result { - if !self.config.table_partition_cols.is_empty() { - return Err(DataFusionError::NotImplemented("Inserting in append mode to hive style partitioned tables is not supported".into())); - } - - let writer_options = self.config.file_type_writer_options.try_into_json()?; - let compression = &writer_options.compression; - - let object_store = context - .runtime_env() - .object_store(&self.config.object_store_url)?; - let file_groups = &self.config.file_groups; - - let get_serializer = move |_| { - let serializer: Box = Box::new(JsonSerializer::new()); - serializer - }; - - stateless_append_all( - data, - context, - object_store, - file_groups, - self.config.unbounded_input, - (*compression).into(), - Box::new(get_serializer), - ) - .await - } - async fn multipartput_all( &self, data: SendableRecordBatchStream, @@ -336,19 +298,8 @@ impl DataSink for JsonSink { data: SendableRecordBatchStream, context: &Arc, ) -> Result { - match self.config.writer_mode { - FileWriterMode::Append => { - let total_count = self.append_all(data, context).await?; - Ok(total_count) - } - FileWriterMode::PutMultipart => { - let total_count = self.multipartput_all(data, context).await?; - Ok(total_count) - } - FileWriterMode::Put => { - return not_impl_err!("FileWriterMode::Put is not supported yet!") - } - } + let total_count = self.multipartput_all(data, context).await?; + Ok(total_count) } } diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 41a70e6d2f8f..4c7557a4a9c0 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -28,7 +28,7 @@ use crate::datasource::file_format::file_compression_type::FileCompressionType; #[cfg(feature = "parquet")] use crate::datasource::file_format::parquet::ParquetFormat; use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD; -use crate::datasource::listing::{ListingTableInsertMode, ListingTableUrl}; +use crate::datasource::listing::ListingTableUrl; use crate::datasource::{ file_format::{avro::AvroFormat, csv::CsvFormat, json::JsonFormat}, listing::ListingOptions, @@ -76,8 +76,6 @@ pub struct CsvReadOptions<'a> { pub infinite: bool, /// Indicates how the file is sorted pub file_sort_order: Vec>, - /// Setting controls how inserts to this file should be handled - pub insert_mode: ListingTableInsertMode, } impl<'a> Default for CsvReadOptions<'a> { @@ -101,7 +99,6 @@ impl<'a> CsvReadOptions<'a> { file_compression_type: FileCompressionType::UNCOMPRESSED, infinite: false, file_sort_order: vec![], - insert_mode: ListingTableInsertMode::AppendToFile, } } @@ -184,12 +181,6 @@ impl<'a> CsvReadOptions<'a> { self.file_sort_order = file_sort_order; self } - - /// Configure how insertions to this table should be handled - pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self { - self.insert_mode = insert_mode; - self - } } /// Options that control the reading of Parquet files. @@ -219,8 +210,6 @@ pub struct ParquetReadOptions<'a> { pub schema: Option<&'a Schema>, /// Indicates how the file is sorted pub file_sort_order: Vec>, - /// Setting controls how inserts to this file should be handled - pub insert_mode: ListingTableInsertMode, } impl<'a> Default for ParquetReadOptions<'a> { @@ -232,7 +221,6 @@ impl<'a> Default for ParquetReadOptions<'a> { skip_metadata: None, schema: None, file_sort_order: vec![], - insert_mode: ListingTableInsertMode::AppendNewFiles, } } } @@ -272,12 +260,6 @@ impl<'a> ParquetReadOptions<'a> { self.file_sort_order = file_sort_order; self } - - /// Configure how insertions to this table should be handled - pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self { - self.insert_mode = insert_mode; - self - } } /// Options that control the reading of ARROW files. @@ -403,8 +385,6 @@ pub struct NdJsonReadOptions<'a> { pub infinite: bool, /// Indicates how the file is sorted pub file_sort_order: Vec>, - /// Setting controls how inserts to this file should be handled - pub insert_mode: ListingTableInsertMode, } impl<'a> Default for NdJsonReadOptions<'a> { @@ -417,7 +397,6 @@ impl<'a> Default for NdJsonReadOptions<'a> { file_compression_type: FileCompressionType::UNCOMPRESSED, infinite: false, file_sort_order: vec![], - insert_mode: ListingTableInsertMode::AppendToFile, } } } @@ -464,12 +443,6 @@ impl<'a> NdJsonReadOptions<'a> { self.file_sort_order = file_sort_order; self } - - /// Configure how insertions to this table should be handled - pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self { - self.insert_mode = insert_mode; - self - } } #[async_trait] @@ -528,7 +501,6 @@ impl ReadOptions<'_> for CsvReadOptions<'_> { .with_table_partition_cols(self.table_partition_cols.clone()) .with_file_sort_order(self.file_sort_order.clone()) .with_infinite_source(self.infinite) - .with_insert_mode(self.insert_mode.clone()) } async fn get_resolved_schema( @@ -555,7 +527,6 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> { .with_target_partitions(config.target_partitions()) .with_table_partition_cols(self.table_partition_cols.clone()) .with_file_sort_order(self.file_sort_order.clone()) - .with_insert_mode(self.insert_mode.clone()) } async fn get_resolved_schema( @@ -582,7 +553,6 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> { .with_table_partition_cols(self.table_partition_cols.clone()) .with_infinite_source(self.infinite) .with_file_sort_order(self.file_sort_order.clone()) - .with_insert_mode(self.insert_mode.clone()) } async fn get_resolved_schema( diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 2cba474e559e..c4d05adfc6bc 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -40,11 +40,12 @@ use crate::datasource::statistics::{create_max_min_accs, get_col_stats}; use arrow::datatypes::SchemaRef; use arrow::datatypes::{Fields, Schema}; use bytes::{BufMut, BytesMut}; -use datafusion_common::{exec_err, not_impl_err, plan_err, DataFusionError, FileType}; +use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType}; use datafusion_execution::TaskContext; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; use futures::{StreamExt, TryStreamExt}; use hashbrown::HashMap; +use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; use parquet::arrow::{ arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter, @@ -55,7 +56,7 @@ use parquet::file::properties::WriterProperties; use parquet::file::statistics::Statistics as ParquetStatistics; use super::write::demux::start_demuxer_task; -use super::write::{create_writer, AbortableWrite, FileWriterMode}; +use super::write::{create_writer, AbortableWrite}; use super::{FileFormat, FileScanConfig}; use crate::arrow::array::{ BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, @@ -64,7 +65,7 @@ use crate::arrow::datatypes::DataType; use crate::config::ConfigOptions; use crate::datasource::physical_plan::{ - FileGroupDisplay, FileMeta, FileSinkConfig, ParquetExec, SchemaAdapter, + FileGroupDisplay, FileSinkConfig, ParquetExec, SchemaAdapter, }; use crate::error::Result; use crate::execution::context::SessionState; @@ -596,11 +597,7 @@ impl DisplayAs for ParquetSink { fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!( - f, - "ParquetSink(writer_mode={:?}, file_groups=", - self.config.writer_mode - )?; + write!(f, "ParquetSink(file_groups=",)?; FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?; write!(f, ")") } @@ -642,36 +639,23 @@ impl ParquetSink { /// AsyncArrowWriters are used when individual parquet file serialization is not parallelized async fn create_async_arrow_writer( &self, - file_meta: FileMeta, + location: &Path, object_store: Arc, parquet_props: WriterProperties, ) -> Result< AsyncArrowWriter>, > { - let object = &file_meta.object_meta; - match self.config.writer_mode { - FileWriterMode::Append => { - plan_err!( - "Appending to Parquet files is not supported by the file format!" - ) - } - FileWriterMode::Put => { - not_impl_err!("FileWriterMode::Put is not implemented for ParquetSink") - } - FileWriterMode::PutMultipart => { - let (_, multipart_writer) = object_store - .put_multipart(&object.location) - .await - .map_err(DataFusionError::ObjectStore)?; - let writer = AsyncArrowWriter::try_new( - multipart_writer, - self.get_writer_schema(), - 10485760, - Some(parquet_props), - )?; - Ok(writer) - } - } + let (_, multipart_writer) = object_store + .put_multipart(location) + .await + .map_err(DataFusionError::ObjectStore)?; + let writer = AsyncArrowWriter::try_new( + multipart_writer, + self.get_writer_schema(), + 10485760, + Some(parquet_props), + )?; + Ok(writer) } } @@ -730,13 +714,7 @@ impl DataSink for ParquetSink { if !allow_single_file_parallelism { let mut writer = self .create_async_arrow_writer( - ObjectMeta { - location: path, - last_modified: chrono::offset::Utc::now(), - size: 0, - e_tag: None, - } - .into(), + &path, object_store.clone(), parquet_props.clone(), ) @@ -752,17 +730,10 @@ impl DataSink for ParquetSink { }); } else { let writer = create_writer( - FileWriterMode::PutMultipart, // Parquet files as a whole are never compressed, since they // manage compressed blocks themselves. FileCompressionType::UNCOMPRESSED, - ObjectMeta { - location: path, - last_modified: chrono::offset::Utc::now(), - size: 0, - e_tag: None, - } - .into(), + &path, object_store.clone(), ) .await?; diff --git a/datafusion/core/src/datasource/file_format/write/mod.rs b/datafusion/core/src/datasource/file_format/write/mod.rs index 770c7a49c326..c987f47c0e1c 100644 --- a/datafusion/core/src/datasource/file_format/write/mod.rs +++ b/datafusion/core/src/datasource/file_format/write/mod.rs @@ -26,12 +26,11 @@ use std::task::{Context, Poll}; use crate::datasource::file_format::file_compression_type::FileCompressionType; -use crate::datasource::physical_plan::FileMeta; use crate::error::Result; use arrow_array::RecordBatch; -use datafusion_common::{exec_err, DataFusionError}; +use datafusion_common::DataFusionError; use async_trait::async_trait; use bytes::Bytes; @@ -141,6 +140,7 @@ impl AsyncWrite for AsyncPutWriter { } /// Stores data needed during abortion of MultiPart writers +#[derive(Clone)] pub(crate) struct MultiPart { /// A shared reference to the object store store: Arc, @@ -163,45 +163,28 @@ impl MultiPart { } } -pub(crate) enum AbortMode { - Put, - Append, - MultiPart(MultiPart), -} - /// A wrapper struct with abort method and writer pub(crate) struct AbortableWrite { writer: W, - mode: AbortMode, + multipart: MultiPart, } impl AbortableWrite { /// Create a new `AbortableWrite` instance with the given writer, and write mode. - pub(crate) fn new(writer: W, mode: AbortMode) -> Self { - Self { writer, mode } + pub(crate) fn new(writer: W, multipart: MultiPart) -> Self { + Self { writer, multipart } } /// handling of abort for different write modes pub(crate) fn abort_writer(&self) -> Result>> { - match &self.mode { - AbortMode::Put => Ok(async { Ok(()) }.boxed()), - AbortMode::Append => exec_err!("Cannot abort in append mode"), - AbortMode::MultiPart(MultiPart { - store, - multipart_id, - location, - }) => { - let location = location.clone(); - let multipart_id = multipart_id.clone(); - let store = store.clone(); - Ok(Box::pin(async move { - store - .abort_multipart(&location, &multipart_id) - .await - .map_err(DataFusionError::ObjectStore) - })) - } - } + let multi = self.multipart.clone(); + Ok(Box::pin(async move { + multi + .store + .abort_multipart(&multi.location, &multi.multipart_id) + .await + .map_err(DataFusionError::ObjectStore) + })) } } @@ -229,16 +212,6 @@ impl AsyncWrite for AbortableWrite { } } -/// An enum that defines different file writer modes. -#[derive(Debug, Clone, Copy)] -pub enum FileWriterMode { - /// Data is appended to an existing file. - Append, - /// Data is written to a new file. - Put, - /// Data is written to a new file in multiple parts. - PutMultipart, -} /// A trait that defines the methods required for a RecordBatch serializer. #[async_trait] pub trait BatchSerializer: Unpin + Send { @@ -255,51 +228,16 @@ pub trait BatchSerializer: Unpin + Send { /// Returns an [`AbortableWrite`] which writes to the given object store location /// with the specified compression pub(crate) async fn create_writer( - writer_mode: FileWriterMode, file_compression_type: FileCompressionType, - file_meta: FileMeta, + location: &Path, object_store: Arc, ) -> Result>> { - let object = &file_meta.object_meta; - match writer_mode { - // If the mode is append, call the store's append method and return wrapped in - // a boxed trait object. - FileWriterMode::Append => { - let writer = object_store - .append(&object.location) - .await - .map_err(DataFusionError::ObjectStore)?; - let writer = AbortableWrite::new( - file_compression_type.convert_async_writer(writer)?, - AbortMode::Append, - ); - Ok(writer) - } - // If the mode is put, create a new AsyncPut writer and return it wrapped in - // a boxed trait object - FileWriterMode::Put => { - let writer = Box::new(AsyncPutWriter::new(object.clone(), object_store)); - let writer = AbortableWrite::new( - file_compression_type.convert_async_writer(writer)?, - AbortMode::Put, - ); - Ok(writer) - } - // If the mode is put multipart, call the store's put_multipart method and - // return the writer wrapped in a boxed trait object. - FileWriterMode::PutMultipart => { - let (multipart_id, writer) = object_store - .put_multipart(&object.location) - .await - .map_err(DataFusionError::ObjectStore)?; - Ok(AbortableWrite::new( - file_compression_type.convert_async_writer(writer)?, - AbortMode::MultiPart(MultiPart::new( - object_store, - multipart_id, - object.location.clone(), - )), - )) - } - } + let (multipart_id, writer) = object_store + .put_multipart(location) + .await + .map_err(DataFusionError::ObjectStore)?; + Ok(AbortableWrite::new( + file_compression_type.convert_async_writer(writer)?, + MultiPart::new(object_store, multipart_id, location.clone()), + )) } diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs index f84baa9ac225..2ae6b70ed1c5 100644 --- a/datafusion/core/src/datasource/file_format/write/orchestration.rs +++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs @@ -22,7 +22,6 @@ use std::sync::Arc; use crate::datasource::file_format::file_compression_type::FileCompressionType; -use crate::datasource::listing::PartitionedFile; use crate::datasource::physical_plan::FileSinkConfig; use crate::error::Result; use crate::physical_plan::SendableRecordBatchStream; @@ -34,17 +33,13 @@ use datafusion_common::DataFusionError; use bytes::Bytes; use datafusion_execution::TaskContext; -use futures::StreamExt; - -use object_store::{ObjectMeta, ObjectStore}; - use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::sync::mpsc::{self, Receiver}; use tokio::task::{JoinHandle, JoinSet}; use tokio::try_join; use super::demux::start_demuxer_task; -use super::{create_writer, AbortableWrite, BatchSerializer, FileWriterMode}; +use super::{create_writer, AbortableWrite, BatchSerializer}; type WriterType = AbortableWrite>; type SerializerType = Box; @@ -274,21 +269,9 @@ pub(crate) async fn stateless_multipart_put( stateless_serialize_and_write_files(rx_file_bundle, tx_row_cnt, unbounded_input) .await }); - while let Some((output_location, rb_stream)) = file_stream_rx.recv().await { + while let Some((location, rb_stream)) = file_stream_rx.recv().await { let serializer = get_serializer(); - let object_meta = ObjectMeta { - location: output_location, - last_modified: chrono::offset::Utc::now(), - size: 0, - e_tag: None, - }; - let writer = create_writer( - FileWriterMode::PutMultipart, - compression, - object_meta.into(), - object_store.clone(), - ) - .await?; + let writer = create_writer(compression, &location, object_store.clone()).await?; tx_file_bundle .send((rb_stream, serializer, writer)) @@ -325,91 +308,3 @@ pub(crate) async fn stateless_multipart_put( Ok(total_count) } - -/// Orchestrates append_all for any statelessly serialized file type. Appends to all files provided -/// in a round robin fashion. -pub(crate) async fn stateless_append_all( - mut data: SendableRecordBatchStream, - context: &Arc, - object_store: Arc, - file_groups: &Vec, - unbounded_input: bool, - compression: FileCompressionType, - get_serializer: Box Box + Send>, -) -> Result { - let rb_buffer_size = &context - .session_config() - .options() - .execution - .max_buffered_batches_per_output_file; - - let (tx_file_bundle, rx_file_bundle) = tokio::sync::mpsc::channel(file_groups.len()); - let mut send_channels = vec![]; - for file_group in file_groups { - let serializer = get_serializer(file_group.object_meta.size); - - let file = file_group.clone(); - let writer = create_writer( - FileWriterMode::Append, - compression, - file.object_meta.clone().into(), - object_store.clone(), - ) - .await?; - - let (tx, rx) = tokio::sync::mpsc::channel(rb_buffer_size / 2); - send_channels.push(tx); - tx_file_bundle - .send((rx, serializer, writer)) - .await - .map_err(|_| { - DataFusionError::Internal( - "Writer receive file bundle channel closed unexpectedly!".into(), - ) - })?; - } - - let (tx_row_cnt, rx_row_cnt) = tokio::sync::oneshot::channel(); - let write_coordinater_task = tokio::spawn(async move { - stateless_serialize_and_write_files(rx_file_bundle, tx_row_cnt, unbounded_input) - .await - }); - - // Append to file groups in round robin - let mut next_file_idx = 0; - while let Some(rb) = data.next().await.transpose()? { - send_channels[next_file_idx].send(rb).await.map_err(|_| { - DataFusionError::Internal( - "Recordbatch file append stream closed unexpectedly!".into(), - ) - })?; - next_file_idx = (next_file_idx + 1) % send_channels.len(); - if unbounded_input { - tokio::task::yield_now().await; - } - } - // Signal to the write coordinater that no more files are coming - drop(tx_file_bundle); - drop(send_channels); - - let total_count = rx_row_cnt.await.map_err(|_| { - DataFusionError::Internal( - "Did not receieve row count from write coordinater".into(), - ) - })?; - - match try_join!(write_coordinater_task) { - Ok(r1) => { - r1.0?; - } - Err(e) => { - if e.is_panic() { - std::panic::resume_unwind(e.into_panic()); - } else { - unreachable!(); - } - } - } - - Ok(total_count) -} diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index 8b0f021f0277..aa2e20164b5e 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -31,9 +31,7 @@ use std::pin::Pin; use std::sync::Arc; pub use self::url::ListingTableUrl; -pub use table::{ - ListingOptions, ListingTable, ListingTableConfig, ListingTableInsertMode, -}; +pub use table::{ListingOptions, ListingTable, ListingTableConfig}; /// Stream of files get listed from object store pub type PartitionedFileStream = diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index d26d417bd8b2..7e5fdf0b3393 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -216,33 +216,6 @@ impl ListingTableConfig { } } -#[derive(Debug, Clone)] -///controls how new data should be inserted to a ListingTable -pub enum ListingTableInsertMode { - ///Data should be appended to an existing file - AppendToFile, - ///Data is appended as new files in existing TablePaths - AppendNewFiles, - ///Throw an error if insert into is attempted on this table - Error, -} - -impl FromStr for ListingTableInsertMode { - type Err = DataFusionError; - fn from_str(s: &str) -> Result { - let s_lower = s.to_lowercase(); - match s_lower.as_str() { - "append_to_file" => Ok(ListingTableInsertMode::AppendToFile), - "append_new_files" => Ok(ListingTableInsertMode::AppendNewFiles), - "error" => Ok(ListingTableInsertMode::Error), - _ => plan_err!( - "Unknown or unsupported insert mode {s}. Supported options are \ - append_to_file, append_new_files, and error." - ), - } - } -} - /// Options for creating a [`ListingTable`] #[derive(Clone, Debug)] pub struct ListingOptions { @@ -281,8 +254,6 @@ pub struct ListingOptions { /// In order to support infinite inputs, DataFusion may adjust query /// plans (e.g. joins) to run the given query in full pipelining mode. pub infinite_source: bool, - /// This setting controls how inserts to this table should be handled - pub insert_mode: ListingTableInsertMode, /// This setting when true indicates that the table is backed by a single file. /// Any inserts to the table may only append to this existing file. pub single_file: bool, @@ -307,7 +278,6 @@ impl ListingOptions { target_partitions: 1, file_sort_order: vec![], infinite_source: false, - insert_mode: ListingTableInsertMode::AppendToFile, single_file: false, file_type_write_options: None, } @@ -478,12 +448,6 @@ impl ListingOptions { self } - /// Configure how insertions to this table should be handled. - pub fn with_insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self { - self.insert_mode = insert_mode; - self - } - /// Configure if this table is backed by a sigle file pub fn with_single_file(mut self, single_file: bool) -> Self { self.single_file = single_file; @@ -849,31 +813,6 @@ impl TableProvider for ListingTable { .await?; let file_groups = file_list_stream.try_collect::>().await?; - //if we are writing a single output_partition to a table backed by a single file - //we can append to that file. Otherwise, we can write new files into the directory - //adding new files to the listing table in order to insert to the table. - let input_partitions = input.output_partitioning().partition_count(); - let writer_mode = match self.options.insert_mode { - ListingTableInsertMode::AppendToFile => { - if input_partitions > file_groups.len() { - return plan_err!( - "Cannot append {input_partitions} partitions to {} files!", - file_groups.len() - ); - } - - crate::datasource::file_format::write::FileWriterMode::Append - } - ListingTableInsertMode::AppendNewFiles => { - crate::datasource::file_format::write::FileWriterMode::PutMultipart - } - ListingTableInsertMode::Error => { - return plan_err!( - "Invalid plan attempting write to table with TableWriteMode::Error!" - ); - } - }; - let file_format = self.options().format.as_ref(); let file_type_writer_options = match &self.options().file_type_write_options { @@ -891,7 +830,6 @@ impl TableProvider for ListingTable { file_groups, output_schema: self.schema(), table_partition_cols: self.options.table_partition_cols.clone(), - writer_mode, // A plan can produce finite number of rows even if it has unbounded sources, like LIMIT // queries. Thus, we can check if the plan is streaming to ensure file sink input is // unbounded. When `unbounded_input` flag is `true` for sink, we occasionally call `yield_now` @@ -906,14 +844,6 @@ impl TableProvider for ListingTable { let unsorted: Vec> = vec![]; let order_requirements = if self.options().file_sort_order != unsorted { - if matches!( - self.options().insert_mode, - ListingTableInsertMode::AppendToFile - ) { - return plan_err!( - "Cannot insert into a sorted ListingTable with mode append!" - ); - } // Multiple sort orders in outer vec are equivalent, so we pass only the first one let ordering = self .try_create_output_ordering()? @@ -1032,7 +962,7 @@ mod tests { use crate::prelude::*; use crate::{ assert_batches_eq, - datasource::file_format::{avro::AvroFormat, file_compression_type::FileTypeExt}, + datasource::file_format::avro::AvroFormat, execution::options::ReadOptions, logical_expr::{col, lit}, test::{columns, object_store::register_test_store}, @@ -1594,17 +1524,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_insert_into_append_to_json_file() -> Result<()> { - helper_test_insert_into_append_to_existing_files( - FileType::JSON, - FileCompressionType::UNCOMPRESSED, - None, - ) - .await?; - Ok(()) - } - #[tokio::test] async fn test_insert_into_append_new_json_files() -> Result<()> { let mut config_map: HashMap = HashMap::new(); @@ -1623,17 +1542,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_insert_into_append_to_csv_file() -> Result<()> { - helper_test_insert_into_append_to_existing_files( - FileType::CSV, - FileCompressionType::UNCOMPRESSED, - None, - ) - .await?; - Ok(()) - } - #[tokio::test] async fn test_insert_into_append_new_csv_files() -> Result<()> { let mut config_map: HashMap = HashMap::new(); @@ -1693,7 +1601,7 @@ mod tests { helper_test_insert_into_sql( "csv", FileCompressionType::UNCOMPRESSED, - "OPTIONS (insert_mode 'append_new_files')", + "", None, ) .await?; @@ -1705,8 +1613,7 @@ mod tests { helper_test_insert_into_sql( "csv", FileCompressionType::UNCOMPRESSED, - "WITH HEADER ROW \ - OPTIONS (insert_mode 'append_new_files')", + "WITH HEADER ROW", None, ) .await?; @@ -1718,7 +1625,7 @@ mod tests { helper_test_insert_into_sql( "json", FileCompressionType::UNCOMPRESSED, - "OPTIONS (insert_mode 'append_new_files')", + "", None, ) .await?; @@ -1906,211 +1813,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_insert_into_append_to_parquet_file_fails() -> Result<()> { - let maybe_err = helper_test_insert_into_append_to_existing_files( - FileType::PARQUET, - FileCompressionType::UNCOMPRESSED, - None, - ) - .await; - let _err = - maybe_err.expect_err("Appending to existing parquet file did not fail!"); - Ok(()) - } - - fn load_empty_schema_table( - schema: SchemaRef, - temp_path: &str, - insert_mode: ListingTableInsertMode, - file_format: Arc, - ) -> Result> { - File::create(temp_path)?; - let table_path = ListingTableUrl::parse(temp_path).unwrap(); - - let listing_options = - ListingOptions::new(file_format.clone()).with_insert_mode(insert_mode); - - let config = ListingTableConfig::new(table_path) - .with_listing_options(listing_options) - .with_schema(schema); - - let table = ListingTable::try_new(config)?; - Ok(Arc::new(table)) - } - - /// Logic of testing inserting into listing table by Appending to existing files - /// is the same for all formats/options which support this. This helper allows - /// passing different options to execute the same test with different settings. - async fn helper_test_insert_into_append_to_existing_files( - file_type: FileType, - file_compression_type: FileCompressionType, - session_config_map: Option>, - ) -> Result<()> { - // Create the initial context, schema, and batch. - let session_ctx = match session_config_map { - Some(cfg) => { - let config = SessionConfig::from_string_hash_map(cfg)?; - SessionContext::new_with_config(config) - } - None => SessionContext::new(), - }; - // Create a new schema with one field called "a" of type Int32 - let schema = Arc::new(Schema::new(vec![Field::new( - "column1", - DataType::Int32, - false, - )])); - - // Create a new batch of data to insert into the table - let batch = RecordBatch::try_new( - schema.clone(), - vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]))], - )?; - - // Filename with extension - let filename = format!( - "path{}", - file_type - .to_owned() - .get_ext_with_compression(file_compression_type) - .unwrap() - ); - - // Create a temporary directory and a CSV file within it. - let tmp_dir = TempDir::new()?; - let path = tmp_dir.path().join(filename); - - let file_format: Arc = match file_type { - FileType::CSV => Arc::new( - CsvFormat::default().with_file_compression_type(file_compression_type), - ), - FileType::JSON => Arc::new( - JsonFormat::default().with_file_compression_type(file_compression_type), - ), - FileType::PARQUET => Arc::new(ParquetFormat::default()), - FileType::AVRO => Arc::new(AvroFormat {}), - FileType::ARROW => Arc::new(ArrowFormat {}), - }; - - let initial_table = load_empty_schema_table( - schema.clone(), - path.to_str().unwrap(), - ListingTableInsertMode::AppendToFile, - file_format, - )?; - session_ctx.register_table("t", initial_table)?; - // Create and register the source table with the provided schema and inserted data - let source_table = Arc::new(MemTable::try_new( - schema.clone(), - vec![vec![batch.clone(), batch.clone()]], - )?); - session_ctx.register_table("source", source_table.clone())?; - // Convert the source table into a provider so that it can be used in a query - let source = provider_as_source(source_table); - // Create a table scan logical plan to read from the source table - let scan_plan = LogicalPlanBuilder::scan("source", source, None)?.build()?; - // Create an insert plan to insert the source data into the initial table - let insert_into_table = - LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?; - // Create a physical plan from the insert plan - let plan = session_ctx - .state() - .create_physical_plan(&insert_into_table) - .await?; - - // Execute the physical plan and collect the results - let res = collect(plan, session_ctx.task_ctx()).await?; - // Insert returns the number of rows written, in our case this would be 6. - let expected = [ - "+-------+", - "| count |", - "+-------+", - "| 6 |", - "+-------+", - ]; - - // Assert that the batches read from the file match the expected result. - assert_batches_eq!(expected, &res); - - // Read the records in the table - let batches = session_ctx.sql("select * from t").await?.collect().await?; - - // Define the expected result as a vector of strings. - let expected = [ - "+---------+", - "| column1 |", - "+---------+", - "| 1 |", - "| 2 |", - "| 3 |", - "| 1 |", - "| 2 |", - "| 3 |", - "+---------+", - ]; - - // Assert that the batches read from the file match the expected result. - assert_batches_eq!(expected, &batches); - - // Assert that only 1 file was added to the table - let num_files = tmp_dir.path().read_dir()?.count(); - assert_eq!(num_files, 1); - - // Create a physical plan from the insert plan - let plan = session_ctx - .state() - .create_physical_plan(&insert_into_table) - .await?; - - // Again, execute the physical plan and collect the results - let res = collect(plan, session_ctx.task_ctx()).await?; - // Insert returns the number of rows written, in our case this would be 6. - let expected = [ - "+-------+", - "| count |", - "+-------+", - "| 6 |", - "+-------+", - ]; - - // Assert that the batches read from the file match the expected result. - assert_batches_eq!(expected, &res); - - // Open the CSV file, read its contents as a record batch, and collect the batches into a vector. - let batches = session_ctx.sql("select * from t").await?.collect().await?; - - // Define the expected result after the second append. - let expected = vec![ - "+---------+", - "| column1 |", - "+---------+", - "| 1 |", - "| 2 |", - "| 3 |", - "| 1 |", - "| 2 |", - "| 3 |", - "| 1 |", - "| 2 |", - "| 3 |", - "| 1 |", - "| 2 |", - "| 3 |", - "+---------+", - ]; - - // Assert that the batches read from the file after the second append match the expected result. - assert_batches_eq!(expected, &batches); - - // Assert that no additional files were added to the table - let num_files = tmp_dir.path().read_dir()?.count(); - assert_eq!(num_files, 1); - - // Return Ok if the function - Ok(()) - } - async fn helper_test_append_new_files_to_table( file_type: FileType, file_compression_type: FileCompressionType, @@ -2156,7 +1858,6 @@ mod tests { "t", tmp_dir.path().to_str().unwrap(), CsvReadOptions::new() - .insert_mode(ListingTableInsertMode::AppendNewFiles) .schema(schema.as_ref()) .file_compression_type(file_compression_type), ) @@ -2168,7 +1869,6 @@ mod tests { "t", tmp_dir.path().to_str().unwrap(), NdJsonReadOptions::default() - .insert_mode(ListingTableInsertMode::AppendNewFiles) .schema(schema.as_ref()) .file_compression_type(file_compression_type), ) @@ -2179,9 +1879,7 @@ mod tests { .register_parquet( "t", tmp_dir.path().to_str().unwrap(), - ParquetReadOptions::default() - .insert_mode(ListingTableInsertMode::AppendNewFiles) - .schema(schema.as_ref()), + ParquetReadOptions::default().schema(schema.as_ref()), ) .await?; } @@ -2190,10 +1888,7 @@ mod tests { .register_avro( "t", tmp_dir.path().to_str().unwrap(), - AvroReadOptions::default() - // TODO implement insert_mode for avro - //.insert_mode(ListingTableInsertMode::AppendNewFiles) - .schema(schema.as_ref()), + AvroReadOptions::default().schema(schema.as_ref()), ) .await?; } @@ -2202,10 +1897,7 @@ mod tests { .register_arrow( "t", tmp_dir.path().to_str().unwrap(), - ArrowReadOptions::default() - // TODO implement insert_mode for arrow - //.insert_mode(ListingTableInsertMode::AppendNewFiles) - .schema(schema.as_ref()), + ArrowReadOptions::default().schema(schema.as_ref()), ) .await?; } diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 26f40518979a..ff7f88da18ab 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -21,8 +21,6 @@ use std::path::Path; use std::str::FromStr; use std::sync::Arc; -use super::listing::ListingTableInsertMode; - #[cfg(feature = "parquet")] use crate::datasource::file_format::parquet::ParquetFormat; use crate::datasource::file_format::{ @@ -154,19 +152,6 @@ impl TableProviderFactory for ListingTableFactory { .take_bool_option("single_file")? .unwrap_or(false); - let explicit_insert_mode = statement_options.take_str_option("insert_mode"); - let insert_mode = match explicit_insert_mode { - Some(mode) => ListingTableInsertMode::from_str(mode.as_str()), - None => match file_type { - FileType::CSV => Ok(ListingTableInsertMode::AppendToFile), - #[cfg(feature = "parquet")] - FileType::PARQUET => Ok(ListingTableInsertMode::AppendNewFiles), - FileType::AVRO => Ok(ListingTableInsertMode::AppendNewFiles), - FileType::JSON => Ok(ListingTableInsertMode::AppendToFile), - FileType::ARROW => Ok(ListingTableInsertMode::AppendNewFiles), - }, - }?; - let file_type = file_format.file_type(); // Use remaining options and session state to build FileTypeWriterOptions @@ -219,7 +204,6 @@ impl TableProviderFactory for ListingTableFactory { .with_target_partitions(state.config().target_partitions()) .with_table_partition_cols(table_partition_cols) .with_file_sort_order(cmd.order_exprs.clone()) - .with_insert_mode(insert_mode) .with_single_file(single_file) .with_write_options(file_type_writer_options) .with_infinite_source(unbounded); diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index ea0a9698ff5c..4e85b31e6806 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -50,7 +50,6 @@ use std::{ use super::listing::ListingTableUrl; use crate::error::{DataFusionError, Result}; use crate::{ - datasource::file_format::write::FileWriterMode, physical_plan::{DisplayAs, DisplayFormatType}, }; use crate::{ @@ -90,8 +89,6 @@ pub struct FileSinkConfig { /// A vector of column names and their corresponding data types, /// representing the partitioning columns for the file pub table_partition_cols: Vec<(String, DataType)>, - /// A writer mode that determines how data is written to the file - pub writer_mode: FileWriterMode, /// If true, it is assumed there is a single table_path which is a file to which all data should be written /// regardless of input partitioning. Otherwise, each table path is assumed to be a directory /// to which each output partition is written to its own output file. diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index f941e88f3a36..42e684aaa528 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -27,7 +27,6 @@ use crate::datasource::file_format::csv::CsvFormat; use crate::datasource::file_format::json::JsonFormat; #[cfg(feature = "parquet")] use crate::datasource::file_format::parquet::ParquetFormat; -use crate::datasource::file_format::write::FileWriterMode; use crate::datasource::file_format::FileFormat; use crate::datasource::listing::ListingTableUrl; use crate::datasource::physical_plan::FileSinkConfig; @@ -592,7 +591,6 @@ impl DefaultPhysicalPlanner { output_schema: Arc::new(schema), table_partition_cols: vec![], unbounded_input: false, - writer_mode: FileWriterMode::PutMultipart, single_file_output: *single_file_output, overwrite: false, file_type_writer_options diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs index 7d9ea97f7b5b..0f12ce311c38 100644 --- a/datafusion/core/tests/fifo.rs +++ b/datafusion/core/tests/fifo.rs @@ -336,6 +336,7 @@ mod unix_test { /// It tests the INSERT INTO functionality. #[tokio::test] + #[ignore] async fn test_sql_insert_into_fifo() -> Result<()> { // To make unbounded deterministic let waiting = Arc::new(AtomicBool::new(true)); diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index f2fe216ee864..fffe7ca96b78 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -32,7 +32,7 @@ logical_plan CopyTo: format=parquet output_url=test_files/scratch/copy/table single_file_output=false options: (compression 'zstd(10)') --TableScan: source_table projection=[col1, col2] physical_plan -InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[]) +InsertExec: sink=ParquetSink(file_groups=[]) --MemoryExec: partitions=1, partition_sizes=[1] # Error case @@ -66,8 +66,8 @@ select * from validate_parquet; # Copy parquet with all supported statment overrides query IT -COPY source_table -TO 'test_files/scratch/copy/table_with_options' +COPY source_table +TO 'test_files/scratch/copy/table_with_options' (format parquet, single_file_output false, compression snappy, @@ -206,11 +206,11 @@ select * from validate_single_json; # COPY csv files with all options set query IT -COPY source_table -to 'test_files/scratch/copy/table_csv_with_options' -(format csv, -single_file_output false, -header false, +COPY source_table +to 'test_files/scratch/copy/table_csv_with_options' +(format csv, +single_file_output false, +header false, compression 'uncompressed', datetime_format '%FT%H:%M:%S.%9f', delimiter ';', @@ -220,8 +220,8 @@ null_value 'NULLVAL'); # Validate single csv output statement ok -CREATE EXTERNAL TABLE validate_csv_with_options -STORED AS csv +CREATE EXTERNAL TABLE validate_csv_with_options +STORED AS csv LOCATION 'test_files/scratch/copy/table_csv_with_options'; query T diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 066a31590ccd..a2633e629a91 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -168,7 +168,7 @@ Dml: op=[Insert Into] table=[sink_table] ----Sort: aggregate_test_100.c1 ASC NULLS LAST ------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13] physical_plan -InsertExec: sink=CsvSink(writer_mode=Append, file_groups=[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]) +InsertExec: sink=CsvSink(file_groups=[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]) --ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, c5@4 as c5, c6@5 as c6, c7@6 as c7, c8@7 as c8, c9@8 as c9, c10@9 as c10, c11@10 as c11, c12@11 as c12, c13@12 as c13] ----SortExec: expr=[c1@0 ASC NULLS LAST] ------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index 8b01a14568e7..fb3efeee44b7 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -51,7 +51,7 @@ describe dictionary_encoded_values; column1 Utf8 YES column2 Dictionary(Int32, Utf8) YES -statement ok +statement error DataFusion error: Invalid or Unsupported Configuration: Found unsupported option insert_mode with value append_new_files for Parquet format! CREATE EXTERNAL TABLE dictionary_encoded_parquet_partitioned( a varchar, b varchar, @@ -64,17 +64,12 @@ create_local_path 'true', insert_mode 'append_new_files', ); -query TT +query error DataFusion error: Error during planning: table 'datafusion\.public\.dictionary_encoded_parquet_partitioned' not found insert into dictionary_encoded_parquet_partitioned select * from dictionary_encoded_values ----- -2 -query TT +query error DataFusion error: Error during planning: table 'datafusion\.public\.dictionary_encoded_parquet_partitioned' not found select * from dictionary_encoded_parquet_partitioned order by (a); ----- -a foo -b bar # test_insert_into @@ -87,10 +82,7 @@ ordered_insert_test(a bigint, b bigint) STORED AS csv LOCATION 'test_files/scratch/insert_to_external/insert_to_ordered/' WITH ORDER (a ASC, B DESC) -OPTIONS( -create_local_path 'true', -insert_mode 'append_new_files', -); +OPTIONS(create_local_path 'true'); query TT EXPLAIN INSERT INTO ordered_insert_test values (5, 1), (4, 2), (7,7), (7,8), (7,9), (7,10), (3, 3), (2, 4), (1, 5); @@ -100,7 +92,7 @@ Dml: op=[Insert Into] table=[ordered_insert_test] --Projection: column1 AS a, column2 AS b ----Values: (Int64(5), Int64(1)), (Int64(4), Int64(2)), (Int64(7), Int64(7)), (Int64(7), Int64(8)), (Int64(7), Int64(9))... physical_plan -InsertExec: sink=CsvSink(writer_mode=PutMultipart, file_groups=[]) +InsertExec: sink=CsvSink(file_groups=[]) --SortExec: expr=[a@0 ASC NULLS LAST,b@1 DESC] ----ProjectionExec: expr=[column1@0 as a, column2@1 as b] ------ValuesExec @@ -131,10 +123,7 @@ partitioned_insert_test(a string, b string, c bigint) STORED AS csv LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned/' PARTITIONED BY (a, b) -OPTIONS( -create_local_path 'true', -insert_mode 'append_new_files', -); +OPTIONS(create_local_path 'true'); #note that partitioned cols are moved to the end so value tuples are (c, a, b) query ITT @@ -156,10 +145,7 @@ statement ok CREATE EXTERNAL TABLE partitioned_insert_test_verify(c bigint) STORED AS csv -LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned/a=20/b=100/' -OPTIONS( -insert_mode 'append_new_files', -); +LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned/a=20/b=100/'; query I select * from partitioned_insert_test_verify; @@ -173,10 +159,7 @@ partitioned_insert_test_json(a string, b string) STORED AS json LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned_json/' PARTITIONED BY (a) -OPTIONS( -create_local_path 'true', -insert_mode 'append_new_files', -); +OPTIONS(create_local_path 'true'); query TT INSERT INTO partitioned_insert_test_json values (1, 2), (3, 4), (5, 6), (1, 2), (3, 4), (5, 6); @@ -191,10 +174,7 @@ statement ok CREATE EXTERNAL TABLE partitioned_insert_test_verify_json(b string) STORED AS json -LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned_json/a=2/' -OPTIONS( -insert_mode 'append_new_files', -); +LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned_json/a=2/'; query T select * from partitioned_insert_test_verify_json; @@ -208,10 +188,7 @@ partitioned_insert_test_pq(a string, b bigint) STORED AS parquet LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned_pq/' PARTITIONED BY (a) -OPTIONS( -create_local_path 'true', -insert_mode 'append_new_files', -); +OPTIONS(create_local_path 'true'); query IT INSERT INTO partitioned_insert_test_pq values (1, 2), (3, 4), (5, 6), (1, 2), (3, 4), (5, 6); @@ -232,10 +209,7 @@ statement ok CREATE EXTERNAL TABLE partitioned_insert_test_verify_pq(b bigint) STORED AS parquet -LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned_pq/a=2/' -OPTIONS( -insert_mode 'append_new_files', -); +LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned_pq/a=2/'; query I select * from partitioned_insert_test_verify_pq; @@ -267,8 +241,6 @@ INSERT INTO single_file_test values (4, 5), (6, 7); query II select * from single_file_test; ---- -1 2 -3 4 4 5 6 7 @@ -315,7 +287,7 @@ Dml: op=[Insert Into] table=[table_without_values] --------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] ----------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan -InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[]) +InsertExec: sink=ParquetSink(file_groups=[]) --ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2] ----SortPreservingMergeExec: [c1@2 ASC NULLS LAST] ------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, c1@0 as c1] @@ -378,7 +350,7 @@ Dml: op=[Insert Into] table=[table_without_values] ----WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] ------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan -InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[]) +InsertExec: sink=ParquetSink(file_groups=[]) --CoalescePartitionsExec ----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2] ------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted] @@ -422,7 +394,7 @@ Dml: op=[Insert Into] table=[table_without_values] ----Sort: aggregate_test_100.c1 ASC NULLS LAST ------TableScan: aggregate_test_100 projection=[c1] physical_plan -InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[]) +InsertExec: sink=ParquetSink(file_groups=[]) --ProjectionExec: expr=[c1@0 as c1] ----SortExec: expr=[c1@0 ASC NULLS LAST] ------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true diff --git a/datafusion/sqllogictest/test_files/options.slt b/datafusion/sqllogictest/test_files/options.slt index 83fe85745ef8..9366a9b3b3c8 100644 --- a/datafusion/sqllogictest/test_files/options.slt +++ b/datafusion/sqllogictest/test_files/options.slt @@ -84,7 +84,7 @@ statement ok drop table a # test datafusion.sql_parser.parse_float_as_decimal -# +# # default option value is false query RR select 10000000000000000000.01, -10000000000000000000.01 @@ -209,5 +209,3 @@ select -123456789.0123456789012345678901234567890 # Restore option to default value statement ok set datafusion.sql_parser.parse_float_as_decimal = false; - - diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 8148f1c4c7c9..9c5d1704f42b 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -447,7 +447,7 @@ statement ok drop table multiple_ordered_table; # Create tables having some ordered columns. In the next step, we will expect to observe that scalar -# functions, such as mathematical functions like atan(), ceil(), sqrt(), or date_time functions +# functions, such as mathematical functions like atan(), ceil(), sqrt(), or date_time functions # like date_bin() and date_trunc(), will maintain the order of its argument columns. statement ok CREATE EXTERNAL TABLE csv_with_timestamps ( diff --git a/datafusion/sqllogictest/test_files/predicates.slt b/datafusion/sqllogictest/test_files/predicates.slt index d22b2ff953b7..e992a440d0a2 100644 --- a/datafusion/sqllogictest/test_files/predicates.slt +++ b/datafusion/sqllogictest/test_files/predicates.slt @@ -495,6 +495,7 @@ set datafusion.execution.parquet.bloom_filter_enabled=true; query T SELECT * FROM data_index_bloom_encoding_stats WHERE "String" = 'foo'; +---- query T SELECT * FROM data_index_bloom_encoding_stats WHERE "String" = 'test'; diff --git a/datafusion/sqllogictest/test_files/set_variable.slt b/datafusion/sqllogictest/test_files/set_variable.slt index 714e1e995e26..440fb2c6ef2b 100644 --- a/datafusion/sqllogictest/test_files/set_variable.slt +++ b/datafusion/sqllogictest/test_files/set_variable.slt @@ -243,4 +243,4 @@ statement ok SET TIME ZONE = 'Asia/Taipei2' statement error Arrow error: Parser error: Invalid timezone "Asia/Taipei2": 'Asia/Taipei2' is not a valid timezone -SELECT '2000-01-01T00:00:00'::TIMESTAMP::TIMESTAMPTZ \ No newline at end of file +SELECT '2000-01-01T00:00:00'::TIMESTAMP::TIMESTAMPTZ diff --git a/datafusion/sqllogictest/test_files/update.slt b/datafusion/sqllogictest/test_files/update.slt index cb8c6a4fac28..bb7a03e2a133 100644 --- a/datafusion/sqllogictest/test_files/update.slt +++ b/datafusion/sqllogictest/test_files/update.slt @@ -76,4 +76,4 @@ create table t3(a int, b varchar, c double, d int); # set from mutiple tables, sqlparser only supports from one table query error DataFusion error: SQL error: ParserError\("Expected end of statement, found: ,"\) -explain update t1 set b = t2.b, c = t3.a, d = 1 from t2, t3 where t1.a = t2.a and t1.a = t3.a; \ No newline at end of file +explain update t1 set b = t2.b, c = t3.a, d = 1 from t2, t3 where t1.a = t2.a and t1.a = t3.a; From 3033b7c452a9f8ae1834d83159092c85b2243c6f Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 2 Nov 2023 10:51:47 +0000 Subject: [PATCH 2/8] Prepare object_store 0.8.0 --- Cargo.toml | 3 + datafusion-cli/Cargo.lock | 9 +- datafusion-cli/Cargo.toml | 3 + datafusion/core/src/catalog/listing_schema.rs | 7 +- .../core/src/datasource/file_format/csv.rs | 1 + .../core/src/datasource/file_format/mod.rs | 16 ++- .../src/datasource/file_format/parquet.rs | 20 ++-- .../src/datasource/file_format/write/mod.rs | 98 +------------------ .../core/src/datasource/listing/helpers.rs | 3 +- datafusion/core/src/datasource/listing/mod.rs | 2 + .../core/src/datasource/listing/table.rs | 18 +--- datafusion/core/src/datasource/listing/url.rs | 8 +- .../core/src/datasource/physical_plan/mod.rs | 5 +- .../src/datasource/physical_plan/parquet.rs | 1 + .../physical_plan/parquet/row_groups.rs | 1 + datafusion/core/src/test/object_store.rs | 1 + datafusion/core/src/test_util/parquet.rs | 1 + .../core/tests/parquet/custom_reader.rs | 1 + datafusion/core/tests/parquet/page_pruning.rs | 1 + .../core/tests/parquet/schema_coercion.rs | 1 + datafusion/core/tests/path_partition.rs | 30 +++--- datafusion/execution/src/cache/cache_unit.rs | 2 + .../proto/src/physical_plan/from_proto.rs | 1 + .../substrait/src/physical_plan/consumer.rs | 1 + 24 files changed, 75 insertions(+), 159 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 22d5f2f64464..b8664dc836f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,3 +76,6 @@ overflow-checks = false panic = 'unwind' rpath = false + +[patch.crates-io] +object_store = { git = "https://github.com/apache/arrow-rs.git", rev = "ab53d2dd5fb2af2bab69f95a6ddae2226c166500" } diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 268f9e28427a..5a35c14d5395 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -2269,8 +2269,7 @@ dependencies = [ [[package]] name = "object_store" version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f930c88a43b1c3f6e776dfe495b4afab89882dbc81530c632db2ed65451ebcb4" +source = "git+https://github.com/apache/arrow-rs.git?rev=ab53d2dd5fb2af2bab69f95a6ddae2226c166500#ab53d2dd5fb2af2bab69f95a6ddae2226c166500" dependencies = [ "async-trait", "base64", @@ -2285,7 +2284,7 @@ dependencies = [ "quick-xml", "rand", "reqwest", - "ring 0.16.20", + "ring 0.17.5", "rustls-pemfile", "serde", "serde_json", @@ -2577,9 +2576,9 @@ checksum = "658fa1faf7a4cc5f057c9ee5ef560f717ad9d8dc66d975267f709624d6e1ab88" [[package]] name = "quick-xml" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eff6510e86862b57b210fd8cbe8ed3f0d7d600b9c2863cd4549a2e033c66e956" +checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" dependencies = [ "memchr", "serde", diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 7dd9cb8bcb37..983d8e6a444a 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -50,3 +50,6 @@ assert_cmd = "2.0" ctor = "0.2.0" predicates = "3.0" rstest = "0.17" + +[patch.crates-io] +object_store = { git = "https://github.com/apache/arrow-rs.git", rev = "ab53d2dd5fb2af2bab69f95a6ddae2226c166500" } diff --git a/datafusion/core/src/catalog/listing_schema.rs b/datafusion/core/src/catalog/listing_schema.rs index 7e527642be16..0d5c49f377d0 100644 --- a/datafusion/core/src/catalog/listing_schema.rs +++ b/datafusion/core/src/catalog/listing_schema.rs @@ -92,12 +92,7 @@ impl ListingSchemaProvider { /// Reload table information from ObjectStore pub async fn refresh(&self, state: &SessionState) -> datafusion_common::Result<()> { - let entries: Vec<_> = self - .store - .list(Some(&self.path)) - .await? - .try_collect() - .await?; + let entries: Vec<_> = self.store.list(Some(&self.path)).try_collect().await?; let base = Path::new(self.path.as_ref()); let mut tables = HashSet::new(); for file in entries.iter() { diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 684f416f771a..df6689af6b73 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -673,6 +673,7 @@ mod tests { last_modified: DateTime::default(), size: usize::MAX, e_tag: None, + version: None, }; let num_rows_to_read = 100; diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index b541e2a1d44c..7c2331548e5e 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -124,7 +124,8 @@ pub(crate) mod test_util { use object_store::local::LocalFileSystem; use object_store::path::Path; use object_store::{ - GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, + GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, PutOptions, + PutResult, }; use tokio::io::AsyncWrite; @@ -189,7 +190,12 @@ pub(crate) mod test_util { #[async_trait] impl ObjectStore for VariableStream { - async fn put(&self, _location: &Path, _bytes: Bytes) -> object_store::Result<()> { + async fn put_opts( + &self, + _location: &Path, + _bytes: Bytes, + _opts: PutOptions, + ) -> object_store::Result { unimplemented!() } @@ -228,6 +234,7 @@ pub(crate) mod test_util { last_modified: Default::default(), size: range.end, e_tag: None, + version: None, }, range: Default::default(), }) @@ -257,11 +264,10 @@ pub(crate) mod test_util { unimplemented!() } - async fn list( + fn list( &self, _prefix: Option<&Path>, - ) -> object_store::Result>> - { + ) -> BoxStream<'_, object_store::Result> { unimplemented!() } diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index c4d05adfc6bc..cf6b87408107 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -1199,7 +1199,9 @@ mod tests { use log::error; use object_store::local::LocalFileSystem; use object_store::path::Path; - use object_store::{GetOptions, GetResult, ListResult, MultipartId}; + use object_store::{ + GetOptions, GetResult, ListResult, MultipartId, PutOptions, PutResult, + }; use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::arrow::ParquetRecordBatchStreamBuilder; use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex}; @@ -1283,7 +1285,12 @@ mod tests { #[async_trait] impl ObjectStore for RequestCountingObjectStore { - async fn put(&self, _location: &Path, _bytes: Bytes) -> object_store::Result<()> { + async fn put_opts( + &self, + _location: &Path, + _bytes: Bytes, + _opts: PutOptions, + ) -> object_store::Result { Err(object_store::Error::NotImplemented) } @@ -1320,12 +1327,13 @@ mod tests { Err(object_store::Error::NotImplemented) } - async fn list( + fn list( &self, _prefix: Option<&Path>, - ) -> object_store::Result>> - { - Err(object_store::Error::NotImplemented) + ) -> BoxStream<'_, object_store::Result> { + Box::pin(futures::stream::once(async { + Err(object_store::Error::NotImplemented) + })) } async fn list_with_delimiter( diff --git a/datafusion/core/src/datasource/file_format/write/mod.rs b/datafusion/core/src/datasource/file_format/write/mod.rs index c987f47c0e1c..cfcdbd8c464e 100644 --- a/datafusion/core/src/datasource/file_format/write/mod.rs +++ b/datafusion/core/src/datasource/file_format/write/mod.rs @@ -19,7 +19,6 @@ //! write support for the various file formats use std::io::Error; -use std::mem; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -36,109 +35,14 @@ use async_trait::async_trait; use bytes::Bytes; use futures::future::BoxFuture; -use futures::ready; -use futures::FutureExt; use object_store::path::Path; -use object_store::{MultipartId, ObjectMeta, ObjectStore}; +use object_store::{MultipartId, ObjectStore}; use tokio::io::AsyncWrite; pub(crate) mod demux; pub(crate) mod orchestration; -/// `AsyncPutWriter` is an object that facilitates asynchronous writing to object stores. -/// It is specifically designed for the `object_store` crate's `put` method and sends -/// whole bytes at once when the buffer is flushed. -pub struct AsyncPutWriter { - /// Object metadata - object_meta: ObjectMeta, - /// A shared reference to the object store - store: Arc, - /// A buffer that stores the bytes to be sent - current_buffer: Vec, - /// Used for async handling in flush method - inner_state: AsyncPutState, -} - -impl AsyncPutWriter { - /// Constructor for the `AsyncPutWriter` object - pub fn new(object_meta: ObjectMeta, store: Arc) -> Self { - Self { - object_meta, - store, - current_buffer: vec![], - // The writer starts out in buffering mode - inner_state: AsyncPutState::Buffer, - } - } - - /// Separate implementation function that unpins the [`AsyncPutWriter`] so - /// that partial borrows work correctly - fn poll_shutdown_inner( - &mut self, - cx: &mut Context<'_>, - ) -> Poll> { - loop { - match &mut self.inner_state { - AsyncPutState::Buffer => { - // Convert the current buffer to bytes and take ownership of it - let bytes = Bytes::from(mem::take(&mut self.current_buffer)); - // Set the inner state to Put variant with the bytes - self.inner_state = AsyncPutState::Put { bytes } - } - AsyncPutState::Put { bytes } => { - // Send the bytes to the object store's put method - return Poll::Ready( - ready!(self - .store - .put(&self.object_meta.location, bytes.clone()) - .poll_unpin(cx)) - .map_err(Error::from), - ); - } - } - } - } -} - -/// An enum that represents the inner state of AsyncPut -enum AsyncPutState { - /// Building Bytes struct in this state - Buffer, - /// Data in the buffer is being sent to the object store - Put { bytes: Bytes }, -} - -impl AsyncWrite for AsyncPutWriter { - // Define the implementation of the AsyncWrite trait for the `AsyncPutWriter` struct - fn poll_write( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - // Extend the current buffer with the incoming buffer - self.current_buffer.extend_from_slice(buf); - // Return a ready poll with the length of the incoming buffer - Poll::Ready(Ok(buf.len())) - } - - fn poll_flush( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { - // Return a ready poll with an empty result - Poll::Ready(Ok(())) - } - - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - // Call the poll_shutdown_inner method to handle the actual sending of data to the object store - self.poll_shutdown_inner(cx) - } -} - /// Stores data needed during abortion of MultiPart writers #[derive(Clone)] pub(crate) struct MultiPart { diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index d6a0add9b253..0021d71c2e4a 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -359,8 +359,7 @@ pub async fn pruned_partition_list<'a>( Some(files) => files, None => { trace!("Recursively listing partition {}", partition.path); - let s = store.list(Some(&partition.path)).await?; - s.try_collect().await? + store.list(Some(&partition.path)).try_collect().await? } }; diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index aa2e20164b5e..87c1663ae718 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -80,6 +80,7 @@ impl PartitionedFile { last_modified: chrono::Utc.timestamp_nanos(0), size: size as usize, e_tag: None, + version: None, }, partition_values: vec![], range: None, @@ -95,6 +96,7 @@ impl PartitionedFile { last_modified: chrono::Utc.timestamp_nanos(0), size: size as usize, e_tag: None, + version: None, }, partition_values: vec![], range: Some(FileRange { start, end }), diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 7e5fdf0b3393..11eefd37f236 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1598,13 +1598,8 @@ mod tests { #[tokio::test] async fn test_insert_into_sql_csv_defaults() -> Result<()> { - helper_test_insert_into_sql( - "csv", - FileCompressionType::UNCOMPRESSED, - "", - None, - ) - .await?; + helper_test_insert_into_sql("csv", FileCompressionType::UNCOMPRESSED, "", None) + .await?; Ok(()) } @@ -1622,13 +1617,8 @@ mod tests { #[tokio::test] async fn test_insert_into_sql_json_defaults() -> Result<()> { - helper_test_insert_into_sql( - "json", - FileCompressionType::UNCOMPRESSED, - "", - None, - ) - .await?; + helper_test_insert_into_sql("json", FileCompressionType::UNCOMPRESSED, "", None) + .await?; Ok(()) } diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs index 4d1ca4853a73..4e0f5121e09c 100644 --- a/datafusion/core/src/datasource/listing/url.rs +++ b/datafusion/core/src/datasource/listing/url.rs @@ -198,17 +198,15 @@ impl ListingTableUrl { let is_dir = self.url.as_str().ends_with('/'); let list = match is_dir { true => match ctx.runtime_env().cache_manager.get_list_files_cache() { - None => futures::stream::once(store.list(Some(&self.prefix))) - .try_flatten() - .boxed(), + None => store.list(Some(&self.prefix)), Some(cache) => { if let Some(res) = cache.get(&self.prefix) { debug!("Hit list all files cache"); futures::stream::iter(res.as_ref().clone().into_iter().map(Ok)) .boxed() } else { - let list_res = store.list(Some(&self.prefix)).await; - let vec = list_res?.try_collect::>().await?; + let list_res = store.list(Some(&self.prefix)); + let vec = list_res.try_collect::>().await?; cache.put(&self.prefix, Arc::new(vec.clone())); futures::stream::iter(vec.into_iter().map(Ok)).boxed() } diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 4e85b31e6806..aca71678d98b 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -49,9 +49,7 @@ use std::{ use super::listing::ListingTableUrl; use crate::error::{DataFusionError, Result}; -use crate::{ - physical_plan::{DisplayAs, DisplayFormatType}, -}; +use crate::physical_plan::{DisplayAs, DisplayFormatType}; use crate::{ datasource::{ listing::{FileRange, PartitionedFile}, @@ -786,6 +784,7 @@ mod tests { last_modified: Utc::now(), size: 42, e_tag: None, + version: None, }; PartitionedFile { diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index f6e999f60249..6e5bc180a503 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -1719,6 +1719,7 @@ mod tests { last_modified: Utc.timestamp_nanos(0), size: 1337, e_tag: None, + version: None, }, partition_values: vec![], range: None, diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 91bceed91602..09cf6a6db5e0 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -1243,6 +1243,7 @@ mod tests { last_modified: chrono::DateTime::from(std::time::SystemTime::now()), size: data.len(), e_tag: None, + version: None, }; let in_memory = object_store::memory::InMemory::new(); in_memory diff --git a/datafusion/core/src/test/object_store.rs b/datafusion/core/src/test/object_store.rs index 08cebb56cc77..d6f324a7f1f9 100644 --- a/datafusion/core/src/test/object_store.rs +++ b/datafusion/core/src/test/object_store.rs @@ -61,5 +61,6 @@ pub fn local_unpartitioned_file(path: impl AsRef) -> ObjectMeta last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(), size: metadata.len() as usize, e_tag: None, + version: None, } } diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs index 0d11526703b4..f3c0d2987a46 100644 --- a/datafusion/core/src/test_util/parquet.rs +++ b/datafusion/core/src/test_util/parquet.rs @@ -113,6 +113,7 @@ impl TestParquetFile { last_modified: Default::default(), size, e_tag: None, + version: None, }; Ok(Self { diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs index 37481b936d24..3752d42dbf43 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -188,6 +188,7 @@ async fn store_parquet_in_memory( last_modified: chrono::DateTime::from(SystemTime::now()), size: buf.len(), e_tag: None, + version: None, }; (meta, Bytes::from(buf)) diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index b77643c35e84..e1e8b8e66edd 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -50,6 +50,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec { last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(), size: metadata.len() as usize, e_tag: None, + version: None, }; let schema = ParquetFormat::default() diff --git a/datafusion/core/tests/parquet/schema_coercion.rs b/datafusion/core/tests/parquet/schema_coercion.rs index b3134d470b56..25c62f18f5ba 100644 --- a/datafusion/core/tests/parquet/schema_coercion.rs +++ b/datafusion/core/tests/parquet/schema_coercion.rs @@ -194,5 +194,6 @@ pub fn local_unpartitioned_file(path: impl AsRef) -> ObjectMeta last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(), size: metadata.len() as usize, e_tag: None, + version: None, } } diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs index 27d146de798d..dd8eb52f67c7 100644 --- a/datafusion/core/tests/path_partition.rs +++ b/datafusion/core/tests/path_partition.rs @@ -46,7 +46,7 @@ use futures::stream; use futures::stream::BoxStream; use object_store::{ path::Path, GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, - ObjectMeta, ObjectStore, + ObjectMeta, ObjectStore, PutOptions, PutResult, }; use tokio::io::AsyncWrite; use url::Url; @@ -620,7 +620,12 @@ impl MirroringObjectStore { #[async_trait] impl ObjectStore for MirroringObjectStore { - async fn put(&self, _location: &Path, _bytes: Bytes) -> object_store::Result<()> { + async fn put_opts( + &self, + _location: &Path, + _bytes: Bytes, + _opts: PutOptions, + ) -> object_store::Result { unimplemented!() } @@ -653,6 +658,7 @@ impl ObjectStore for MirroringObjectStore { last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(), size: metadata.len() as usize, e_tag: None, + version: None, }; Ok(GetResult { @@ -680,26 +686,16 @@ impl ObjectStore for MirroringObjectStore { Ok(data.into()) } - async fn head(&self, location: &Path) -> object_store::Result { - self.files.iter().find(|x| *x == location).unwrap(); - Ok(ObjectMeta { - location: location.clone(), - last_modified: Utc.timestamp_nanos(0), - size: self.file_size as usize, - e_tag: None, - }) - } - async fn delete(&self, _location: &Path) -> object_store::Result<()> { unimplemented!() } - async fn list( + fn list( &self, prefix: Option<&Path>, - ) -> object_store::Result>> { + ) -> BoxStream<'_, object_store::Result> { let prefix = prefix.cloned().unwrap_or_default(); - Ok(Box::pin(stream::iter(self.files.iter().filter_map( + Box::pin(stream::iter(self.files.iter().filter_map( move |location| { // Don't return for exact prefix match let filter = location @@ -713,10 +709,11 @@ impl ObjectStore for MirroringObjectStore { last_modified: Utc.timestamp_nanos(0), size: self.file_size as usize, e_tag: None, + version: None, }) }) }, - )))) + ))) } async fn list_with_delimiter( @@ -750,6 +747,7 @@ impl ObjectStore for MirroringObjectStore { last_modified: Utc.timestamp_nanos(0), size: self.file_size as usize, e_tag: None, + version: None, }; objects.push(object); } diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs index 4a21dc02bd13..c54839061c8a 100644 --- a/datafusion/execution/src/cache/cache_unit.rs +++ b/datafusion/execution/src/cache/cache_unit.rs @@ -176,6 +176,7 @@ mod tests { .into(), size: 1024, e_tag: None, + version: None, }; let cache = DefaultFileStatisticsCache::default(); assert!(cache.get_with_extra(&meta.location, &meta).is_none()); @@ -219,6 +220,7 @@ mod tests { .into(), size: 1024, e_tag: None, + version: None, }; let cache = DefaultListFilesCache::default(); diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index a956eded9032..700105dbcb2d 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -536,6 +536,7 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile { last_modified: Utc.timestamp_nanos(val.last_modified_ns as i64), size: val.size as usize, e_tag: None, + version: None, }, partition_values: val .partition_values diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs index 1dab1f9d5e39..942798173e0e 100644 --- a/datafusion/substrait/src/physical_plan/consumer.rs +++ b/datafusion/substrait/src/physical_plan/consumer.rs @@ -89,6 +89,7 @@ pub async fn from_substrait_rel( location: path.into(), size, e_tag: None, + version: None, }, partition_values: vec![], range: None, From 6453aee9afc90e0e420853ad146542d4ab5108e3 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 2 Nov 2023 11:54:17 +0000 Subject: [PATCH 3/8] Fix datafusion-cli test --- datafusion-cli/src/exec.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index b62ad12dbfbb..14ac22687bf4 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -350,7 +350,7 @@ mod tests { async fn create_object_store_table_gcs() -> Result<()> { let service_account_path = "fake_service_account_path"; let service_account_key = - "{\"private_key\": \"fake_private_key.pem\",\"client_email\":\"fake_client_email\"}"; + "{\"private_key\": \"fake_private_key.pem\",\"client_email\":\"fake_client_email\", \"private_key_id\":\"id\"}"; let application_credentials_path = "fake_application_credentials_path"; let location = "gcs://bucket/path/file.parquet"; @@ -366,8 +366,9 @@ mod tests { let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('service_account_key' '{service_account_key}') LOCATION '{location}'"); let err = create_external_table_test(location, &sql) .await - .unwrap_err(); - assert!(err.to_string().contains("No RSA key found in pem file")); + .unwrap_err() + .to_string(); + assert!(err.contains("No RSA key found in pem file"), "{err}"); // for application_credentials_path let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET From f055fda6f70d62e10aa1e4af26f3389f619ed1a9 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 7 Nov 2023 17:15:37 +0000 Subject: [PATCH 4/8] Update arrow version --- Cargo.toml | 9 ++++-- datafusion-cli/Cargo.lock | 51 +++++++++++++--------------------- datafusion-cli/Cargo.toml | 8 ++++-- datafusion-examples/Cargo.toml | 2 +- datafusion/common/Cargo.toml | 2 +- datafusion/proto/Cargo.toml | 2 +- 6 files changed, 35 insertions(+), 39 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f445af2d62a4..705ef15eb2a5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,7 +78,7 @@ indexmap = "2.0.0" itertools = "0.11" log = "^0.4" num_cpus = "1.13.0" -object_store = "0.7.0" +object_store = "0.8.0" parking_lot = "0.12" parquet = { version = "48.0.0", features = ["arrow", "async", "object_store"] } rand = "0.8" @@ -110,4 +110,9 @@ rpath = false [patch.crates-io] -object_store = { git = "https://github.com/apache/arrow-rs.git", rev = "ab53d2dd5fb2af2bab69f95a6ddae2226c166500" } +arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "ffeda62fc9d6a182c1bf3b3212e676f74fc196df" } +arrow-array = { git = "https://github.com/apache/arrow-rs.git", rev = "ffeda62fc9d6a182c1bf3b3212e676f74fc196df" } +arrow-buffer = { git = "https://github.com/apache/arrow-rs.git", rev = "ffeda62fc9d6a182c1bf3b3212e676f74fc196df" } +arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev = "ffeda62fc9d6a182c1bf3b3212e676f74fc196df" } +arrow-schema = { git = "https://github.com/apache/arrow-rs.git", rev = "ffeda62fc9d6a182c1bf3b3212e676f74fc196df" } +parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "ffeda62fc9d6a182c1bf3b3212e676f74fc196df" } diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 13e96f3b613e..e81ac2dc61ec 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -131,8 +131,7 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "arrow" version = "48.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edb738d83750ec705808f6d44046d165e6bb8623f64e29a4d53fcb136ab22dfb" +source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" dependencies = [ "ahash", "arrow-arith", @@ -153,8 +152,7 @@ dependencies = [ [[package]] name = "arrow-arith" version = "48.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5c3d17fc5b006e7beeaebfb1d2edfc92398b981f82d9744130437909b72a468" +source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" dependencies = [ "arrow-array", "arrow-buffer", @@ -168,8 +166,7 @@ dependencies = [ [[package]] name = "arrow-array" version = "48.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55705ada5cdde4cb0f202ffa6aa756637e33fea30e13d8d0d0fd6a24ffcee1e3" +source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" dependencies = [ "ahash", "arrow-buffer", @@ -185,8 +182,7 @@ dependencies = [ [[package]] name = "arrow-buffer" version = "48.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a722f90a09b94f295ab7102542e97199d3500128843446ef63e410ad546c5333" +source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" dependencies = [ "bytes", "half", @@ -196,14 +192,14 @@ dependencies = [ [[package]] name = "arrow-cast" version = "48.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af01fc1a06f6f2baf31a04776156d47f9f31ca5939fe6d00cd7a059f95a46ff1" +source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" dependencies = [ "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", "arrow-select", + "base64", "chrono", "comfy-table", "half", @@ -214,8 +210,7 @@ dependencies = [ [[package]] name = "arrow-csv" version = "48.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83cbbfde86f9ecd3f875c42a73d8aeab3d95149cd80129b18d09e039ecf5391b" +source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" dependencies = [ "arrow-array", "arrow-buffer", @@ -233,8 +228,7 @@ dependencies = [ [[package]] name = "arrow-data" version = "48.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0a547195e607e625e7fafa1a7269b8df1a4a612c919efd9b26bd86e74538f3a" +source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" dependencies = [ "arrow-buffer", "arrow-schema", @@ -245,8 +239,7 @@ dependencies = [ [[package]] name = "arrow-ipc" version = "48.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e36bf091502ab7e37775ff448413ef1ffff28ff93789acb669fffdd51b394d51" +source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" dependencies = [ "arrow-array", "arrow-buffer", @@ -259,8 +252,7 @@ dependencies = [ [[package]] name = "arrow-json" version = "48.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ac346bc84846ab425ab3c8c7b6721db90643bc218939677ed7e071ccbfb919d" +source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" dependencies = [ "arrow-array", "arrow-buffer", @@ -279,8 +271,7 @@ dependencies = [ [[package]] name = "arrow-ord" version = "48.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4502123d2397319f3a13688432bc678c61cb1582f2daa01253186da650bf5841" +source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" dependencies = [ "arrow-array", "arrow-buffer", @@ -294,8 +285,7 @@ dependencies = [ [[package]] name = "arrow-row" version = "48.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "249fc5a07906ab3f3536a6e9f118ec2883fbcde398a97a5ba70053f0276abda4" +source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" dependencies = [ "ahash", "arrow-array", @@ -309,14 +299,12 @@ dependencies = [ [[package]] name = "arrow-schema" version = "48.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d7a8c3f97f5ef6abd862155a6f39aaba36b029322462d72bbcfa69782a50614" +source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" [[package]] name = "arrow-select" version = "48.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f868f4a5001429e20f7c1994b5cd1aa68b82e3db8cf96c559cdb56dc8be21410" +source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" dependencies = [ "ahash", "arrow-array", @@ -329,8 +317,7 @@ dependencies = [ [[package]] name = "arrow-string" version = "48.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a27fdf8fc70040a2dee78af2e217479cb5b263bd7ab8711c7999e74056eb688a" +source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" dependencies = [ "arrow-array", "arrow-buffer", @@ -2267,8 +2254,9 @@ dependencies = [ [[package]] name = "object_store" -version = "0.7.1" -source = "git+https://github.com/apache/arrow-rs.git?rev=ab53d2dd5fb2af2bab69f95a6ddae2226c166500#ab53d2dd5fb2af2bab69f95a6ddae2226c166500" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2524735495ea1268be33d200e1ee97455096a0846295a21548cd2f3541de7050" dependencies = [ "async-trait", "base64", @@ -2353,8 +2341,7 @@ dependencies = [ [[package]] name = "parquet" version = "48.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "239229e6a668ab50c61de3dce61cf0fa1069345f7aa0f4c934491f92205a4945" +source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" dependencies = [ "ahash", "arrow-array", diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 983d8e6a444a..483454496fa7 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -38,7 +38,7 @@ datafusion = { path = "../datafusion/core", version = "32.0.0", features = ["avr dirs = "4.0.0" env_logger = "0.9" mimalloc = { version = "0.1", default-features = false } -object_store = { version = "0.7.0", features = ["aws", "gcp"] } +object_store = { version = "0.8.0", features = ["aws", "gcp"] } parking_lot = { version = "0.12" } regex = "1.8" rustyline = "11.0" @@ -52,4 +52,8 @@ predicates = "3.0" rstest = "0.17" [patch.crates-io] -object_store = { git = "https://github.com/apache/arrow-rs.git", rev = "ab53d2dd5fb2af2bab69f95a6ddae2226c166500" } +arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "ffeda62fc9d6a182c1bf3b3212e676f74fc196df" } +arrow-array = { git = "https://github.com/apache/arrow-rs.git", rev = "ffeda62fc9d6a182c1bf3b3212e676f74fc196df" } +arrow-buffer = { git = "https://github.com/apache/arrow-rs.git", rev = "ffeda62fc9d6a182c1bf3b3212e676f74fc196df" } +arrow-schema = { git = "https://github.com/apache/arrow-rs.git", rev = "ffeda62fc9d6a182c1bf3b3212e676f74fc196df" } +parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "ffeda62fc9d6a182c1bf3b3212e676f74fc196df" } diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 57691520a401..be3a4b756b6e 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -46,7 +46,7 @@ futures = { workspace = true } log = { workspace = true } mimalloc = { version = "0.1", default-features = false } num_cpus = { workspace = true } -object_store = { version = "0.7.0", features = ["aws", "http"] } +object_store = { version = "0.8.0", features = ["aws", "http"] } prost = { version = "0.12", default-features = false } prost-derive = { version = "0.11", default-features = false } serde = { version = "1.0.136", features = ["derive"] } diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index d04db86b7830..a2896b9f63cd 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -47,7 +47,7 @@ arrow-schema = { workspace = true } chrono = { workspace = true } half = { version = "2.1", default-features = false } num_cpus = { workspace = true } -object_store = { version = "0.7.0", default-features = false, optional = true } +object_store = { version = "0.8.0", default-features = false, optional = true } parquet = { workspace = true, optional = true } pyo3 = { version = "0.20.0", optional = true } sqlparser = { workspace = true } diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml index 72a4df66ebd7..8fe42f3c4a11 100644 --- a/datafusion/proto/Cargo.toml +++ b/datafusion/proto/Cargo.toml @@ -46,7 +46,7 @@ chrono = { workspace = true } datafusion = { path = "../core", version = "32.0.0" } datafusion-common = { workspace = true } datafusion-expr = { workspace = true } -object_store = { version = "0.7.0" } +object_store = { version = "0.8.0" } pbjson = { version = "0.5", optional = true } prost = "0.12.0" serde = { version = "1.0", optional = true } From 0674e3ea1ea3d40c1d4b22095a4f67c9c8d687c8 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 7 Nov 2023 17:50:59 +0000 Subject: [PATCH 5/8] Update tests --- datafusion/physical-expr/src/expressions/cast.rs | 6 +++++- datafusion/physical-expr/src/expressions/try_cast.rs | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs index 9390089063a0..8605eeb4434b 100644 --- a/datafusion/physical-expr/src/expressions/cast.rs +++ b/datafusion/physical-expr/src/expressions/cast.rs @@ -669,7 +669,11 @@ mod tests { // Ensure a useful error happens at plan time if invalid casts are used let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); - let result = cast(col("a", &schema).unwrap(), &schema, DataType::LargeBinary); + let result = cast( + col("a", &schema).unwrap(), + &schema, + DataType::Interval(IntervalUnit::MonthDayNano), + ); result.expect_err("expected Invalid CAST"); } diff --git a/datafusion/physical-expr/src/expressions/try_cast.rs b/datafusion/physical-expr/src/expressions/try_cast.rs index cba026c56513..8c114e2702c5 100644 --- a/datafusion/physical-expr/src/expressions/try_cast.rs +++ b/datafusion/physical-expr/src/expressions/try_cast.rs @@ -549,7 +549,11 @@ mod tests { // Ensure a useful error happens at plan time if invalid casts are used let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); - let result = try_cast(col("a", &schema).unwrap(), &schema, DataType::LargeBinary); + let result = try_cast( + col("a", &schema).unwrap(), + &schema, + DataType::Interval(IntervalUnit::MonthDayNano), + ); result.expect_err("expected Invalid TRY_CAST"); } From 382bddea42b7893167c926ce46ae5cb7f2dd4dc6 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 17 Nov 2023 22:33:07 +0000 Subject: [PATCH 6/8] Update pin --- Cargo.toml | 24 ++++------- datafusion-cli/Cargo.lock | 75 ++++++++++++++++++++-------------- datafusion-cli/Cargo.toml | 10 +---- datafusion/wasmtest/Cargo.toml | 2 +- 4 files changed, 54 insertions(+), 57 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 25cdfeca2acb..cbd811fd508f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,12 +49,12 @@ rust-version = "1.70" version = "33.0.0" [workspace.dependencies] -arrow = { version = "48.0.0", features = ["prettyprint"] } -arrow-array = { version = "48.0.0", default-features = false, features = ["chrono-tz"] } -arrow-buffer = { version = "48.0.0", default-features = false } -arrow-flight = { version = "48.0.0", features = ["flight-sql-experimental"] } -arrow-ord = { version = "48.0.0", default-features = false } -arrow-schema = { version = "48.0.0", default-features = false } +arrow = { version = "49.0.0", features = ["prettyprint"] } +arrow-array = { version = "49.0.0", default-features = false, features = ["chrono-tz"] } +arrow-buffer = { version = "49.0.0", default-features = false } +arrow-flight = { version = "49.0.0", features = ["flight-sql-experimental"] } +arrow-ord = { version = "49.0.0", default-features = false } +arrow-schema = { version = "49.0.0", default-features = false } async-trait = "0.1.73" bigdecimal = "0.4.1" bytes = "1.4" @@ -81,7 +81,7 @@ log = "^0.4" num_cpus = "1.13.0" object_store = "0.8.0" parking_lot = "0.12" -parquet = { version = "48.0.0", features = ["arrow", "async", "object_store"] } +parquet = { version = "49.0.0", features = ["arrow", "async", "object_store"] } rand = "0.8" rstest = "0.18.0" serde_json = "1" @@ -108,13 +108,3 @@ opt-level = 3 overflow-checks = false panic = 'unwind' rpath = false - - -[patch.crates-io] -arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "ffeda62fc9d6a182c1bf3b3212e676f74fc196df" } -arrow-array = { git = "https://github.com/apache/arrow-rs.git", rev = "ffeda62fc9d6a182c1bf3b3212e676f74fc196df" } -arrow-buffer = { git = "https://github.com/apache/arrow-rs.git", rev = "ffeda62fc9d6a182c1bf3b3212e676f74fc196df" } -arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev = "ffeda62fc9d6a182c1bf3b3212e676f74fc196df" } -arrow-ord = { git = "https://github.com/apache/arrow-rs.git", rev = "ffeda62fc9d6a182c1bf3b3212e676f74fc196df" } -arrow-schema = { git = "https://github.com/apache/arrow-rs.git", rev = "ffeda62fc9d6a182c1bf3b3212e676f74fc196df" } -parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "ffeda62fc9d6a182c1bf3b3212e676f74fc196df" } diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 960189aabd56..4660fac232df 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -130,8 +130,9 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "arrow" -version = "48.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bc25126d18a012146a888a0298f2c22e1150327bd2765fc76d710a556b2d614" dependencies = [ "ahash", "arrow-arith", @@ -151,8 +152,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "48.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34ccd45e217ffa6e53bbb0080990e77113bdd4e91ddb84e97b77649810bcf1a7" dependencies = [ "arrow-array", "arrow-buffer", @@ -165,8 +167,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "48.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bda9acea48b25123c08340f3a8ac361aa0f74469bb36f5ee9acf923fce23e9d" dependencies = [ "ahash", "arrow-buffer", @@ -181,8 +184,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "48.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01a0fc21915b00fc6c2667b069c1b64bdd920982f426079bc4a7cab86822886c" dependencies = [ "bytes", "half", @@ -191,8 +195,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "48.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dc0368ed618d509636c1e3cc20db1281148190a78f43519487b2daf07b63b4a" dependencies = [ "arrow-array", "arrow-buffer", @@ -209,8 +214,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "48.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e09aa6246a1d6459b3f14baeaa49606cfdbca34435c46320e14054d244987ca" dependencies = [ "arrow-array", "arrow-buffer", @@ -227,8 +233,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "48.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "907fafe280a3874474678c1858b9ca4cb7fd83fb8034ff5b6d6376205a08c634" dependencies = [ "arrow-buffer", "arrow-schema", @@ -238,8 +245,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "48.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79a43d6808411886b8c7d4f6f7dd477029c1e77ffffffb7923555cc6579639cd" dependencies = [ "arrow-array", "arrow-buffer", @@ -251,8 +259,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "48.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d82565c91fd627922ebfe2810ee4e8346841b6f9361b87505a9acea38b614fee" dependencies = [ "arrow-array", "arrow-buffer", @@ -270,8 +279,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "48.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b23b0e53c0db57c6749997fd343d4c0354c994be7eca67152dd2bdb9a3e1bb4" dependencies = [ "arrow-array", "arrow-buffer", @@ -284,8 +294,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "48.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "361249898d2d6d4a6eeb7484be6ac74977e48da12a4dd81a708d620cc558117a" dependencies = [ "ahash", "arrow-array", @@ -298,13 +309,15 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "48.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09e28a5e781bf1b0f981333684ad13f5901f4cd2f20589eab7cf1797da8fc167" [[package]] name = "arrow-select" -version = "48.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f6208466590960efc1d2a7172bc4ff18a67d6e25c529381d7f96ddaf0dc4036" dependencies = [ "ahash", "arrow-array", @@ -316,8 +329,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "48.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a48149c63c11c9ff571e50ab8f017d2a7cb71037a882b42f6354ed2da9acc7" dependencies = [ "arrow-array", "arrow-buffer", @@ -2352,8 +2366,9 @@ dependencies = [ [[package]] name = "parquet" -version = "48.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=ffeda62fc9d6a182c1bf3b3212e676f74fc196df#ffeda62fc9d6a182c1bf3b3212e676f74fc196df" +version = "49.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af88740a842787da39b3d69ce5fbf6fce97d20211d3b299fee0a0da6430c74d4" dependencies = [ "ahash", "arrow-array", diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index dd5f7dce7cb0..dd7a077988cb 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -29,7 +29,7 @@ rust-version = "1.70" readme = "README.md" [dependencies] -arrow = "48.0.0" +arrow = "49.0.0" async-trait = "0.1.41" aws-config = "0.55" aws-credential-types = "0.55" @@ -50,11 +50,3 @@ assert_cmd = "2.0" ctor = "0.2.0" predicates = "3.0" rstest = "0.17" - -[patch.crates-io] -arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "ffeda62fc9d6a182c1bf3b3212e676f74fc196df" } -arrow-array = { git = "https://github.com/apache/arrow-rs.git", rev = "ffeda62fc9d6a182c1bf3b3212e676f74fc196df" } -arrow-buffer = { git = "https://github.com/apache/arrow-rs.git", rev = "ffeda62fc9d6a182c1bf3b3212e676f74fc196df" } -arrow-ord = { git = "https://github.com/apache/arrow-rs.git", rev = "ffeda62fc9d6a182c1bf3b3212e676f74fc196df" } -arrow-schema = { git = "https://github.com/apache/arrow-rs.git", rev = "ffeda62fc9d6a182c1bf3b3212e676f74fc196df" } -parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "ffeda62fc9d6a182c1bf3b3212e676f74fc196df" } diff --git a/datafusion/wasmtest/Cargo.toml b/datafusion/wasmtest/Cargo.toml index 882b02bcc84b..f213c375b0d0 100644 --- a/datafusion/wasmtest/Cargo.toml +++ b/datafusion/wasmtest/Cargo.toml @@ -46,5 +46,5 @@ datafusion-sql = { workspace = true } # getrandom must be compiled with js feature getrandom = { version = "0.2.8", features = ["js"] } -parquet = { version = "48.0.0", default-features = false } +parquet = { version = "49.0.0", default-features = false } wasm-bindgen = "0.2.87" From 69ee33daee10d175f9c036505ef222b32681d1bf Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 17 Nov 2023 22:44:41 +0000 Subject: [PATCH 7/8] Unignore fifo test --- datafusion/core/tests/fifo.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs index 67841ffec07e..93c7f7368065 100644 --- a/datafusion/core/tests/fifo.rs +++ b/datafusion/core/tests/fifo.rs @@ -323,7 +323,6 @@ mod unix_test { /// It tests the INSERT INTO functionality. #[tokio::test] - #[ignore] async fn test_sql_insert_into_fifo() -> Result<()> { // To make unbounded deterministic let waiting = Arc::new(AtomicBool::new(true)); From 4687f22b1edb31333dc3d7ce1547432cdb1d60dc Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 17 Nov 2023 22:51:43 +0000 Subject: [PATCH 8/8] Update lockfile --- datafusion-cli/Cargo.lock | 71 ++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 35 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 4bc61a48a36e..06bc14c5b656 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -130,9 +130,9 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "arrow" -version = "48.0.1" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8919668503a4f2d8b6da96fa7c16e93046bfb3412ffcfa1e5dc7d2e3adcb378" +checksum = "5bc25126d18a012146a888a0298f2c22e1150327bd2765fc76d710a556b2d614" dependencies = [ "ahash", "arrow-arith", @@ -152,9 +152,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "48.0.1" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef983914f477d4278b068f13b3224b7d19eb2b807ac9048544d3bfebdf2554c4" +checksum = "34ccd45e217ffa6e53bbb0080990e77113bdd4e91ddb84e97b77649810bcf1a7" dependencies = [ "arrow-array", "arrow-buffer", @@ -167,9 +167,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "48.0.1" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6eaf89041fa5937940ae390294ece29e1db584f46d995608d6e5fe65a2e0e9b" +checksum = "6bda9acea48b25123c08340f3a8ac361aa0f74469bb36f5ee9acf923fce23e9d" dependencies = [ "ahash", "arrow-buffer", @@ -184,9 +184,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "48.0.1" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55512d988c6fbd76e514fd3ff537ac50b0a675da5a245e4fdad77ecfd654205f" +checksum = "01a0fc21915b00fc6c2667b069c1b64bdd920982f426079bc4a7cab86822886c" dependencies = [ "bytes", "half", @@ -195,15 +195,16 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "48.0.1" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "655ee51a2156ba5375931ce21c1b2494b1d9260e6dcdc6d4db9060c37dc3325b" +checksum = "5dc0368ed618d509636c1e3cc20db1281148190a78f43519487b2daf07b63b4a" dependencies = [ "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", "arrow-select", + "base64", "chrono", "comfy-table", "half", @@ -213,9 +214,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "48.0.1" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "258bb689997ad5b6660b3ce3638bd6b383d668ec555ed41ad7c6559cbb2e4f91" +checksum = "2e09aa6246a1d6459b3f14baeaa49606cfdbca34435c46320e14054d244987ca" dependencies = [ "arrow-array", "arrow-buffer", @@ -232,9 +233,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "48.0.1" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dc2b9fec74763427e2e5575b8cc31ce96ba4c9b4eb05ce40e0616d9fad12461" +checksum = "907fafe280a3874474678c1858b9ca4cb7fd83fb8034ff5b6d6376205a08c634" dependencies = [ "arrow-buffer", "arrow-schema", @@ -244,9 +245,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "48.0.1" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6eaa6ab203cc6d89b7eaa1ac781c1dfeef325454c5d5a0419017f95e6bafc03c" +checksum = "79a43d6808411886b8c7d4f6f7dd477029c1e77ffffffb7923555cc6579639cd" dependencies = [ "arrow-array", "arrow-buffer", @@ -258,9 +259,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "48.0.1" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb64e30d9b73f66fdc5c52d5f4cf69bbf03d62f64ffeafa0715590a5320baed7" +checksum = "d82565c91fd627922ebfe2810ee4e8346841b6f9361b87505a9acea38b614fee" dependencies = [ "arrow-array", "arrow-buffer", @@ -278,9 +279,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "48.0.1" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9a818951c0d11c428dda03e908175969c262629dd20bd0850bd6c7a8c3bfe48" +checksum = "9b23b0e53c0db57c6749997fd343d4c0354c994be7eca67152dd2bdb9a3e1bb4" dependencies = [ "arrow-array", "arrow-buffer", @@ -293,9 +294,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "48.0.1" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5d664318bc05f930559fc088888f0f7174d3c5bc888c0f4f9ae8f23aa398ba3" +checksum = "361249898d2d6d4a6eeb7484be6ac74977e48da12a4dd81a708d620cc558117a" dependencies = [ "ahash", "arrow-array", @@ -308,15 +309,15 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "48.0.1" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aaf4d737bba93da59f16129bec21e087aed0be84ff840e74146d4703879436cb" +checksum = "09e28a5e781bf1b0f981333684ad13f5901f4cd2f20589eab7cf1797da8fc167" [[package]] name = "arrow-select" -version = "48.0.1" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "374c4c3b812ecc2118727b892252a4a4308f87a8aca1dbf09f3ce4bc578e668a" +checksum = "4f6208466590960efc1d2a7172bc4ff18a67d6e25c529381d7f96ddaf0dc4036" dependencies = [ "ahash", "arrow-array", @@ -328,9 +329,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "48.0.1" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b15aed5624bb23da09142f58502b59c23f5bea607393298bb81dab1ce60fc769" +checksum = "a4a48149c63c11c9ff571e50ab8f017d2a7cb71037a882b42f6354ed2da9acc7" dependencies = [ "arrow-array", "arrow-buffer", @@ -2288,9 +2289,9 @@ dependencies = [ [[package]] name = "object_store" -version = "0.7.1" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f930c88a43b1c3f6e776dfe495b4afab89882dbc81530c632db2ed65451ebcb4" +checksum = "2524735495ea1268be33d200e1ee97455096a0846295a21548cd2f3541de7050" dependencies = [ "async-trait", "base64", @@ -2305,7 +2306,7 @@ dependencies = [ "quick-xml", "rand", "reqwest", - "ring 0.16.20", + "ring 0.17.5", "rustls-pemfile", "serde", "serde_json", @@ -2374,9 +2375,9 @@ dependencies = [ [[package]] name = "parquet" -version = "48.0.1" +version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bfe55df96e3f02f11bf197ae37d91bb79801631f82f6195dd196ef521df3597" +checksum = "af88740a842787da39b3d69ce5fbf6fce97d20211d3b299fee0a0da6430c74d4" dependencies = [ "ahash", "arrow-array", @@ -2597,9 +2598,9 @@ checksum = "658fa1faf7a4cc5f057c9ee5ef560f717ad9d8dc66d975267f709624d6e1ab88" [[package]] name = "quick-xml" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eff6510e86862b57b210fd8cbe8ed3f0d7d600b9c2863cd4549a2e033c66e956" +checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" dependencies = [ "memchr", "serde",