Skip to content

Commit

Permalink
Implement physical plan serialization for parquet Copy plans (#11735)
Browse files Browse the repository at this point in the history
* Implement physical plan serialization for parquet Copy plans

* fix clippy
  • Loading branch information
Lordworms authored Jul 31, 2024
1 parent 2887491 commit fa50636
Show file tree
Hide file tree
Showing 9 changed files with 867 additions and 492 deletions.
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ const BUFFER_FLUSH_BYTES: usize = 1024000;
#[derive(Default)]
/// Factory struct used to create [`ParquetFormat`].
pub struct ParquetFormatFactory {
// NOTE(review): this is a rendered diff ("2 additions & 1 deletion"); the
// private field line directly below appears to be the removed pre-change
// version of the `pub options` field that follows it.
options: Option<TableParquetOptions>,
/// Optional [`TableParquetOptions`] carried by this factory.
pub options: Option<TableParquetOptions>,
}

impl ParquetFormatFactory {
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/proto/datafusion_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ message JsonOptions {
// Table-level Parquet options: one global ParquetOptions value, a list of
// column-specific option entries, and a string-to-string metadata map.
message TableParquetOptions {
// Global options, of type ParquetOptions.
ParquetOptions global = 1;
// Zero or more column-specific option entries.
repeated ParquetColumnSpecificOptions column_specific_options = 2;
// Key/value metadata as string pairs.
map<string, string> key_value_metadata = 3;
}

message ParquetColumnSpecificOptions {
Expand Down
22 changes: 11 additions & 11 deletions datafusion/proto-common/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -961,48 +961,48 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
}
}

impl TryFrom<&protobuf::ColumnOptions> for ParquetColumnOptions {
impl TryFrom<&protobuf::ParquetColumnOptions> for ParquetColumnOptions {
type Error = DataFusionError;
fn try_from(
value: &protobuf::ColumnOptions,
value: &protobuf::ParquetColumnOptions,
) -> datafusion_common::Result<Self, Self::Error> {
Ok(ParquetColumnOptions {
compression: value.compression_opt.clone().map(|opt| match opt {
protobuf::column_options::CompressionOpt::Compression(v) => Some(v),
protobuf::parquet_column_options::CompressionOpt::Compression(v) => Some(v),
}).unwrap_or(None),
dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(|protobuf::column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v),
dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(|protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v),
statistics_enabled: value
.statistics_enabled_opt.clone()
.map(|opt| match opt {
protobuf::column_options::StatisticsEnabledOpt::StatisticsEnabled(v) => Some(v),
protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v) => Some(v),
})
.unwrap_or(None),
max_statistics_size: value
.max_statistics_size_opt.clone()
.map(|opt| match opt {
protobuf::column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(v) => Some(v as usize),
protobuf::parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(v) => Some(v as usize),
})
.unwrap_or(None),
encoding: value
.encoding_opt.clone()
.map(|opt| match opt {
protobuf::column_options::EncodingOpt::Encoding(v) => Some(v),
protobuf::parquet_column_options::EncodingOpt::Encoding(v) => Some(v),
})
.unwrap_or(None),
bloom_filter_enabled: value.bloom_filter_enabled_opt.clone().map(|opt| match opt {
protobuf::column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v) => Some(v),
protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v) => Some(v),
})
.unwrap_or(None),
bloom_filter_fpp: value
.bloom_filter_fpp_opt.clone()
.map(|opt| match opt {
protobuf::column_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
})
.unwrap_or(None),
bloom_filter_ndv: value
.bloom_filter_ndv_opt.clone()
.map(|opt| match opt {
protobuf::column_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
})
.unwrap_or(None),
})
Expand All @@ -1016,7 +1016,7 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions {
) -> datafusion_common::Result<Self, Self::Error> {
let mut column_specific_options: HashMap<String, ParquetColumnOptions> =
HashMap::new();
for protobuf::ColumnSpecificOptions {
for protobuf::ParquetColumnSpecificOptions {
column_name,
options: maybe_options,
} in &value.column_specific_options
Expand Down
Loading

0 comments on commit fa50636

Please sign in to comment.