feat: Writing of Parquet SortingColumn #19263

Draft: wants to merge 2 commits into main
crates/polars-io/src/parquet/write/batched_writer.rs (159 additions, 14 deletions)
@@ -1,24 +1,29 @@
 use std::io::Write;
 use std::sync::Mutex;
 
+use arrow::array::Array;
 use arrow::record_batch::RecordBatch;
 use polars_core::prelude::*;
+use polars_core::series::IsSorted;
 use polars_core::POOL;
 use polars_parquet::read::ParquetError;
 use polars_parquet::write::{
     array_to_columns, CompressedPage, Compressor, DynIter, DynStreamingIterator, Encoding,
     FallibleStreamingIterator, FileWriter, Page, ParquetType, RowGroupIterColumns,
-    SchemaDescriptor, WriteOptions,
+    RowGroupWriteOptions, SchemaDescriptor, SortingColumn, WriteOptions,
 };
 use rayon::prelude::*;
 
+use super::options::{MaterializedSortingColumns, MetadataOptions, SortingColumnBehavior};
+
 pub struct BatchedWriter<W: Write> {
     // A mutex so that streaming engine can get concurrent read access to
     // compress pages.
     pub(super) writer: Mutex<FileWriter<W>>,
     pub(super) parquet_schema: SchemaDescriptor,
     pub(super) encodings: Vec<Vec<Encoding>>,
     pub(super) options: WriteOptions,
+    pub(super) metadata_options: MetadataOptions,
     pub(super) parallel: bool,
 }

@@ -53,12 +58,14 @@ impl<W: Write> BatchedWriter<W> {
             &self.parquet_schema,
             &self.encodings,
             self.options,
+            &self.metadata_options,
             self.parallel,
         );
         // Lock before looping so that order is maintained under contention.
         let mut writer = self.writer.lock().unwrap();
-        for group in row_group_iter {
-            writer.write(group?)?;
+        for item in row_group_iter {
+            let (group, rg_options) = item?;
+            writer.write(group, rg_options)?;
         }
         Ok(())
     }
@@ -67,14 +74,29 @@ impl<W: Write> BatchedWriter<W> {
         &self.writer
     }
 
-    pub fn write_row_groups(
+    pub fn write_row_groups_default_options(
         &self,
         rgs: Vec<RowGroupIterColumns<'static, PolarsError>>,
     ) -> PolarsResult<()> {
         // Lock before looping so that order is maintained.
         let mut writer = self.writer.lock().unwrap();
         for group in rgs {
-            writer.write(group)?;
+            writer.write(group, RowGroupWriteOptions::default())?;
         }
         Ok(())
     }
+
+    pub fn write_row_groups(
+        &self,
+        rgs: Vec<(
+            RowGroupIterColumns<'static, PolarsError>,
+            RowGroupWriteOptions,
+        )>,
+    ) -> PolarsResult<()> {
+        // Lock before looping so that order is maintained.
+        let mut writer = self.writer.lock().unwrap();
+        for (group, rg_options) in rgs {
+            writer.write(group, rg_options)?;
+        }
+        Ok(())
+    }
@@ -93,17 +115,40 @@ fn prepare_rg_iter<'a>(
     parquet_schema: &'a SchemaDescriptor,
     encodings: &'a [Vec<Encoding>],
     options: WriteOptions,
+    md_options: &'a MetadataOptions,
     parallel: bool,
-) -> impl Iterator<Item = PolarsResult<RowGroupIterColumns<'static, PolarsError>>> + 'a {
+) -> impl Iterator<
+    Item = PolarsResult<(
+        RowGroupIterColumns<'static, PolarsError>,
+        RowGroupWriteOptions,
+    )>,
+> + 'a {
+    // @TODO: This does not work for nested columns.
+    let sortedness = df
+        .get_columns()
+        .iter()
+        .map(|c| c.is_sorted_flag())
+        .collect::<Vec<_>>();
+    // @TODO: This does not work for nested columns.
+    let dtypes = df
+        .get_columns()
+        .iter()
+        .map(|c| c.dtype())
+        .collect::<Vec<_>>();
+
     let rb_iter = df.iter_chunks(CompatLevel::newest(), false);
     rb_iter.filter_map(move |batch| match batch.len() {
         0 => None,
-        _ => {
-            let row_group =
-                create_serializer(batch, parquet_schema.fields(), encodings, options, parallel);
-
-            Some(row_group)
-        },
+        _ => Some(create_serializer(
+            batch,
+            parquet_schema.fields(),
+            encodings,
+            &sortedness,
+            &dtypes,
+            options,
+            md_options,
+            parallel,
+        )),
     })
 }

@@ -147,9 +192,15 @@ fn create_serializer(
     batch: RecordBatch,
     fields: &[ParquetType],
     encodings: &[Vec<Encoding>],
+    sortedness: &[IsSorted],
+    dtypes: &[&DataType],
     options: WriteOptions,
+    md_options: &MetadataOptions,
     parallel: bool,
-) -> PolarsResult<RowGroupIterColumns<'static, PolarsError>> {
+) -> PolarsResult<(
+    RowGroupIterColumns<'static, PolarsError>,
+    RowGroupWriteOptions,
+)> {
     let func = move |((array, type_), encoding): ((&ArrayRef, &ParquetType), &Vec<Encoding>)| {
         array_to_pages_iter(array, type_, encoding, options)
     };
@@ -176,7 +227,101 @@ fn create_serializer(

     let row_group = DynIter::new(columns.into_iter());
 
-    Ok(row_group)
+    let mut rg_options = RowGroupWriteOptions::default();
+
+    match &md_options.sorting_columns {
+        MaterializedSortingColumns::All(behavior) => {
+            // @TODO: properly handle nested columns.
+            rg_options.sorting_columns = (0..batch.columns().len())
+                .filter_map(|leaf_idx| {
+                    to_sorting_column(&batch, dtypes, sortedness, leaf_idx, *behavior)
+                })
+                .collect();
+        },
+        MaterializedSortingColumns::PerLeaf(sorting_columns) => {
+            rg_options.sorting_columns = sorting_columns
+                .iter()
+                .filter_map(|(leaf_idx, behavior)| {
+                    to_sorting_column(&batch, dtypes, sortedness, *leaf_idx as usize, *behavior)
+                })
+                .collect();
+        },
+    }
+
+    Ok((row_group, rg_options))
 }
+
+fn has_compatible_sortedness(dtype: &DataType, _array: &dyn Array) -> bool {
+    use DataType as DT;
+
+    matches!(
+        dtype,
+        DT::UInt8
+            | DT::UInt16
+            | DT::UInt32
+            | DT::UInt64
+            | DT::Int8
+            | DT::Int16
+            | DT::Int32
+            | DT::Int64
+    )
+}
+
+fn to_sorting_column(
+    batch: &RecordBatch,
+    dtypes: &[&DataType],
+    sortedness: &[IsSorted],
+    leaf_idx: usize,
+    behavior: SortingColumnBehavior,
+) -> Option<SortingColumn> {
+    use SortingColumnBehavior as B;
+
+    // @TODO: This does not work for nested structures.
+    let col_idx = leaf_idx;
+    let array = &batch.columns()[col_idx];
+    let dtype = dtypes[leaf_idx];
+
+    if matches!(
+        behavior,
+        B::Preserve { force: false } | B::Evaluate { force: false }
+    ) && !has_compatible_sortedness(dtype, array.as_ref())
+    {
+        return None;
+    }
+
+    match (behavior, sortedness[leaf_idx]) {
+        (B::NoPreserve, _) => None,
+        (
+            B::Force {
+                descending,
+                nulls_first,
+            },
+            _,
+        ) => Some(SortingColumn {
+            column_idx: leaf_idx as i32,
+            descending,
+            nulls_first,
+        }),
+        (B::Preserve { .. }, IsSorted::Not) => None,
+        (B::Preserve { .. } | B::Evaluate { .. }, IsSorted::Ascending) => {
+            let nulls_first = !array.is_empty() && unsafe { array.get_unchecked(0) }.is_null();
+            Some(SortingColumn {
+                column_idx: leaf_idx as i32,
+                descending: false,
+                nulls_first,
+            })
+        },
+        (B::Preserve { .. } | B::Evaluate { .. }, IsSorted::Descending) => {
+            let nulls_first = !array.is_empty() && unsafe { array.get_unchecked(0) }.is_null();
+            Some(SortingColumn {
+                column_idx: leaf_idx as i32,
+                descending: true,
+                nulls_first,
+            })
+        },
+        // Draft: actively evaluating sortedness at write time is not implemented yet.
+        (B::Evaluate { .. }, IsSorted::Not) => todo!(),
+    }
+}

 /// This serializer encodes and compresses all eagerly in memory.
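For review, the mapping that `to_sorting_column` applies under the default `Preserve` behavior can be summarized with a small self-contained sketch. The `IsSorted` and `SortingColumn` definitions below are local stand-ins with assumed shapes, not the real polars-core/polars-parquet types:

```rust
// Local stand-ins for the real types; shapes here are assumptions for illustration.
#[derive(Clone, Copy)]
enum IsSorted {
    Ascending,
    Descending,
    Not,
}

#[derive(Debug, PartialEq)]
struct SortingColumn {
    column_idx: i32,
    descending: bool,
    nulls_first: bool,
}

// Preserve-style mapping: emit metadata only when the sorted flag is already set.
// `first_is_null` mirrors the diff's check of the array's first element.
fn preserve(leaf_idx: i32, flag: IsSorted, first_is_null: bool) -> Option<SortingColumn> {
    match flag {
        IsSorted::Not => None,
        IsSorted::Ascending => Some(SortingColumn {
            column_idx: leaf_idx,
            descending: false,
            nulls_first: first_is_null,
        }),
        IsSorted::Descending => Some(SortingColumn {
            column_idx: leaf_idx,
            descending: true,
            nulls_first: first_is_null,
        }),
    }
}

fn main() {
    // Ascending column whose first value is null => nulls_first = true.
    assert_eq!(
        preserve(0, IsSorted::Ascending, true),
        Some(SortingColumn { column_idx: 0, descending: false, nulls_first: true })
    );
    // Unsorted column => no SortingColumn metadata is written.
    assert_eq!(preserve(1, IsSorted::Not, false), None);
}
```

As in the diff, `nulls_first` is derived from the first element only, which is presumably sufficient because a column with the sorted flag set keeps its nulls grouped at one end.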
crates/polars-io/src/parquet/write/mod.rs (2 additions, 2 deletions)
@@ -5,6 +5,6 @@ mod options;
 mod writer;
 
 pub use batched_writer::BatchedWriter;
-pub use options::{BrotliLevel, GzipLevel, ParquetCompression, ParquetWriteOptions, ZstdLevel};
+pub use options::{BrotliLevel, GzipLevel, ParquetCompression, ParquetWriteOptions, SortingColumnBehavior, ZstdLevel};
 pub use polars_parquet::write::{RowGroupIterColumns, StatisticsOptions};
-pub use writer::ParquetWriter;
+pub use writer::{ParquetWriter, SortingColumns};
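With these re-exports, the new types are reachable from the crate root; a quick sketch of what downstream code in the workspace could import (assuming the default feature set):

```rust
use polars_io::parquet::write::{ParquetWriter, SortingColumnBehavior, SortingColumns};
```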
crates/polars-io/src/parquet/write/options.rs (33 additions)
@@ -6,6 +6,39 @@ use polars_parquet::write::{
 #[cfg(feature = "serde")]
 use serde::{Deserialize, Serialize};
 
+#[derive(Clone, Copy, PartialEq, Eq)]
+pub enum SortingColumnBehavior {
+    /// Never set the `SortingColumn` metadata.
+    NoPreserve,
+
+    /// Preserve known sortedness information in the `SortingColumn` field.
+    Preserve { force: bool },
+
+    /// Evaluate whether a column is sorted and store any information found in
+    /// the `SortingColumn` field.
+    Evaluate { force: bool },
+
+    /// Force a specific `SortingColumn` value for the column.
+    Force { descending: bool, nulls_first: bool },
+}
+
+impl Default for SortingColumnBehavior {
+    fn default() -> Self {
+        Self::Preserve { force: false }
+    }
+}
+
+#[derive(Clone, PartialEq, Eq)]
+pub enum MaterializedSortingColumns {
+    All(SortingColumnBehavior),
+    PerLeaf(Vec<(i32, SortingColumnBehavior)>),
+}
+
+#[derive(Clone, PartialEq, Eq)]
+pub(crate) struct MetadataOptions {
+    pub sorting_columns: MaterializedSortingColumns,
+}
+
 #[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub struct ParquetWriteOptions {
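Since `MetadataOptions` is `pub(crate)`, it gets wired up inside `polars-io` itself. A minimal sketch of the two configuration styles these types allow, with field values chosen purely for illustration:

```rust
// One behavior for every leaf column: preserve known sortedness, but do not
// force it onto dtypes that fail the compatibility check.
let _all = MetadataOptions {
    sorting_columns: MaterializedSortingColumns::All(SortingColumnBehavior::default()),
};

// Per-leaf configuration: force leaf 0 to ascending with nulls first, and
// never write SortingColumn metadata for leaf 2.
let _per_leaf = MetadataOptions {
    sorting_columns: MaterializedSortingColumns::PerLeaf(vec![
        (0, SortingColumnBehavior::Force { descending: false, nulls_first: true }),
        (2, SortingColumnBehavior::NoPreserve),
    ]),
};
```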