diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 8a1cd2a147c7..25956665d56c 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -93,7 +93,8 @@ const BUFFER_FLUSH_BYTES: usize = 1024000; #[derive(Default)] /// Factory struct used to create [ParquetFormat] pub struct ParquetFormatFactory { - options: Option, + /// inner options for parquet + pub options: Option, } impl ParquetFormatFactory { diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index c59aaa2d42bb..9268ccca0b70 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -434,6 +434,7 @@ message JsonOptions { message TableParquetOptions { ParquetOptions global = 1; repeated ParquetColumnSpecificOptions column_specific_options = 2; + map key_value_metadata = 3; } message ParquetColumnSpecificOptions { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 45703d8b9fed..3487f43ae24e 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -961,48 +961,48 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { } } -impl TryFrom<&protobuf::ColumnOptions> for ParquetColumnOptions { +impl TryFrom<&protobuf::ParquetColumnOptions> for ParquetColumnOptions { type Error = DataFusionError; fn try_from( - value: &protobuf::ColumnOptions, + value: &protobuf::ParquetColumnOptions, ) -> datafusion_common::Result { Ok(ParquetColumnOptions { compression: value.compression_opt.clone().map(|opt| match opt { - protobuf::column_options::CompressionOpt::Compression(v) => Some(v), + protobuf::parquet_column_options::CompressionOpt::Compression(v) => Some(v), }).unwrap_or(None), - dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(|protobuf::column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v), + dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(|protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v), statistics_enabled: value .statistics_enabled_opt.clone() .map(|opt| match opt { - protobuf::column_options::StatisticsEnabledOpt::StatisticsEnabled(v) => Some(v), + protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v) => Some(v), }) .unwrap_or(None), max_statistics_size: value .max_statistics_size_opt.clone() .map(|opt| match opt { - protobuf::column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(v) => Some(v as usize), + protobuf::parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(v) => Some(v as usize), }) .unwrap_or(None), encoding: value .encoding_opt.clone() .map(|opt| match opt { - protobuf::column_options::EncodingOpt::Encoding(v) => Some(v), + protobuf::parquet_column_options::EncodingOpt::Encoding(v) => Some(v), }) .unwrap_or(None), bloom_filter_enabled: value.bloom_filter_enabled_opt.clone().map(|opt| match opt { - protobuf::column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v) => Some(v), + protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v) => Some(v), }) .unwrap_or(None), bloom_filter_fpp: value .bloom_filter_fpp_opt.clone() .map(|opt| match opt { - protobuf::column_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v), + protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v) 
=> Some(v), }) .unwrap_or(None), bloom_filter_ndv: value .bloom_filter_ndv_opt.clone() .map(|opt| match opt { - protobuf::column_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v), + protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v), }) .unwrap_or(None), }) @@ -1016,7 +1016,7 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions { ) -> datafusion_common::Result { let mut column_specific_options: HashMap = HashMap::new(); - for protobuf::ColumnSpecificOptions { + for protobuf::ParquetColumnSpecificOptions { column_name, options: maybe_options, } in &value.column_specific_options diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index 23dd5746929d..05e57f5585a6 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -880,347 +880,7 @@ impl<'de> serde::Deserialize<'de> for Column { deserializer.deserialize_struct("datafusion_common.Column", FIELDS, GeneratedVisitor) } } -impl serde::Serialize for ColumnOptions { - #[allow(deprecated)] - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::Serializer, - { - use serde::ser::SerializeStruct; - let mut len = 0; - if self.bloom_filter_enabled_opt.is_some() { - len += 1; - } - if self.encoding_opt.is_some() { - len += 1; - } - if self.dictionary_enabled_opt.is_some() { - len += 1; - } - if self.compression_opt.is_some() { - len += 1; - } - if self.statistics_enabled_opt.is_some() { - len += 1; - } - if self.bloom_filter_fpp_opt.is_some() { - len += 1; - } - if self.bloom_filter_ndv_opt.is_some() { - len += 1; - } - if self.max_statistics_size_opt.is_some() { - len += 1; - } - let mut struct_ser = serializer.serialize_struct("datafusion_common.ColumnOptions", len)?; - if let Some(v) = self.bloom_filter_enabled_opt.as_ref() { - match v { - column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v) => { - struct_ser.serialize_field("bloomFilterEnabled", v)?; - } - } - } - if let Some(v) = self.encoding_opt.as_ref() { - match v { - column_options::EncodingOpt::Encoding(v) => { - struct_ser.serialize_field("encoding", v)?; - } - } - } - if let Some(v) = self.dictionary_enabled_opt.as_ref() { - match v { - column_options::DictionaryEnabledOpt::DictionaryEnabled(v) => { - struct_ser.serialize_field("dictionaryEnabled", v)?; - } - } - } - if let Some(v) = self.compression_opt.as_ref() { - match v { - column_options::CompressionOpt::Compression(v) => { - struct_ser.serialize_field("compression", v)?; - } - } - } - if let Some(v) = self.statistics_enabled_opt.as_ref() { - match v { - column_options::StatisticsEnabledOpt::StatisticsEnabled(v) => { - struct_ser.serialize_field("statisticsEnabled", v)?; - } - } - } - if let Some(v) = self.bloom_filter_fpp_opt.as_ref() { - match v { - column_options::BloomFilterFppOpt::BloomFilterFpp(v) => { - struct_ser.serialize_field("bloomFilterFpp", v)?; - } - } - } - if let Some(v) = self.bloom_filter_ndv_opt.as_ref() { - match v { - column_options::BloomFilterNdvOpt::BloomFilterNdv(v) => { - #[allow(clippy::needless_borrow)] - struct_ser.serialize_field("bloomFilterNdv", ToString::to_string(&v).as_str())?; - } - } - } - if let Some(v) = self.max_statistics_size_opt.as_ref() { - match v { - column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(v) => { - struct_ser.serialize_field("maxStatisticsSize", v)?; - } - } - } - struct_ser.end() - } -} -impl<'de> serde::Deserialize<'de> for ColumnOptions { - #[allow(deprecated)] 
- fn deserialize(deserializer: D) -> std::result::Result - where - D: serde::Deserializer<'de>, - { - const FIELDS: &[&str] = &[ - "bloom_filter_enabled", - "bloomFilterEnabled", - "encoding", - "dictionary_enabled", - "dictionaryEnabled", - "compression", - "statistics_enabled", - "statisticsEnabled", - "bloom_filter_fpp", - "bloomFilterFpp", - "bloom_filter_ndv", - "bloomFilterNdv", - "max_statistics_size", - "maxStatisticsSize", - ]; - - #[allow(clippy::enum_variant_names)] - enum GeneratedField { - BloomFilterEnabled, - Encoding, - DictionaryEnabled, - Compression, - StatisticsEnabled, - BloomFilterFpp, - BloomFilterNdv, - MaxStatisticsSize, - } - impl<'de> serde::Deserialize<'de> for GeneratedField { - fn deserialize(deserializer: D) -> std::result::Result - where - D: serde::Deserializer<'de>, - { - struct GeneratedVisitor; - - impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = GeneratedField; - - fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(formatter, "expected one of: {:?}", &FIELDS) - } - - #[allow(unused_variables)] - fn visit_str(self, value: &str) -> std::result::Result - where - E: serde::de::Error, - { - match value { - "bloomFilterEnabled" | "bloom_filter_enabled" => Ok(GeneratedField::BloomFilterEnabled), - "encoding" => Ok(GeneratedField::Encoding), - "dictionaryEnabled" | "dictionary_enabled" => Ok(GeneratedField::DictionaryEnabled), - "compression" => Ok(GeneratedField::Compression), - "statisticsEnabled" | "statistics_enabled" => Ok(GeneratedField::StatisticsEnabled), - "bloomFilterFpp" | "bloom_filter_fpp" => Ok(GeneratedField::BloomFilterFpp), - "bloomFilterNdv" | "bloom_filter_ndv" => Ok(GeneratedField::BloomFilterNdv), - "maxStatisticsSize" | "max_statistics_size" => Ok(GeneratedField::MaxStatisticsSize), - _ => Err(serde::de::Error::unknown_field(value, FIELDS)), - } - } - } - deserializer.deserialize_identifier(GeneratedVisitor) - } - } - struct GeneratedVisitor; - impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = ColumnOptions; - - fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter.write_str("struct datafusion_common.ColumnOptions") - } - - fn visit_map(self, mut map_: V) -> std::result::Result - where - V: serde::de::MapAccess<'de>, - { - let mut bloom_filter_enabled_opt__ = None; - let mut encoding_opt__ = None; - let mut dictionary_enabled_opt__ = None; - let mut compression_opt__ = None; - let mut statistics_enabled_opt__ = None; - let mut bloom_filter_fpp_opt__ = None; - let mut bloom_filter_ndv_opt__ = None; - let mut max_statistics_size_opt__ = None; - while let Some(k) = map_.next_key()? 
{ - match k { - GeneratedField::BloomFilterEnabled => { - if bloom_filter_enabled_opt__.is_some() { - return Err(serde::de::Error::duplicate_field("bloomFilterEnabled")); - } - bloom_filter_enabled_opt__ = map_.next_value::<::std::option::Option<_>>()?.map(column_options::BloomFilterEnabledOpt::BloomFilterEnabled); - } - GeneratedField::Encoding => { - if encoding_opt__.is_some() { - return Err(serde::de::Error::duplicate_field("encoding")); - } - encoding_opt__ = map_.next_value::<::std::option::Option<_>>()?.map(column_options::EncodingOpt::Encoding); - } - GeneratedField::DictionaryEnabled => { - if dictionary_enabled_opt__.is_some() { - return Err(serde::de::Error::duplicate_field("dictionaryEnabled")); - } - dictionary_enabled_opt__ = map_.next_value::<::std::option::Option<_>>()?.map(column_options::DictionaryEnabledOpt::DictionaryEnabled); - } - GeneratedField::Compression => { - if compression_opt__.is_some() { - return Err(serde::de::Error::duplicate_field("compression")); - } - compression_opt__ = map_.next_value::<::std::option::Option<_>>()?.map(column_options::CompressionOpt::Compression); - } - GeneratedField::StatisticsEnabled => { - if statistics_enabled_opt__.is_some() { - return Err(serde::de::Error::duplicate_field("statisticsEnabled")); - } - statistics_enabled_opt__ = map_.next_value::<::std::option::Option<_>>()?.map(column_options::StatisticsEnabledOpt::StatisticsEnabled); - } - GeneratedField::BloomFilterFpp => { - if bloom_filter_fpp_opt__.is_some() { - return Err(serde::de::Error::duplicate_field("bloomFilterFpp")); - } - bloom_filter_fpp_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| column_options::BloomFilterFppOpt::BloomFilterFpp(x.0)); - } - GeneratedField::BloomFilterNdv => { - if bloom_filter_ndv_opt__.is_some() { - return Err(serde::de::Error::duplicate_field("bloomFilterNdv")); - } - bloom_filter_ndv_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| column_options::BloomFilterNdvOpt::BloomFilterNdv(x.0)); - } - GeneratedField::MaxStatisticsSize => { - if max_statistics_size_opt__.is_some() { - return Err(serde::de::Error::duplicate_field("maxStatisticsSize")); - } - max_statistics_size_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(x.0)); - } - } - } - Ok(ColumnOptions { - bloom_filter_enabled_opt: bloom_filter_enabled_opt__, - encoding_opt: encoding_opt__, - dictionary_enabled_opt: dictionary_enabled_opt__, - compression_opt: compression_opt__, - statistics_enabled_opt: statistics_enabled_opt__, - bloom_filter_fpp_opt: bloom_filter_fpp_opt__, - bloom_filter_ndv_opt: bloom_filter_ndv_opt__, - max_statistics_size_opt: max_statistics_size_opt__, - }) - } - } - deserializer.deserialize_struct("datafusion_common.ColumnOptions", FIELDS, GeneratedVisitor) - } -} -impl serde::Serialize for ColumnRelation { - #[allow(deprecated)] - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::Serializer, - { - use serde::ser::SerializeStruct; - let mut len = 0; - if !self.relation.is_empty() { - len += 1; - } - let mut struct_ser = serializer.serialize_struct("datafusion_common.ColumnRelation", len)?; - if !self.relation.is_empty() { - struct_ser.serialize_field("relation", &self.relation)?; - } - struct_ser.end() - } -} -impl<'de> serde::Deserialize<'de> for ColumnRelation { - #[allow(deprecated)] - fn deserialize(deserializer: D) -> 
std::result::Result - where - D: serde::Deserializer<'de>, - { - const FIELDS: &[&str] = &[ - "relation", - ]; - - #[allow(clippy::enum_variant_names)] - enum GeneratedField { - Relation, - } - impl<'de> serde::Deserialize<'de> for GeneratedField { - fn deserialize(deserializer: D) -> std::result::Result - where - D: serde::Deserializer<'de>, - { - struct GeneratedVisitor; - - impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = GeneratedField; - - fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(formatter, "expected one of: {:?}", &FIELDS) - } - - #[allow(unused_variables)] - fn visit_str(self, value: &str) -> std::result::Result - where - E: serde::de::Error, - { - match value { - "relation" => Ok(GeneratedField::Relation), - _ => Err(serde::de::Error::unknown_field(value, FIELDS)), - } - } - } - deserializer.deserialize_identifier(GeneratedVisitor) - } - } - struct GeneratedVisitor; - impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = ColumnRelation; - - fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter.write_str("struct datafusion_common.ColumnRelation") - } - - fn visit_map(self, mut map_: V) -> std::result::Result - where - V: serde::de::MapAccess<'de>, - { - let mut relation__ = None; - while let Some(k) = map_.next_key()? { - match k { - GeneratedField::Relation => { - if relation__.is_some() { - return Err(serde::de::Error::duplicate_field("relation")); - } - relation__ = Some(map_.next_value()?); - } - } - } - Ok(ColumnRelation { - relation: relation__.unwrap_or_default(), - }) - } - } - deserializer.deserialize_struct("datafusion_common.ColumnRelation", FIELDS, GeneratedVisitor) - } -} -impl serde::Serialize for ColumnSpecificOptions { +impl serde::Serialize for ColumnRelation { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result where @@ -1228,38 +888,29 @@ impl serde::Serialize for ColumnSpecificOptions { { use serde::ser::SerializeStruct; let mut len = 0; - if !self.column_name.is_empty() { - len += 1; - } - if self.options.is_some() { + if !self.relation.is_empty() { len += 1; } - let mut struct_ser = serializer.serialize_struct("datafusion_common.ColumnSpecificOptions", len)?; - if !self.column_name.is_empty() { - struct_ser.serialize_field("columnName", &self.column_name)?; - } - if let Some(v) = self.options.as_ref() { - struct_ser.serialize_field("options", v)?; + let mut struct_ser = serializer.serialize_struct("datafusion_common.ColumnRelation", len)?; + if !self.relation.is_empty() { + struct_ser.serialize_field("relation", &self.relation)?; } struct_ser.end() } } -impl<'de> serde::Deserialize<'de> for ColumnSpecificOptions { +impl<'de> serde::Deserialize<'de> for ColumnRelation { #[allow(deprecated)] fn deserialize(deserializer: D) -> std::result::Result where D: serde::Deserializer<'de>, { const FIELDS: &[&str] = &[ - "column_name", - "columnName", - "options", + "relation", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { - ColumnName, - Options, + Relation, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -1281,8 +932,7 @@ impl<'de> serde::Deserialize<'de> for ColumnSpecificOptions { E: serde::de::Error, { match value { - "columnName" | "column_name" => Ok(GeneratedField::ColumnName), - "options" => Ok(GeneratedField::Options), + "relation" => Ok(GeneratedField::Relation), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } 
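(Not part of the patch: a minimal illustration of what the renamed pbjson impls above produce. It assumes the generated message is re-exported as `datafusion_proto::protobuf::ParquetColumnOptions` and that `serde_json` is available as a dependency; the JSON field names come straight from the `serialize_field` calls in the generated code, everything else here is a sketch.)

```rust
use datafusion_proto::protobuf::{parquet_column_options, ParquetColumnOptions};

fn main() -> Result<(), serde_json::Error> {
    // Populate two of the oneof-backed option fields; the remaining fields
    // stay None and are skipped by the generated Serialize impl.
    let opts = ParquetColumnOptions {
        bloom_filter_enabled_opt: Some(
            parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(true),
        ),
        compression_opt: Some(
            parquet_column_options::CompressionOpt::Compression("zstd(3)".to_string()),
        ),
        ..Default::default()
    };
    // Prints roughly: {"bloomFilterEnabled":true,"compression":"zstd(3)"}
    // (camelCase keys, matching the serialize_field calls in the generated impl).
    println!("{}", serde_json::to_string(&opts)?);
    Ok(())
}
```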
@@ -1292,41 +942,33 @@ impl<'de> serde::Deserialize<'de> for ColumnSpecificOptions { } struct GeneratedVisitor; impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = ColumnSpecificOptions; + type Value = ColumnRelation; fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter.write_str("struct datafusion_common.ColumnSpecificOptions") + formatter.write_str("struct datafusion_common.ColumnRelation") } - fn visit_map(self, mut map_: V) -> std::result::Result + fn visit_map(self, mut map_: V) -> std::result::Result where V: serde::de::MapAccess<'de>, { - let mut column_name__ = None; - let mut options__ = None; + let mut relation__ = None; while let Some(k) = map_.next_key()? { match k { - GeneratedField::ColumnName => { - if column_name__.is_some() { - return Err(serde::de::Error::duplicate_field("columnName")); - } - column_name__ = Some(map_.next_value()?); - } - GeneratedField::Options => { - if options__.is_some() { - return Err(serde::de::Error::duplicate_field("options")); + GeneratedField::Relation => { + if relation__.is_some() { + return Err(serde::de::Error::duplicate_field("relation")); } - options__ = map_.next_value()?; + relation__ = Some(map_.next_value()?); } } } - Ok(ColumnSpecificOptions { - column_name: column_name__.unwrap_or_default(), - options: options__, + Ok(ColumnRelation { + relation: relation__.unwrap_or_default(), }) } } - deserializer.deserialize_struct("datafusion_common.ColumnSpecificOptions", FIELDS, GeneratedVisitor) + deserializer.deserialize_struct("datafusion_common.ColumnRelation", FIELDS, GeneratedVisitor) } } impl serde::Serialize for ColumnStats { @@ -4448,30 +4090,231 @@ impl serde::Serialize for List { { use serde::ser::SerializeStruct; let mut len = 0; - if self.field_type.is_some() { + if self.field_type.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion_common.List", len)?; + if let Some(v) = self.field_type.as_ref() { + struct_ser.serialize_field("fieldType", v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for List { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "field_type", + "fieldType", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + FieldType, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "fieldType" | "field_type" => Ok(GeneratedField::FieldType), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = List; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion_common.List") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut field_type__ = None; + while let Some(k) = 
map_.next_key()? { + match k { + GeneratedField::FieldType => { + if field_type__.is_some() { + return Err(serde::de::Error::duplicate_field("fieldType")); + } + field_type__ = map_.next_value()?; + } + } + } + Ok(List { + field_type: field_type__, + }) + } + } + deserializer.deserialize_struct("datafusion_common.List", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for Map { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.field_type.is_some() { + len += 1; + } + if self.keys_sorted { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion_common.Map", len)?; + if let Some(v) = self.field_type.as_ref() { + struct_ser.serialize_field("fieldType", v)?; + } + if self.keys_sorted { + struct_ser.serialize_field("keysSorted", &self.keys_sorted)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for Map { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "field_type", + "fieldType", + "keys_sorted", + "keysSorted", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + FieldType, + KeysSorted, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "fieldType" | "field_type" => Ok(GeneratedField::FieldType), + "keysSorted" | "keys_sorted" => Ok(GeneratedField::KeysSorted), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = Map; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion_common.Map") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut field_type__ = None; + let mut keys_sorted__ = None; + while let Some(k) = map_.next_key()? 
{ + match k { + GeneratedField::FieldType => { + if field_type__.is_some() { + return Err(serde::de::Error::duplicate_field("fieldType")); + } + field_type__ = map_.next_value()?; + } + GeneratedField::KeysSorted => { + if keys_sorted__.is_some() { + return Err(serde::de::Error::duplicate_field("keysSorted")); + } + keys_sorted__ = Some(map_.next_value()?); + } + } + } + Ok(Map { + field_type: field_type__, + keys_sorted: keys_sorted__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion_common.Map", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for NdJsonFormat { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.options.is_some() { len += 1; } - let mut struct_ser = serializer.serialize_struct("datafusion_common.List", len)?; - if let Some(v) = self.field_type.as_ref() { - struct_ser.serialize_field("fieldType", v)?; + let mut struct_ser = serializer.serialize_struct("datafusion_common.NdJsonFormat", len)?; + if let Some(v) = self.options.as_ref() { + struct_ser.serialize_field("options", v)?; } struct_ser.end() } } -impl<'de> serde::Deserialize<'de> for List { +impl<'de> serde::Deserialize<'de> for NdJsonFormat { #[allow(deprecated)] fn deserialize(deserializer: D) -> std::result::Result where D: serde::Deserializer<'de>, { const FIELDS: &[&str] = &[ - "field_type", - "fieldType", + "options", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { - FieldType, + Options, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -4493,7 +4336,7 @@ impl<'de> serde::Deserialize<'de> for List { E: serde::de::Error, { match value { - "fieldType" | "field_type" => Ok(GeneratedField::FieldType), + "options" => Ok(GeneratedField::Options), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -4503,36 +4346,36 @@ impl<'de> serde::Deserialize<'de> for List { } struct GeneratedVisitor; impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = List; + type Value = NdJsonFormat; fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter.write_str("struct datafusion_common.List") + formatter.write_str("struct datafusion_common.NdJsonFormat") } - fn visit_map(self, mut map_: V) -> std::result::Result + fn visit_map(self, mut map_: V) -> std::result::Result where V: serde::de::MapAccess<'de>, { - let mut field_type__ = None; + let mut options__ = None; while let Some(k) = map_.next_key()? 
{ match k { - GeneratedField::FieldType => { - if field_type__.is_some() { - return Err(serde::de::Error::duplicate_field("fieldType")); + GeneratedField::Options => { + if options__.is_some() { + return Err(serde::de::Error::duplicate_field("options")); } - field_type__ = map_.next_value()?; + options__ = map_.next_value()?; } } } - Ok(List { - field_type: field_type__, + Ok(NdJsonFormat { + options: options__, }) } } - deserializer.deserialize_struct("datafusion_common.List", FIELDS, GeneratedVisitor) + deserializer.deserialize_struct("datafusion_common.NdJsonFormat", FIELDS, GeneratedVisitor) } } -impl serde::Serialize for Map { +impl serde::Serialize for ParquetColumnOptions { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result where @@ -4540,39 +4383,124 @@ impl serde::Serialize for Map { { use serde::ser::SerializeStruct; let mut len = 0; - if self.field_type.is_some() { + if self.bloom_filter_enabled_opt.is_some() { len += 1; } - if self.keys_sorted { + if self.encoding_opt.is_some() { len += 1; } - let mut struct_ser = serializer.serialize_struct("datafusion_common.Map", len)?; - if let Some(v) = self.field_type.as_ref() { - struct_ser.serialize_field("fieldType", v)?; + if self.dictionary_enabled_opt.is_some() { + len += 1; } - if self.keys_sorted { - struct_ser.serialize_field("keysSorted", &self.keys_sorted)?; + if self.compression_opt.is_some() { + len += 1; + } + if self.statistics_enabled_opt.is_some() { + len += 1; + } + if self.bloom_filter_fpp_opt.is_some() { + len += 1; + } + if self.bloom_filter_ndv_opt.is_some() { + len += 1; + } + if self.max_statistics_size_opt.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion_common.ParquetColumnOptions", len)?; + if let Some(v) = self.bloom_filter_enabled_opt.as_ref() { + match v { + parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v) => { + struct_ser.serialize_field("bloomFilterEnabled", v)?; + } + } + } + if let Some(v) = self.encoding_opt.as_ref() { + match v { + parquet_column_options::EncodingOpt::Encoding(v) => { + struct_ser.serialize_field("encoding", v)?; + } + } + } + if let Some(v) = self.dictionary_enabled_opt.as_ref() { + match v { + parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v) => { + struct_ser.serialize_field("dictionaryEnabled", v)?; + } + } + } + if let Some(v) = self.compression_opt.as_ref() { + match v { + parquet_column_options::CompressionOpt::Compression(v) => { + struct_ser.serialize_field("compression", v)?; + } + } + } + if let Some(v) = self.statistics_enabled_opt.as_ref() { + match v { + parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v) => { + struct_ser.serialize_field("statisticsEnabled", v)?; + } + } + } + if let Some(v) = self.bloom_filter_fpp_opt.as_ref() { + match v { + parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v) => { + struct_ser.serialize_field("bloomFilterFpp", v)?; + } + } + } + if let Some(v) = self.bloom_filter_ndv_opt.as_ref() { + match v { + parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v) => { + #[allow(clippy::needless_borrow)] + struct_ser.serialize_field("bloomFilterNdv", ToString::to_string(&v).as_str())?; + } + } + } + if let Some(v) = self.max_statistics_size_opt.as_ref() { + match v { + parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(v) => { + struct_ser.serialize_field("maxStatisticsSize", v)?; + } + } } struct_ser.end() } } -impl<'de> serde::Deserialize<'de> for Map { +impl<'de> serde::Deserialize<'de> for 
ParquetColumnOptions { #[allow(deprecated)] fn deserialize(deserializer: D) -> std::result::Result where D: serde::Deserializer<'de>, { const FIELDS: &[&str] = &[ - "field_type", - "fieldType", - "keys_sorted", - "keysSorted", + "bloom_filter_enabled", + "bloomFilterEnabled", + "encoding", + "dictionary_enabled", + "dictionaryEnabled", + "compression", + "statistics_enabled", + "statisticsEnabled", + "bloom_filter_fpp", + "bloomFilterFpp", + "bloom_filter_ndv", + "bloomFilterNdv", + "max_statistics_size", + "maxStatisticsSize", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { - FieldType, - KeysSorted, + BloomFilterEnabled, + Encoding, + DictionaryEnabled, + Compression, + StatisticsEnabled, + BloomFilterFpp, + BloomFilterNdv, + MaxStatisticsSize, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -4594,8 +4522,14 @@ impl<'de> serde::Deserialize<'de> for Map { E: serde::de::Error, { match value { - "fieldType" | "field_type" => Ok(GeneratedField::FieldType), - "keysSorted" | "keys_sorted" => Ok(GeneratedField::KeysSorted), + "bloomFilterEnabled" | "bloom_filter_enabled" => Ok(GeneratedField::BloomFilterEnabled), + "encoding" => Ok(GeneratedField::Encoding), + "dictionaryEnabled" | "dictionary_enabled" => Ok(GeneratedField::DictionaryEnabled), + "compression" => Ok(GeneratedField::Compression), + "statisticsEnabled" | "statistics_enabled" => Ok(GeneratedField::StatisticsEnabled), + "bloomFilterFpp" | "bloom_filter_fpp" => Ok(GeneratedField::BloomFilterFpp), + "bloomFilterNdv" | "bloom_filter_ndv" => Ok(GeneratedField::BloomFilterNdv), + "maxStatisticsSize" | "max_statistics_size" => Ok(GeneratedField::MaxStatisticsSize), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -4605,44 +4539,92 @@ impl<'de> serde::Deserialize<'de> for Map { } struct GeneratedVisitor; impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = Map; + type Value = ParquetColumnOptions; fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter.write_str("struct datafusion_common.Map") + formatter.write_str("struct datafusion_common.ParquetColumnOptions") } - fn visit_map(self, mut map_: V) -> std::result::Result + fn visit_map(self, mut map_: V) -> std::result::Result where V: serde::de::MapAccess<'de>, { - let mut field_type__ = None; - let mut keys_sorted__ = None; + let mut bloom_filter_enabled_opt__ = None; + let mut encoding_opt__ = None; + let mut dictionary_enabled_opt__ = None; + let mut compression_opt__ = None; + let mut statistics_enabled_opt__ = None; + let mut bloom_filter_fpp_opt__ = None; + let mut bloom_filter_ndv_opt__ = None; + let mut max_statistics_size_opt__ = None; while let Some(k) = map_.next_key()? 
{ match k { - GeneratedField::FieldType => { - if field_type__.is_some() { - return Err(serde::de::Error::duplicate_field("fieldType")); + GeneratedField::BloomFilterEnabled => { + if bloom_filter_enabled_opt__.is_some() { + return Err(serde::de::Error::duplicate_field("bloomFilterEnabled")); } - field_type__ = map_.next_value()?; + bloom_filter_enabled_opt__ = map_.next_value::<::std::option::Option<_>>()?.map(parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled); } - GeneratedField::KeysSorted => { - if keys_sorted__.is_some() { - return Err(serde::de::Error::duplicate_field("keysSorted")); + GeneratedField::Encoding => { + if encoding_opt__.is_some() { + return Err(serde::de::Error::duplicate_field("encoding")); } - keys_sorted__ = Some(map_.next_value()?); + encoding_opt__ = map_.next_value::<::std::option::Option<_>>()?.map(parquet_column_options::EncodingOpt::Encoding); + } + GeneratedField::DictionaryEnabled => { + if dictionary_enabled_opt__.is_some() { + return Err(serde::de::Error::duplicate_field("dictionaryEnabled")); + } + dictionary_enabled_opt__ = map_.next_value::<::std::option::Option<_>>()?.map(parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled); + } + GeneratedField::Compression => { + if compression_opt__.is_some() { + return Err(serde::de::Error::duplicate_field("compression")); + } + compression_opt__ = map_.next_value::<::std::option::Option<_>>()?.map(parquet_column_options::CompressionOpt::Compression); + } + GeneratedField::StatisticsEnabled => { + if statistics_enabled_opt__.is_some() { + return Err(serde::de::Error::duplicate_field("statisticsEnabled")); + } + statistics_enabled_opt__ = map_.next_value::<::std::option::Option<_>>()?.map(parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled); + } + GeneratedField::BloomFilterFpp => { + if bloom_filter_fpp_opt__.is_some() { + return Err(serde::de::Error::duplicate_field("bloomFilterFpp")); + } + bloom_filter_fpp_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(x.0)); + } + GeneratedField::BloomFilterNdv => { + if bloom_filter_ndv_opt__.is_some() { + return Err(serde::de::Error::duplicate_field("bloomFilterNdv")); + } + bloom_filter_ndv_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(x.0)); + } + GeneratedField::MaxStatisticsSize => { + if max_statistics_size_opt__.is_some() { + return Err(serde::de::Error::duplicate_field("maxStatisticsSize")); + } + max_statistics_size_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(x.0)); } } } - Ok(Map { - field_type: field_type__, - keys_sorted: keys_sorted__.unwrap_or_default(), + Ok(ParquetColumnOptions { + bloom_filter_enabled_opt: bloom_filter_enabled_opt__, + encoding_opt: encoding_opt__, + dictionary_enabled_opt: dictionary_enabled_opt__, + compression_opt: compression_opt__, + statistics_enabled_opt: statistics_enabled_opt__, + bloom_filter_fpp_opt: bloom_filter_fpp_opt__, + bloom_filter_ndv_opt: bloom_filter_ndv_opt__, + max_statistics_size_opt: max_statistics_size_opt__, }) } } - deserializer.deserialize_struct("datafusion_common.Map", FIELDS, GeneratedVisitor) + deserializer.deserialize_struct("datafusion_common.ParquetColumnOptions", FIELDS, GeneratedVisitor) } } -impl serde::Serialize for 
NdJsonFormat { +impl serde::Serialize for ParquetColumnSpecificOptions { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result where @@ -4650,28 +4632,37 @@ impl serde::Serialize for NdJsonFormat { { use serde::ser::SerializeStruct; let mut len = 0; + if !self.column_name.is_empty() { + len += 1; + } if self.options.is_some() { len += 1; } - let mut struct_ser = serializer.serialize_struct("datafusion_common.NdJsonFormat", len)?; + let mut struct_ser = serializer.serialize_struct("datafusion_common.ParquetColumnSpecificOptions", len)?; + if !self.column_name.is_empty() { + struct_ser.serialize_field("columnName", &self.column_name)?; + } if let Some(v) = self.options.as_ref() { struct_ser.serialize_field("options", v)?; } struct_ser.end() } } -impl<'de> serde::Deserialize<'de> for NdJsonFormat { +impl<'de> serde::Deserialize<'de> for ParquetColumnSpecificOptions { #[allow(deprecated)] fn deserialize(deserializer: D) -> std::result::Result where D: serde::Deserializer<'de>, { const FIELDS: &[&str] = &[ + "column_name", + "columnName", "options", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { + ColumnName, Options, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -4694,6 +4685,7 @@ impl<'de> serde::Deserialize<'de> for NdJsonFormat { E: serde::de::Error, { match value { + "columnName" | "column_name" => Ok(GeneratedField::ColumnName), "options" => Ok(GeneratedField::Options), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } @@ -4704,19 +4696,26 @@ impl<'de> serde::Deserialize<'de> for NdJsonFormat { } struct GeneratedVisitor; impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { - type Value = NdJsonFormat; + type Value = ParquetColumnSpecificOptions; fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter.write_str("struct datafusion_common.NdJsonFormat") + formatter.write_str("struct datafusion_common.ParquetColumnSpecificOptions") } - fn visit_map(self, mut map_: V) -> std::result::Result + fn visit_map(self, mut map_: V) -> std::result::Result where V: serde::de::MapAccess<'de>, { + let mut column_name__ = None; let mut options__ = None; while let Some(k) = map_.next_key()? 
{ match k { + GeneratedField::ColumnName => { + if column_name__.is_some() { + return Err(serde::de::Error::duplicate_field("columnName")); + } + column_name__ = Some(map_.next_value()?); + } GeneratedField::Options => { if options__.is_some() { return Err(serde::de::Error::duplicate_field("options")); @@ -4725,12 +4724,13 @@ impl<'de> serde::Deserialize<'de> for NdJsonFormat { } } } - Ok(NdJsonFormat { + Ok(ParquetColumnSpecificOptions { + column_name: column_name__.unwrap_or_default(), options: options__, }) } } - deserializer.deserialize_struct("datafusion_common.NdJsonFormat", FIELDS, GeneratedVisitor) + deserializer.deserialize_struct("datafusion_common.ParquetColumnSpecificOptions", FIELDS, GeneratedVisitor) } } impl serde::Serialize for ParquetFormat { @@ -7558,6 +7558,9 @@ impl serde::Serialize for TableParquetOptions { if !self.column_specific_options.is_empty() { len += 1; } + if !self.key_value_metadata.is_empty() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion_common.TableParquetOptions", len)?; if let Some(v) = self.global.as_ref() { struct_ser.serialize_field("global", v)?; @@ -7565,6 +7568,9 @@ impl serde::Serialize for TableParquetOptions { if !self.column_specific_options.is_empty() { struct_ser.serialize_field("columnSpecificOptions", &self.column_specific_options)?; } + if !self.key_value_metadata.is_empty() { + struct_ser.serialize_field("keyValueMetadata", &self.key_value_metadata)?; + } struct_ser.end() } } @@ -7578,12 +7584,15 @@ impl<'de> serde::Deserialize<'de> for TableParquetOptions { "global", "column_specific_options", "columnSpecificOptions", + "key_value_metadata", + "keyValueMetadata", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { Global, ColumnSpecificOptions, + KeyValueMetadata, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -7607,6 +7616,7 @@ impl<'de> serde::Deserialize<'de> for TableParquetOptions { match value { "global" => Ok(GeneratedField::Global), "columnSpecificOptions" | "column_specific_options" => Ok(GeneratedField::ColumnSpecificOptions), + "keyValueMetadata" | "key_value_metadata" => Ok(GeneratedField::KeyValueMetadata), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -7628,6 +7638,7 @@ impl<'de> serde::Deserialize<'de> for TableParquetOptions { { let mut global__ = None; let mut column_specific_options__ = None; + let mut key_value_metadata__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::Global => { @@ -7642,11 +7653,20 @@ impl<'de> serde::Deserialize<'de> for TableParquetOptions { } column_specific_options__ = Some(map_.next_value()?); } + GeneratedField::KeyValueMetadata => { + if key_value_metadata__.is_some() { + return Err(serde::de::Error::duplicate_field("keyValueMetadata")); + } + key_value_metadata__ = Some( + map_.next_value::>()? 
+ ); + } } } Ok(TableParquetOptions { global: global__, column_specific_options: column_specific_options__.unwrap_or_default(), + key_value_metadata: key_value_metadata__.unwrap_or_default(), }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index 9bea9be89e1d..ebc05718a458 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -670,46 +670,55 @@ pub struct TableParquetOptions { #[prost(message, optional, tag = "1")] pub global: ::core::option::Option, #[prost(message, repeated, tag = "2")] - pub column_specific_options: ::prost::alloc::vec::Vec, + pub column_specific_options: ::prost::alloc::vec::Vec, + #[prost(map = "string, string", tag = "3")] + pub key_value_metadata: ::std::collections::HashMap< + ::prost::alloc::string::String, + ::prost::alloc::string::String, + >, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct ColumnSpecificOptions { +pub struct ParquetColumnSpecificOptions { #[prost(string, tag = "1")] pub column_name: ::prost::alloc::string::String, #[prost(message, optional, tag = "2")] - pub options: ::core::option::Option, + pub options: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct ColumnOptions { - #[prost(oneof = "column_options::BloomFilterEnabledOpt", tags = "1")] +pub struct ParquetColumnOptions { + #[prost(oneof = "parquet_column_options::BloomFilterEnabledOpt", tags = "1")] pub bloom_filter_enabled_opt: ::core::option::Option< - column_options::BloomFilterEnabledOpt, + parquet_column_options::BloomFilterEnabledOpt, >, - #[prost(oneof = "column_options::EncodingOpt", tags = "2")] - pub encoding_opt: ::core::option::Option, - #[prost(oneof = "column_options::DictionaryEnabledOpt", tags = "3")] + #[prost(oneof = "parquet_column_options::EncodingOpt", tags = "2")] + pub encoding_opt: ::core::option::Option, + #[prost(oneof = "parquet_column_options::DictionaryEnabledOpt", tags = "3")] pub dictionary_enabled_opt: ::core::option::Option< - column_options::DictionaryEnabledOpt, + parquet_column_options::DictionaryEnabledOpt, >, - #[prost(oneof = "column_options::CompressionOpt", tags = "4")] - pub compression_opt: ::core::option::Option, - #[prost(oneof = "column_options::StatisticsEnabledOpt", tags = "5")] + #[prost(oneof = "parquet_column_options::CompressionOpt", tags = "4")] + pub compression_opt: ::core::option::Option, + #[prost(oneof = "parquet_column_options::StatisticsEnabledOpt", tags = "5")] pub statistics_enabled_opt: ::core::option::Option< - column_options::StatisticsEnabledOpt, + parquet_column_options::StatisticsEnabledOpt, + >, + #[prost(oneof = "parquet_column_options::BloomFilterFppOpt", tags = "6")] + pub bloom_filter_fpp_opt: ::core::option::Option< + parquet_column_options::BloomFilterFppOpt, + >, + #[prost(oneof = "parquet_column_options::BloomFilterNdvOpt", tags = "7")] + pub bloom_filter_ndv_opt: ::core::option::Option< + parquet_column_options::BloomFilterNdvOpt, >, - #[prost(oneof = "column_options::BloomFilterFppOpt", tags = "6")] - pub bloom_filter_fpp_opt: ::core::option::Option, - #[prost(oneof = "column_options::BloomFilterNdvOpt", tags = "7")] - pub bloom_filter_ndv_opt: ::core::option::Option, - #[prost(oneof = "column_options::MaxStatisticsSizeOpt", tags = "8")] + #[prost(oneof = "parquet_column_options::MaxStatisticsSizeOpt", tags = "8")] pub max_statistics_size_opt: 
::core::option::Option< - column_options::MaxStatisticsSizeOpt, + parquet_column_options::MaxStatisticsSizeOpt, >, } -/// Nested message and enum types in `ColumnOptions`. -pub mod column_options { +/// Nested message and enum types in `ParquetColumnOptions`. +pub mod parquet_column_options { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum BloomFilterEnabledOpt { diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index a61a026089fc..4cf7e73ac912 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; use std::sync::Arc; use crate::protobuf_common as protobuf; @@ -832,42 +833,42 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { } } -impl TryFrom<&ParquetColumnOptions> for protobuf::ColumnOptions { +impl TryFrom<&ParquetColumnOptions> for protobuf::ParquetColumnOptions { type Error = DataFusionError; fn try_from( value: &ParquetColumnOptions, ) -> datafusion_common::Result { - Ok(protobuf::ColumnOptions { + Ok(protobuf::ParquetColumnOptions { compression_opt: value .compression .clone() - .map(protobuf::column_options::CompressionOpt::Compression), + .map(protobuf::parquet_column_options::CompressionOpt::Compression), dictionary_enabled_opt: value .dictionary_enabled - .map(protobuf::column_options::DictionaryEnabledOpt::DictionaryEnabled), + .map(protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled), statistics_enabled_opt: value .statistics_enabled .clone() - .map(protobuf::column_options::StatisticsEnabledOpt::StatisticsEnabled), + .map(protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled), max_statistics_size_opt: value.max_statistics_size.map(|v| { - protobuf::column_options::MaxStatisticsSizeOpt::MaxStatisticsSize( + protobuf::parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize( v as u32, ) }), encoding_opt: value .encoding .clone() - .map(protobuf::column_options::EncodingOpt::Encoding), + .map(protobuf::parquet_column_options::EncodingOpt::Encoding), bloom_filter_enabled_opt: value .bloom_filter_enabled - .map(protobuf::column_options::BloomFilterEnabledOpt::BloomFilterEnabled), + .map(protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled), bloom_filter_fpp_opt: value .bloom_filter_fpp - .map(protobuf::column_options::BloomFilterFppOpt::BloomFilterFpp), + .map(protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp), bloom_filter_ndv_opt: value .bloom_filter_ndv - .map(protobuf::column_options::BloomFilterNdvOpt::BloomFilterNdv), + .map(protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv), }) } } @@ -881,15 +882,21 @@ impl TryFrom<&TableParquetOptions> for protobuf::TableParquetOptions { .column_specific_options .iter() .map(|(k, v)| { - Ok(protobuf::ColumnSpecificOptions { + Ok(protobuf::ParquetColumnSpecificOptions { column_name: k.into(), options: Some(v.try_into()?), }) }) .collect::>>()?; + let key_value_metadata = value + .key_value_metadata + .iter() + .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone()))) + .collect::>(); Ok(protobuf::TableParquetOptions { global: Some((&value.global).try_into()?), column_specific_options, + key_value_metadata, }) } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs 
b/datafusion/proto/src/generated/datafusion_proto_common.rs index f48b05e8d3dc..ebc05718a458 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -671,6 +671,11 @@ pub struct TableParquetOptions { pub global: ::core::option::Option, #[prost(message, repeated, tag = "2")] pub column_specific_options: ::prost::alloc::vec::Vec, + #[prost(map = "string, string", tag = "3")] + pub key_value_metadata: ::std::collections::HashMap< + ::prost::alloc::string::String, + ::prost::alloc::string::String, + >, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index ce9d24d94d99..607a3d8642fd 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -18,7 +18,10 @@ use std::sync::Arc; use datafusion::{ - config::{CsvOptions, JsonOptions}, + config::{ + CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions, + TableParquetOptions, + }, datasource::file_format::{ arrow::ArrowFormatFactory, csv::CsvFormatFactory, json::JsonFormatFactory, parquet::ParquetFormatFactory, FileFormatFactory, @@ -31,7 +34,12 @@ use datafusion_common::{ }; use prost::Message; -use crate::protobuf::{CsvOptions as CsvOptionsProto, JsonOptions as JsonOptionsProto}; +use crate::protobuf::{ + parquet_column_options, parquet_options, CsvOptions as CsvOptionsProto, + JsonOptions as JsonOptionsProto, ParquetColumnOptions as ParquetColumnOptionsProto, + ParquetColumnSpecificOptions, ParquetOptions as ParquetOptionsProto, + TableParquetOptions as TableParquetOptionsProto, +}; use super::LogicalExtensionCodec; @@ -337,6 +345,218 @@ impl LogicalExtensionCodec for JsonLogicalExtensionCodec { } } +impl TableParquetOptionsProto { + fn from_factory(factory: &ParquetFormatFactory) -> Self { + let global_options = if let Some(ref options) = factory.options { + options.clone() + } else { + return TableParquetOptionsProto::default(); + }; + + let column_specific_options = global_options.column_specific_options; + TableParquetOptionsProto { + global: Some(ParquetOptionsProto { + enable_page_index: global_options.global.enable_page_index, + pruning: global_options.global.pruning, + skip_metadata: global_options.global.skip_metadata, + metadata_size_hint_opt: global_options.global.metadata_size_hint.map(|size| { + parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size as u64) + }), + pushdown_filters: global_options.global.pushdown_filters, + reorder_filters: global_options.global.reorder_filters, + data_pagesize_limit: global_options.global.data_pagesize_limit as u64, + write_batch_size: global_options.global.write_batch_size as u64, + writer_version: global_options.global.writer_version.clone(), + compression_opt: global_options.global.compression.map(|compression| { + parquet_options::CompressionOpt::Compression(compression) + }), + dictionary_enabled_opt: global_options.global.dictionary_enabled.map(|enabled| { + parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled) + }), + dictionary_page_size_limit: global_options.global.dictionary_page_size_limit as u64, + statistics_enabled_opt: global_options.global.statistics_enabled.map(|enabled| { + parquet_options::StatisticsEnabledOpt::StatisticsEnabled(enabled) + }), + max_statistics_size_opt: global_options.global.max_statistics_size.map(|size| { + 
parquet_options::MaxStatisticsSizeOpt::MaxStatisticsSize(size as u64) + }), + max_row_group_size: global_options.global.max_row_group_size as u64, + created_by: global_options.global.created_by.clone(), + column_index_truncate_length_opt: global_options.global.column_index_truncate_length.map(|length| { + parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length as u64) + }), + data_page_row_count_limit: global_options.global.data_page_row_count_limit as u64, + encoding_opt: global_options.global.encoding.map(|encoding| { + parquet_options::EncodingOpt::Encoding(encoding) + }), + bloom_filter_on_read: global_options.global.bloom_filter_on_read, + bloom_filter_on_write: global_options.global.bloom_filter_on_write, + bloom_filter_fpp_opt: global_options.global.bloom_filter_fpp.map(|fpp| { + parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp) + }), + bloom_filter_ndv_opt: global_options.global.bloom_filter_ndv.map(|ndv| { + parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) + }), + allow_single_file_parallelism: global_options.global.allow_single_file_parallelism, + maximum_parallel_row_group_writers: global_options.global.maximum_parallel_row_group_writers as u64, + maximum_buffered_record_batches_per_stream: global_options.global.maximum_buffered_record_batches_per_stream as u64, + schema_force_string_view: global_options.global.schema_force_string_view, + }), + column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| { + ParquetColumnSpecificOptions { + column_name, + options: Some(ParquetColumnOptionsProto { + bloom_filter_enabled_opt: options.bloom_filter_enabled.map(|enabled| { + parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(enabled) + }), + encoding_opt: options.encoding.map(|encoding| { + parquet_column_options::EncodingOpt::Encoding(encoding) + }), + dictionary_enabled_opt: options.dictionary_enabled.map(|enabled| { + parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(enabled) + }), + compression_opt: options.compression.map(|compression| { + parquet_column_options::CompressionOpt::Compression(compression) + }), + statistics_enabled_opt: options.statistics_enabled.map(|enabled| { + parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(enabled) + }), + bloom_filter_fpp_opt: options.bloom_filter_fpp.map(|fpp| { + parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(fpp) + }), + bloom_filter_ndv_opt: options.bloom_filter_ndv.map(|ndv| { + parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) + }), + max_statistics_size_opt: options.max_statistics_size.map(|size| { + parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(size as u32) + }), + }) + } + }).collect(), + key_value_metadata: global_options.key_value_metadata + .iter() + .filter_map(|(key, value)| { + value.as_ref().map(|v| (key.clone(), v.clone())) + }) + .collect(), + } + } +} + +impl From<&ParquetOptionsProto> for ParquetOptions { + fn from(proto: &ParquetOptionsProto) -> Self { + ParquetOptions { + enable_page_index: proto.enable_page_index, + pruning: proto.pruning, + skip_metadata: proto.skip_metadata, + metadata_size_hint: proto.metadata_size_hint_opt.as_ref().map(|opt| match opt { + parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => *size as usize, + }), + pushdown_filters: proto.pushdown_filters, + reorder_filters: proto.reorder_filters, + data_pagesize_limit: proto.data_pagesize_limit as usize, + write_batch_size: proto.write_batch_size as usize, + writer_version: 
proto.writer_version.clone(), + compression: proto.compression_opt.as_ref().map(|opt| match opt { + parquet_options::CompressionOpt::Compression(compression) => compression.clone(), + }), + dictionary_enabled: proto.dictionary_enabled_opt.as_ref().map(|opt| match opt { + parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled) => *enabled, + }), + dictionary_page_size_limit: proto.dictionary_page_size_limit as usize, + statistics_enabled: proto.statistics_enabled_opt.as_ref().map(|opt| match opt { + parquet_options::StatisticsEnabledOpt::StatisticsEnabled(statistics) => statistics.clone(), + }), + max_statistics_size: proto.max_statistics_size_opt.as_ref().map(|opt| match opt { + parquet_options::MaxStatisticsSizeOpt::MaxStatisticsSize(size) => *size as usize, + }), + max_row_group_size: proto.max_row_group_size as usize, + created_by: proto.created_by.clone(), + column_index_truncate_length: proto.column_index_truncate_length_opt.as_ref().map(|opt| match opt { + parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize, + }), + data_page_row_count_limit: proto.data_page_row_count_limit as usize, + encoding: proto.encoding_opt.as_ref().map(|opt| match opt { + parquet_options::EncodingOpt::Encoding(encoding) => encoding.clone(), + }), + bloom_filter_on_read: proto.bloom_filter_on_read, + bloom_filter_on_write: proto.bloom_filter_on_write, + bloom_filter_fpp: proto.bloom_filter_fpp_opt.as_ref().map(|opt| match opt { + parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp) => *fpp, + }), + bloom_filter_ndv: proto.bloom_filter_ndv_opt.as_ref().map(|opt| match opt { + parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) => *ndv, + }), + allow_single_file_parallelism: proto.allow_single_file_parallelism, + maximum_parallel_row_group_writers: proto.maximum_parallel_row_group_writers as usize, + maximum_buffered_record_batches_per_stream: proto.maximum_buffered_record_batches_per_stream as usize, + schema_force_string_view: proto.schema_force_string_view, + } + } +} + +impl From for ParquetColumnOptions { + fn from(proto: ParquetColumnOptionsProto) -> Self { + ParquetColumnOptions { + bloom_filter_enabled: proto.bloom_filter_enabled_opt.map( + |parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v)| v, + ), + encoding: proto + .encoding_opt + .map(|parquet_column_options::EncodingOpt::Encoding(v)| v), + dictionary_enabled: proto.dictionary_enabled_opt.map( + |parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| v, + ), + compression: proto + .compression_opt + .map(|parquet_column_options::CompressionOpt::Compression(v)| v), + statistics_enabled: proto.statistics_enabled_opt.map( + |parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v)| v, + ), + bloom_filter_fpp: proto + .bloom_filter_fpp_opt + .map(|parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v)| v), + bloom_filter_ndv: proto + .bloom_filter_ndv_opt + .map(|parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v)| v), + max_statistics_size: proto.max_statistics_size_opt.map( + |parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(v)| { + v as usize + }, + ), + } + } +} + +impl From<&TableParquetOptionsProto> for TableParquetOptions { + fn from(proto: &TableParquetOptionsProto) -> Self { + TableParquetOptions { + global: proto + .global + .as_ref() + .map(ParquetOptions::from) + .unwrap_or_default(), + column_specific_options: proto + .column_specific_options + .iter() + .map(|parquet_column_options| { + ( + 
parquet_column_options.column_name.clone(), + ParquetColumnOptions::from( + parquet_column_options.options.clone().unwrap_or_default(), + ), + ) + }) + .collect(), + key_value_metadata: proto + .key_value_metadata + .iter() + .map(|(k, v)| (k.clone(), Some(v.clone()))) + .collect(), + } + } +} + #[derive(Debug)] pub struct ParquetLogicalExtensionCodec; @@ -382,17 +602,47 @@ impl LogicalExtensionCodec for ParquetLogicalExtensionCodec { fn try_decode_file_format( &self, - __buf: &[u8], - __ctx: &SessionContext, + buf: &[u8], + _ctx: &SessionContext, ) -> datafusion_common::Result> { - Ok(Arc::new(ParquetFormatFactory::new())) + let proto = TableParquetOptionsProto::decode(buf).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to decode TableParquetOptionsProto: {:?}", + e + )) + })?; + let options: TableParquetOptions = (&proto).into(); + Ok(Arc::new(ParquetFormatFactory { + options: Some(options), + })) } fn try_encode_file_format( &self, - __buf: &mut Vec, - __node: Arc, + buf: &mut Vec, + node: Arc, ) -> datafusion_common::Result<()> { + let options = if let Some(parquet_factory) = + node.as_any().downcast_ref::() + { + parquet_factory.options.clone().unwrap_or_default() + } else { + return Err(DataFusionError::Execution( + "Unsupported FileFormatFactory type".to_string(), + )); + }; + + let proto = TableParquetOptionsProto::from_factory(&ParquetFormatFactory { + options: Some(options), + }); + + proto.encode(buf).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to encode TableParquetOptionsProto: {:?}", + e + )) + })?; + Ok(()) } } diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index daa92475068f..f7ad2b9b6158 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -577,6 +577,74 @@ async fn roundtrip_logical_plan_copy_to_json() -> Result<()> { Ok(()) } +#[tokio::test] +async fn roundtrip_logical_plan_copy_to_parquet() -> Result<()> { + let ctx = SessionContext::new(); + + // Assume create_parquet_scan creates a logical plan for scanning a Parquet file + let input = create_parquet_scan(&ctx).await?; + + let table_options = + TableOptions::default_from_session_config(ctx.state().config_options()); + let mut parquet_format = table_options.parquet; + + // Set specific Parquet format options + let mut key_value_metadata = HashMap::new(); + key_value_metadata.insert("test".to_string(), Some("test".to_string())); + parquet_format.key_value_metadata = key_value_metadata.clone(); + + parquet_format.global.allow_single_file_parallelism = false; + parquet_format.global.created_by = "test".to_string(); + + let file_type = format_as_file_type(Arc::new( + ParquetFormatFactory::new_with_options(parquet_format.clone()), + )); + + let plan = LogicalPlan::Copy(CopyTo { + input: Arc::new(input), + output_url: "test.parquet".to_string(), + partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()], + file_type, + options: Default::default(), + }); + + // Assume ParquetLogicalExtensionCodec is implemented similarly to JsonLogicalExtensionCodec + let codec = ParquetLogicalExtensionCodec {}; + let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?; + let logical_round_trip = + logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &codec)?; + assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}")); + + match logical_round_trip { + LogicalPlan::Copy(copy_to) => { + assert_eq!("test.parquet", 
copy_to.output_url); + assert_eq!("parquet".to_string(), copy_to.file_type.get_ext()); + assert_eq!(vec!["a", "b", "c"], copy_to.partition_by); + + let file_type = copy_to + .file_type + .as_ref() + .as_any() + .downcast_ref::() + .unwrap(); + + let format_factory = file_type.as_format_factory(); + let parquet_factory = format_factory + .as_ref() + .as_any() + .downcast_ref::() + .unwrap(); + let parquet_config = parquet_factory.options.as_ref().unwrap(); + assert_eq!(parquet_config.key_value_metadata, key_value_metadata); + assert!(!parquet_config.global.allow_single_file_parallelism); + assert_eq!(parquet_config.global.created_by, "test".to_string()); + } + _ => panic!(), + } + + Ok(()) +} + async fn create_csv_scan(ctx: &SessionContext) -> Result { ctx.register_csv("t1", "tests/testdata/test.csv", CsvReadOptions::default()) .await?; @@ -597,6 +665,20 @@ async fn create_json_scan(ctx: &SessionContext) -> Result Result { + ctx.register_parquet( + "t1", + "../substrait/tests/testdata/empty.parquet", + ParquetReadOptions::default(), + ) + .await?; + + let input = ctx.table("t1").await?.into_optimized_plan()?; + Ok(input) +} + #[tokio::test] async fn roundtrip_logical_plan_distinct_on() -> Result<()> { let ctx = SessionContext::new();
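(Not part of the patch: a minimal sketch of the round trip this change enables, mirroring the new `roundtrip_logical_plan_copy_to_parquet` test above but driving the codec directly. Type and method names — `ParquetFormatFactory::new_with_options`, the now-public `options` field, `ParquetLogicalExtensionCodec::try_encode_file_format` / `try_decode_file_format`, and `TableParquetOptions::key_value_metadata` — are taken from the diff; the exact import paths and the `"owner"` metadata key are assumptions made for illustration.)

```rust
use std::sync::Arc;

use datafusion::config::TableParquetOptions;
use datafusion::datasource::file_format::parquet::ParquetFormatFactory;
use datafusion::datasource::file_format::FileFormatFactory;
use datafusion::execution::context::SessionContext;
use datafusion_proto::logical_plan::file_formats::ParquetLogicalExtensionCodec;
use datafusion_proto::logical_plan::LogicalExtensionCodec;

fn main() -> datafusion_common::Result<()> {
    // Table-level Parquet options, including the new key/value writer metadata.
    let mut options = TableParquetOptions::default();
    options
        .key_value_metadata
        .insert("owner".to_string(), Some("etl-job".to_string()));
    options.global.created_by = "example".to_string();

    let factory = Arc::new(ParquetFormatFactory::new_with_options(options.clone()));

    // Encode the factory's options into protobuf bytes (TableParquetOptionsProto) ...
    let codec = ParquetLogicalExtensionCodec {};
    let mut buf = Vec::new();
    codec.try_encode_file_format(&mut buf, factory)?;

    // ... and decode them back; key_value_metadata and the global options survive.
    let ctx = SessionContext::new();
    let decoded = codec.try_decode_file_format(&buf, &ctx)?;
    let parquet_factory = decoded
        .as_any()
        .downcast_ref::<ParquetFormatFactory>()
        .expect("decoded factory should be a ParquetFormatFactory");
    let decoded_options = parquet_factory.options.as_ref().unwrap();
    assert_eq!(decoded_options.key_value_metadata, options.key_value_metadata);
    assert_eq!(decoded_options.global.created_by, "example");
    Ok(())
}
```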