diff --git a/arrow-schema/src/fields.rs b/arrow-schema/src/fields.rs
index 70cb1968e9a4..400f42c59c30 100644
--- a/arrow-schema/src/fields.rs
+++ b/arrow-schema/src/fields.rs
@@ -15,10 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::{ArrowError, Field, FieldRef, SchemaBuilder};
 use std::ops::Deref;
 use std::sync::Arc;
 
+use crate::{ArrowError, DataType, Field, FieldRef, SchemaBuilder};
+
 /// A cheaply cloneable, owned slice of [`FieldRef`]
 ///
 /// Similar to `Arc<Vec<FieldRef>>` or `Arc<[FieldRef]>`
@@ -99,6 +100,108 @@ impl Fields {
             .all(|(a, b)| Arc::ptr_eq(a, b) || a.contains(b))
     }
 
+    /// Returns a copy of this [`Fields`] containing only those [`FieldRef`] passing a predicate
+    ///
+    /// Performs a depth-first scan of [`Fields`], invoking `filter` for each leaf
+    /// [`FieldRef`], i.e. one containing no child [`FieldRef`], along with a count of
+    /// the leaves encountered so far. Only [`FieldRef`] for which `filter`
+    /// returned `true` will be included in the result.
+    ///
+    /// This can therefore be used to select a subset of fields from nested types
+    /// such as [`DataType::Struct`] or [`DataType::List`].
+    ///
+    /// ```
+    /// # use arrow_schema::{DataType, Field, Fields};
+    /// let fields = Fields::from(vec![
+    ///     Field::new("a", DataType::Int32, true), // Leaf 0
+    ///     Field::new("b", DataType::Struct(Fields::from(vec![
+    ///         Field::new("c", DataType::Float32, false), // Leaf 1
+    ///         Field::new("d", DataType::Float64, false), // Leaf 2
+    ///         Field::new("e", DataType::Struct(Fields::from(vec![
+    ///             Field::new("f", DataType::Int32, false), // Leaf 3
+    ///             Field::new("g", DataType::Float16, false), // Leaf 4
+    ///         ])), true),
+    ///     ])), false)
+    /// ]);
+    /// let filtered = fields.filter_leaves(|idx, _| [0, 2, 3, 4].contains(&idx));
+    /// let expected = Fields::from(vec![
+    ///     Field::new("a", DataType::Int32, true),
+    ///     Field::new("b", DataType::Struct(Fields::from(vec![
+    ///         Field::new("d", DataType::Float64, false),
+    ///         Field::new("e", DataType::Struct(Fields::from(vec![
+    ///             Field::new("f", DataType::Int32, false),
+    ///             Field::new("g", DataType::Float16, false),
+    ///         ])), true),
+    ///     ])), false)
+    /// ]);
+    /// assert_eq!(filtered, expected);
+    /// ```
+    pub fn filter_leaves<F: FnMut(usize, &FieldRef) -> bool>(&self, mut filter: F) -> Self {
+        fn filter_field<F: FnMut(&FieldRef) -> bool>(
+            f: &FieldRef,
+            filter: &mut F,
+        ) -> Option<FieldRef> {
+            use DataType::*;
+
+            let v = match f.data_type() {
+                Dictionary(_, v) => v.as_ref(),       // Key must be integer
+                RunEndEncoded(_, v) => v.data_type(), // Run-ends must be integer
+                d => d,
+            };
+            let d = match v {
+                List(child) => List(filter_field(child, filter)?),
+                LargeList(child) => LargeList(filter_field(child, filter)?),
+                Map(child, ordered) => Map(filter_field(child, filter)?, *ordered),
+                FixedSizeList(child, size) => FixedSizeList(filter_field(child, filter)?, *size),
+                Struct(fields) => {
+                    let filtered: Fields = fields
+                        .iter()
+                        .filter_map(|f| filter_field(f, filter))
+                        .collect();
+
+                    if filtered.is_empty() {
+                        return None;
+                    }
+
+                    Struct(filtered)
+                }
+                Union(fields, mode) => {
+                    let filtered: UnionFields = fields
+                        .iter()
+                        .filter_map(|(id, f)| Some((id, filter_field(f, filter)?)))
+                        .collect();
+
+                    if filtered.is_empty() {
+                        return None;
+                    }
+
+                    Union(filtered, *mode)
+                }
+                _ => return filter(f).then(|| f.clone()),
+            };
+            let d = match f.data_type() {
+                Dictionary(k, _) => Dictionary(k.clone(), Box::new(d)),
+                RunEndEncoded(v, f) => {
+                    RunEndEncoded(v.clone(), Arc::new(f.as_ref().clone().with_data_type(d)))
+                }
+                _ => d,
+            };
+            Some(Arc::new(f.as_ref().clone().with_data_type(d)))
+        }
+
+        let mut leaf_idx = 0;
+        let mut filter = |f: &FieldRef| {
+            let t = filter(leaf_idx, f);
+            leaf_idx += 1;
+            t
+        };
+
+        self.0
+            .iter()
+            .filter_map(|f| filter_field(f, &mut filter))
+            .collect()
+    }
+
     /// Remove a field by index and return it.
     ///
     /// # Panic
@@ -117,6 +220,8 @@ impl Fields {
     /// assert_eq!(fields.remove(1), Field::new("b", DataType::Int8, false).into());
     /// assert_eq!(fields.len(), 2);
     /// ```
+    #[deprecated(note = "Use SchemaBuilder::remove")]
+    #[doc(hidden)]
     pub fn remove(&mut self, index: usize) -> FieldRef {
         let mut builder = SchemaBuilder::from(Fields::from(&*self.0));
         let field = builder.remove(index);
@@ -305,3 +410,130 @@ impl FromIterator<(i8, FieldRef)> for UnionFields {
         Self(iter.into_iter().collect())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::UnionMode;
+
+    #[test]
+    fn test_filter() {
+        let floats = Fields::from(vec![
+            Field::new("a", DataType::Float32, false),
+            Field::new("b", DataType::Float32, false),
+        ]);
+        let fields = Fields::from(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("floats", DataType::Struct(floats.clone()), true),
+            Field::new("b", DataType::Int16, true),
+            Field::new(
+                "c",
+                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+                false,
+            ),
+            Field::new(
+                "d",
+                DataType::Dictionary(
+                    Box::new(DataType::Int32),
+                    Box::new(DataType::Struct(floats.clone())),
+                ),
+                false,
+            ),
+            Field::new_list(
+                "e",
+                Field::new("floats", DataType::Struct(floats.clone()), true),
+                true,
+            ),
+            Field::new(
+                "f",
+                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, false)), 3),
+                false,
+            ),
+            Field::new_map(
+                "g",
+                "entries",
+                Field::new("keys", DataType::LargeUtf8, false),
+                Field::new("values", DataType::Int32, true),
+                false,
+                false,
+            ),
+            Field::new(
+                "h",
+                DataType::Union(
+                    UnionFields::new(
+                        vec![1, 3],
+                        vec![
+                            Field::new("field1", DataType::UInt8, false),
+                            Field::new("field3", DataType::Utf8, false),
+                        ],
+                    ),
+                    UnionMode::Dense,
+                ),
+                true,
+            ),
+            Field::new(
+                "i",
+                DataType::RunEndEncoded(
+                    Arc::new(Field::new("run_ends", DataType::Int32, false)),
+                    Arc::new(Field::new("values", DataType::Struct(floats.clone()), true)),
+                ),
+                false,
+            ),
+        ]);
+
+        let floats_a = DataType::Struct(vec![floats[0].clone()].into());
+
+        let r = fields.filter_leaves(|idx, _| idx == 0 || idx == 1);
+        assert_eq!(r.len(), 2);
+        assert_eq!(r[0], fields[0]);
+        assert_eq!(r[1].data_type(), &floats_a);
+
+        let r = fields.filter_leaves(|_, f| f.name() == "a");
+        assert_eq!(r.len(), 5);
+        assert_eq!(r[0], fields[0]);
+        assert_eq!(r[1].data_type(), &floats_a);
+        assert_eq!(
+            r[2].data_type(),
+            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(floats_a.clone()))
+        );
+        assert_eq!(
+            r[3].as_ref(),
+            &Field::new_list("e", Field::new("floats", floats_a.clone(), true), true)
+        );
+        assert_eq!(
+            r[4].as_ref(),
+            &Field::new(
+                "i",
+                DataType::RunEndEncoded(
+                    Arc::new(Field::new("run_ends", DataType::Int32, false)),
+                    Arc::new(Field::new("values", floats_a.clone(), true)),
+                ),
+                false,
+            )
+        );
+
+        let r = fields.filter_leaves(|_, f| f.name() == "floats");
+        assert_eq!(r.len(), 0);
+
+        let r = fields.filter_leaves(|idx, _| idx == 9);
+        assert_eq!(r.len(), 1);
+        assert_eq!(r[0], fields[6]);
+
+        let r = fields.filter_leaves(|idx, _| idx == 10 || idx == 11);
+        assert_eq!(r.len(), 1);
+        assert_eq!(r[0], fields[7]);
+
+        let union = DataType::Union(
+            UnionFields::new(vec![1], vec![Field::new("field1", DataType::UInt8, false)]),
+            UnionMode::Dense,
+        );
+
+        let r = fields.filter_leaves(|idx, _| idx == 12);
+        assert_eq!(r.len(), 1);
+        assert_eq!(r[0].data_type(), &union);
+
+        let r = fields.filter_leaves(|idx, _| idx == 14 || idx == 15);
+        assert_eq!(r.len(), 1);
+        assert_eq!(r[0], fields[9]);
+    }
+}
diff --git a/arrow-schema/src/schema.rs b/arrow-schema/src/schema.rs
index 711e4cb3314d..e547e5df3a5a 100644
--- a/arrow-schema/src/schema.rs
+++ b/arrow-schema/src/schema.rs
@@ -402,6 +402,9 @@ impl Schema {
     /// assert_eq!(schema.remove(1), Field::new("b", DataType::Int8, false).into());
     /// assert_eq!(schema.fields.len(), 2);
     /// ```
+    #[deprecated(note = "Use SchemaBuilder::remove")]
+    #[doc(hidden)]
+    #[allow(deprecated)]
     pub fn remove(&mut self, index: usize) -> FieldRef {
         self.fields.remove(index)
     }
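The arrow-schema changes above can be exercised as follows. This is a minimal usage sketch, not part of the diff; the field names (`id`, `point`, `x`, `y`) are illustrative. Parents of surviving leaves are retained, while parents whose leaves all fail the predicate are dropped entirely:

```rust
use arrow_schema::{DataType, Field, Fields, SchemaBuilder};

// A schema with one top-level leaf and a struct containing two leaves.
let fields = Fields::from(vec![
    Field::new("id", DataType::Int64, false), // Leaf 0
    Field::new(
        "point",
        DataType::Struct(Fields::from(vec![
            Field::new("x", DataType::Float64, false), // Leaf 1
            Field::new("y", DataType::Float64, false), // Leaf 2
        ])),
        false,
    ),
]);

// Keep only leaves named "x": the "id" leaf is dropped, and "point"
// survives with just its "x" child.
let projected = fields.filter_leaves(|_idx, f| f.name() == "x");
assert_eq!(projected.len(), 1);
assert_eq!(
    projected[0].data_type(),
    &DataType::Struct(Fields::from(vec![Field::new("x", DataType::Float64, false)]))
);

// Migration path for the now-deprecated `Fields::remove` / `Schema::remove`:
// route removals through `SchemaBuilder` instead.
let mut builder = SchemaBuilder::from(fields);
let removed = builder.remove(0); // removes and returns the "id" field
assert_eq!(removed.name(), "id");
let _schema = builder.finish();
```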
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 3e47abd4bcc5..ecbe556c6dfe 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -45,7 +45,7 @@ use percent_encoding::{utf8_percent_encode, PercentEncode};
 use quick_xml::events::{self as xml_events};
 use reqwest::{
     header::{CONTENT_LENGTH, CONTENT_TYPE},
-    Client as ReqwestClient, Method, RequestBuilder, Response, StatusCode,
+    Client as ReqwestClient, Method, RequestBuilder, Response,
 };
 use serde::{Deserialize, Serialize};
 use snafu::{ResultExt, Snafu};
@@ -466,6 +466,9 @@ impl S3Client {
             Some(S3CopyIfNotExists::Header(k, v)) => {
                 builder = builder.header(k, v);
             }
+            Some(S3CopyIfNotExists::HeaderWithStatus(k, v, _)) => {
+                builder = builder.header(k, v);
+            }
             None => {
                 return Err(crate::Error::NotSupported {
                     source: "S3 does not support copy-if-not-exists".to_string().into(),
@@ -474,6 +477,11 @@ impl S3Client {
             }
         }
 
+        let precondition_failure = match &self.config.copy_if_not_exists {
+            Some(S3CopyIfNotExists::HeaderWithStatus(_, _, code)) => *code,
+            _ => reqwest::StatusCode::PRECONDITION_FAILED,
+        };
+
         builder
             .with_aws_sigv4(
                 credential.as_deref(),
@@ -485,7 +493,7 @@ impl S3Client {
             .send_retry(&self.config.retry_config)
             .await
             .map_err(|source| match source.status() {
-                Some(StatusCode::PRECONDITION_FAILED) => crate::Error::AlreadyExists {
+                Some(status) if status == precondition_failure => crate::Error::AlreadyExists {
                     source: Box::new(source),
                     path: to.to_string(),
                 },
diff --git a/object_store/src/aws/precondition.rs b/object_store/src/aws/precondition.rs
index a50b57fe23f7..ada5f3b83f07 100644
--- a/object_store/src/aws/precondition.rs
+++ b/object_store/src/aws/precondition.rs
@@ -17,11 +17,13 @@
 
 use crate::config::Parse;
 
+use itertools::Itertools;
+
 /// Configure how to provide [`ObjectStore::copy_if_not_exists`] for [`AmazonS3`].
 ///
 /// [`ObjectStore::copy_if_not_exists`]: crate::ObjectStore::copy_if_not_exists
 /// [`AmazonS3`]: super::AmazonS3
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
 #[non_exhaustive]
 pub enum S3CopyIfNotExists {
     /// Some S3-compatible stores, such as Cloudflare R2, support copy if not exists
@@ -29,7 +31,7 @@ pub enum S3CopyIfNotExists {
     ///
     /// If set, [`ObjectStore::copy_if_not_exists`] will perform a normal copy operation
     /// with the provided header pair, and expect the store to fail with `412 Precondition Failed`
-    /// if the destination file already exists
+    /// if the destination file already exists.
     ///
     /// Encoded as `header:<HEADER_NAME>:<HEADER_VALUE>` ignoring whitespace
     ///
@@ -38,12 +40,20 @@ pub enum S3CopyIfNotExists {
     ///
     /// [`ObjectStore::copy_if_not_exists`]: crate::ObjectStore::copy_if_not_exists
     Header(String, String),
+    /// The same as [`S3CopyIfNotExists::Header`] but allows a custom status code check,
+    /// for object stores that return a status other than `412 Precondition Failed`.
+    ///
+    /// Encoded as `header-with-status:<HEADER_NAME>:<HEADER_VALUE>:<STATUS>` ignoring whitespace
+    HeaderWithStatus(String, String, reqwest::StatusCode),
 }
 
 impl std::fmt::Display for S3CopyIfNotExists {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
             Self::Header(k, v) => write!(f, "header: {}: {}", k, v),
+            Self::HeaderWithStatus(k, v, code) => {
+                write!(f, "header-with-status: {k}: {v}: {}", code.as_u16())
+            }
         }
     }
 }
@@ -56,6 +66,17 @@ impl S3CopyIfNotExists {
                 let (k, v) = value.split_once(':')?;
                 Some(Self::Header(k.trim().to_string(), v.trim().to_string()))
             }
+            "header-with-status" => {
+                let (k, v, status) = value.split(':').collect_tuple()?;
+
+                let code = status.trim().parse().ok()?;
+
+                Some(Self::HeaderWithStatus(
+                    k.trim().to_string(),
+                    v.trim().to_string(),
+                    code,
+                ))
+            }
             _ => None,
         }
     }
@@ -111,3 +132,76 @@ impl Parse for S3ConditionalPut {
         })
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::S3CopyIfNotExists;
+
+    #[test]
+    fn parse_s3_copy_if_not_exists_header() {
+        let input = "header: cf-copy-destination-if-none-match: *";
+        let expected = Some(S3CopyIfNotExists::Header(
+            "cf-copy-destination-if-none-match".to_owned(),
+            "*".to_owned(),
+        ));
+
+        assert_eq!(expected, S3CopyIfNotExists::from_str(input));
+    }
+
+    #[test]
+    fn parse_s3_copy_if_not_exists_header_with_status() {
+        let input = "header-with-status:key:value:403";
+        let expected = Some(S3CopyIfNotExists::HeaderWithStatus(
+            "key".to_owned(),
+            "value".to_owned(),
+            reqwest::StatusCode::FORBIDDEN,
+        ));
+
+        assert_eq!(expected, S3CopyIfNotExists::from_str(input));
+    }
+
+    #[test]
+    fn parse_s3_copy_if_not_exists_header_whitespace_invariant() {
+        let expected = Some(S3CopyIfNotExists::Header(
+            "cf-copy-destination-if-none-match".to_owned(),
+            "*".to_owned(),
+        ));
+
+        const INPUTS: &[&str] = &[
+            "header:cf-copy-destination-if-none-match:*",
+            "header: cf-copy-destination-if-none-match:*",
+            "header: cf-copy-destination-if-none-match: *",
+            "header : cf-copy-destination-if-none-match: *",
+            "header : cf-copy-destination-if-none-match : *",
+            "header : cf-copy-destination-if-none-match : * ",
+        ];
+
+        for input in INPUTS {
+            assert_eq!(expected, S3CopyIfNotExists::from_str(input));
+        }
+    }
+
+    #[test]
+    fn parse_s3_copy_if_not_exists_header_with_status_whitespace_invariant() {
+        let expected = Some(S3CopyIfNotExists::HeaderWithStatus(
+            "key".to_owned(),
+            "value".to_owned(),
+            reqwest::StatusCode::FORBIDDEN,
+        ));
+
+        const INPUTS: &[&str] = &[
+            "header-with-status:key:value:403",
+            "header-with-status: key:value:403",
+            "header-with-status: key: value:403",
+            "header-with-status: key: value: 403",
+            "header-with-status : key: value: 403",
+            "header-with-status : key : value: 403",
+            "header-with-status : key : value : 403",
+            "header-with-status : key : value : 403 ",
+        ];
+
+        for input in INPUTS {
+            assert_eq!(expected, S3CopyIfNotExists::from_str(input));
+        }
+    }
+}
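For reviewers, a configuration sketch for the new variant; the bucket, region, header name, and status code are illustrative, `with_copy_if_not_exists` and `from_env` are the existing `AmazonS3Builder` hooks, and a direct `reqwest` dependency is assumed for the status code type:

```rust
use object_store::aws::{AmazonS3Builder, S3CopyIfNotExists};

// Programmatic form: treat a 409 Conflict from the store as "destination
// already exists" instead of the default 412 Precondition Failed.
let store = AmazonS3Builder::from_env()
    .with_bucket_name("my-bucket")
    .with_region("us-east-1")
    .with_copy_if_not_exists(S3CopyIfNotExists::HeaderWithStatus(
        "x-copy-destination-if-none-match".to_string(),
        "*".to_string(),
        reqwest::StatusCode::CONFLICT,
    ))
    .build()
    .unwrap();

// Equivalent string form, as parsed by `S3CopyIfNotExists::from_str`:
//   "header-with-status: x-copy-destination-if-none-match: *: 409"
```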
diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs
index d0720dd24306..0d5144f61c26 100644
--- a/parquet/src/column/writer/encoder.rs
+++ b/parquet/src/column/writer/encoder.rs
@@ -18,7 +18,7 @@
 use bytes::Bytes;
 use half::f16;
 
-use crate::basic::{Encoding, LogicalType, Type};
+use crate::basic::{ConvertedType, Encoding, LogicalType, Type};
 use crate::bloom_filter::Sbbf;
 use crate::column::writer::{
     compare_greater, fallback_encoding, has_dictionary_support, is_nan, update_max, update_min,
@@ -137,7 +137,10 @@ pub struct ColumnValueEncoderImpl<T: DataType> {
 
 impl<T: DataType> ColumnValueEncoderImpl<T> {
     fn write_slice(&mut self, slice: &[T::T]) -> Result<()> {
-        if self.statistics_enabled == EnabledStatistics::Page {
+        if self.statistics_enabled == EnabledStatistics::Page
+            // INTERVAL has undefined sort order, so don't write min/max stats for it
+            && self.descr.converted_type() != ConvertedType::INTERVAL
+        {
             if let Some((min, max)) = self.min_max(slice, None) {
                 update_min(&self.descr, &min, &mut self.min_value);
                 update_max(&self.descr, &max, &mut self.max_value);
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 59430b676200..50b502f8c41c 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -332,7 +332,10 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         // If only computing chunk-level statistics compute them here, page-level statistics
        // are computed in [`Self::write_mini_batch`] and used to update chunk statistics in
         // [`Self::add_data_page`]
-        if self.statistics_enabled == EnabledStatistics::Chunk {
+        if self.statistics_enabled == EnabledStatistics::Chunk
+            // INTERVAL has undefined sort order, so don't write min/max stats for it
+            && self.descr.converted_type() != ConvertedType::INTERVAL
+        {
             match (min, max) {
                 (Some(min), Some(max)) => {
                     update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
@@ -1111,7 +1114,6 @@ fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
 ///
 /// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
 /// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`
-
 fn update_stat<T: ParquetValueType, F>(
     descr: &ColumnDescriptor,
     val: &T,
@@ -3160,6 +3162,30 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_interval_stats_should_not_have_min_max() {
+        let input = [
+            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
+            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
+            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
+        ]
+        .into_iter()
+        .map(|s| ByteArray::from(s).into())
+        .collect::<Vec<_>>();
+
+        let page_writer = get_test_page_writer();
+        let mut writer = get_test_interval_column_writer(page_writer);
+        writer.write_batch(&input, None, None).unwrap();
+
+        let metadata = writer.close().unwrap().metadata;
+        let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
+            stats.clone()
+        } else {
+            panic!("metadata missing statistics");
+        };
+        assert!(!stats.has_min_max_set());
+    }
+
     fn write_multiple_pages<T: DataType>(
         column_descr: &Arc<ColumnDescriptor>,
         pages: &[&[Option<T::T>]],
@@ -3489,8 +3515,7 @@ mod tests {
         values: &[FixedLenByteArray],
     ) -> ValueStatistics<FixedLenByteArray> {
         let page_writer = get_test_page_writer();
-        let props = Default::default();
-        let mut writer = get_test_float16_column_writer(page_writer, 0, 0, props);
+        let mut writer = get_test_float16_column_writer(page_writer);
         writer.write_batch(values, None, None).unwrap();
 
         let metadata = writer.close().unwrap().metadata;
@@ -3503,12 +3528,9 @@ mod tests {
 
     fn get_test_float16_column_writer(
         page_writer: Box<dyn PageWriter>,
-        max_def_level: i16,
-        max_rep_level: i16,
-        props: WriterPropertiesPtr,
     ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
-        let descr = Arc::new(get_test_float16_column_descr(max_def_level, max_rep_level));
-        let column_writer = get_column_writer(descr, props, page_writer);
+        let descr = Arc::new(get_test_float16_column_descr(0, 0));
+        let column_writer = get_column_writer(descr, Default::default(), page_writer);
         get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
     }
 
@@ -3523,6 +3545,25 @@ mod tests {
         ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
     }
 
+    fn get_test_interval_column_writer(
+        page_writer: Box<dyn PageWriter>,
+    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
+        let descr = Arc::new(get_test_interval_column_descr());
+        let column_writer = get_column_writer(descr, Default::default(), page_writer);
+        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
+    }
+
+    fn get_test_interval_column_descr() -> ColumnDescriptor {
+        let path = ColumnPath::from("col");
+        let tpe =
+            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
+                .with_length(12)
+                .with_converted_type(ConvertedType::INTERVAL)
+                .build()
+                .unwrap();
+        ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
+    }
+
     /// Returns column writer for UINT32 Column provided as ConvertedType only
     fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
         page_writer: Box<dyn PageWriter + 'a>,
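Background for the parquet change: `INTERVAL` is stored as a 12-byte `FIXED_LEN_BYTE_ARRAY` holding months, days, and milliseconds as three little-endian `u32` values, and the Parquet specification leaves its sort order undefined, so min/max statistics are not meaningful for it. A sketch of declaring such a column with the crate's schema parser (assuming the standard `parse_message_type` entry point):

```rust
use parquet::basic::ConvertedType;
use parquet::schema::parser::parse_message_type;

// The same column shape that `get_test_interval_column_descr` builds above.
let message = "
    message schema {
        REQUIRED FIXED_LEN_BYTE_ARRAY (12) col (INTERVAL);
    }
";
let schema = parse_message_type(message).unwrap();
let col = &schema.get_fields()[0];
assert_eq!(col.get_basic_info().converted_type(), ConvertedType::INTERVAL);
```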