Skip to content

Commit

Permalink
feat: Support Array statistics in parquet (#15031)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Mar 13, 2024
1 parent 1d7b49e commit 37d14e7
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 63 deletions.
9 changes: 8 additions & 1 deletion crates/polars-parquet/src/arrow/read/statistics/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ pub struct DynMutableListArray {
impl DynMutableListArray {
pub fn try_with_capacity(data_type: ArrowDataType, capacity: usize) -> PolarsResult<Self> {
let inner = match data_type.to_logical_type() {
ArrowDataType::List(inner) | ArrowDataType::LargeList(inner) => inner.data_type(),
ArrowDataType::List(inner)
| ArrowDataType::LargeList(inner)
| ArrowDataType::FixedSizeList(inner, _) => inner.data_type(),
_ => unreachable!(),
};
let inner = make_mutable(inner, capacity)?;
Expand Down Expand Up @@ -60,6 +62,11 @@ impl MutableArray for DynMutableListArray {
None,
))
},
ArrowDataType::FixedSizeList(field, _) => Box::new(FixedSizeListArray::new(
ArrowDataType::FixedSizeList(field.clone(), inner.len()),
inner,
None,
)),
_ => unreachable!(),
}
}
Expand Down
106 changes: 44 additions & 62 deletions crates/polars-parquet/src/arrow/read/statistics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,90 +62,76 @@ struct MutableStatistics {

impl From<MutableStatistics> for Statistics {
fn from(mut s: MutableStatistics) -> Self {
let null_count = if let PhysicalType::Struct = s.null_count.data_type().to_physical_type() {
s.null_count
let null_count = match s.null_count.data_type().to_physical_type() {
PhysicalType::Struct => s
.null_count
.as_box()
.as_any()
.downcast_ref::<StructArray>()
.unwrap()
.clone()
.boxed()
} else if let PhysicalType::Map = s.null_count.data_type().to_physical_type() {
s.null_count
.as_box()
.as_any()
.downcast_ref::<MapArray>()
.unwrap()
.clone()
.boxed()
} else if let PhysicalType::List = s.null_count.data_type().to_physical_type() {
s.null_count
.boxed(),
PhysicalType::List => s
.null_count
.as_box()
.as_any()
.downcast_ref::<ListArray<i32>>()
.unwrap()
.clone()
.boxed()
} else if let PhysicalType::LargeList = s.null_count.data_type().to_physical_type() {
s.null_count
.boxed(),
PhysicalType::LargeList => s
.null_count
.as_box()
.as_any()
.downcast_ref::<ListArray<i64>>()
.unwrap()
.clone()
.boxed()
} else {
s.null_count
.boxed(),
_ => s
.null_count
.as_box()
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
.clone()
.boxed()
.boxed(),
};
let distinct_count = if let PhysicalType::Struct =
s.distinct_count.data_type().to_physical_type()
{
s.distinct_count

let distinct_count = match s.distinct_count.data_type().to_physical_type() {
PhysicalType::Struct => s
.distinct_count
.as_box()
.as_any()
.downcast_ref::<StructArray>()
.unwrap()
.clone()
.boxed()
} else if let PhysicalType::Map = s.distinct_count.data_type().to_physical_type() {
s.distinct_count
.as_box()
.as_any()
.downcast_ref::<MapArray>()
.unwrap()
.clone()
.boxed()
} else if let PhysicalType::List = s.distinct_count.data_type().to_physical_type() {
s.distinct_count
.boxed(),
PhysicalType::List => s
.distinct_count
.as_box()
.as_any()
.downcast_ref::<ListArray<i32>>()
.unwrap()
.clone()
.boxed()
} else if let PhysicalType::LargeList = s.distinct_count.data_type().to_physical_type() {
s.distinct_count
.boxed(),
PhysicalType::LargeList => s
.distinct_count
.as_box()
.as_any()
.downcast_ref::<ListArray<i64>>()
.unwrap()
.clone()
.boxed()
} else {
s.distinct_count
.boxed(),
_ => s
.distinct_count
.as_box()
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
.clone()
.boxed()
.boxed(),
};

Self {
null_count,
distinct_count,
Expand Down Expand Up @@ -180,9 +166,10 @@ fn make_mutable(data_type: &ArrowDataType, capacity: usize) -> PolarsResult<Box<
Box::new(MutableFixedSizeBinaryArray::try_new(data_type.clone(), vec![], None).unwrap())
as _
},
PhysicalType::LargeList | PhysicalType::List => Box::new(
PhysicalType::LargeList | PhysicalType::List | PhysicalType::FixedSizeList => Box::new(
DynMutableListArray::try_with_capacity(data_type.clone(), capacity)?,
) as Box<dyn MutableArray>,
)
as Box<dyn MutableArray>,
PhysicalType::Dictionary(_) => Box::new(
dictionary::DynMutableDictionary::try_with_capacity(data_type.clone(), capacity)?,
),
Expand Down Expand Up @@ -212,32 +199,27 @@ fn make_mutable(data_type: &ArrowDataType, capacity: usize) -> PolarsResult<Box<
}

fn create_dt(data_type: &ArrowDataType) -> ArrowDataType {
if let ArrowDataType::Struct(fields) = data_type.to_logical_type() {
ArrowDataType::Struct(
match data_type.to_logical_type() {
ArrowDataType::Struct(fields) => ArrowDataType::Struct(
fields
.iter()
.map(|f| Field::new(&f.name, create_dt(&f.data_type), f.is_nullable))
.collect(),
)
} else if let ArrowDataType::Map(f, ordered) = data_type.to_logical_type() {
ArrowDataType::Map(
),
ArrowDataType::Map(f, ordered) => ArrowDataType::Map(
Box::new(Field::new(&f.name, create_dt(&f.data_type), f.is_nullable)),
*ordered,
)
} else if let ArrowDataType::List(f) = data_type.to_logical_type() {
ArrowDataType::List(Box::new(Field::new(
&f.name,
create_dt(&f.data_type),
f.is_nullable,
)))
} else if let ArrowDataType::LargeList(f) = data_type.to_logical_type() {
ArrowDataType::LargeList(Box::new(Field::new(
),
ArrowDataType::LargeList(f) => ArrowDataType::LargeList(Box::new(Field::new(
&f.name,
create_dt(&f.data_type),
f.is_nullable,
)))
} else {
ArrowDataType::UInt64
))),
// FixedSizeList piggybacks on List: it is mapped to the `List` data type here
ArrowDataType::List(f) | ArrowDataType::FixedSizeList(f, _) => ArrowDataType::List(
Box::new(Field::new(&f.name, create_dt(&f.data_type), f.is_nullable)),
),
_ => ArrowDataType::UInt64,
}
}

Expand Down Expand Up @@ -330,7 +312,7 @@ fn push(
null_count: &mut dyn MutableArray,
) -> PolarsResult<()> {
match min.data_type().to_logical_type() {
List(_) | LargeList(_) => {
List(_) | LargeList(_) | FixedSizeList(_, _) => {
let min = min
.as_mut_any()
.downcast_mut::<list::DynMutableListArray>()
Expand Down
12 changes: 12 additions & 0 deletions py-polars/tests/unit/io/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -772,3 +772,15 @@ def test_parquet_array_dtype() -> None:
df = pl.DataFrame({"x": [[1, 2, 3]]})
df = df.cast({"x": pl.Array(pl.Int64, width=3)})
test_round_trip(df)


@pytest.mark.write_disk()
def test_parquet_array_statistics() -> None:
    """Regression test: filtering an Array column while scanning parquet.

    A frame with an Array(width=3) column is written with ``sink_parquet`` and
    read back lazily with a filter on that column. Every row whose array is
    not equal to ``[1, 2, 3]`` must be returned.
    """
    # Imported locally so this test does not rely on module-level imports.
    import tempfile
    from pathlib import Path

    df = pl.DataFrame({"a": [[1, 2, 3], [4, 5, 6], [7, 8, 9]], "b": [1, 2, 3]})
    array_lf = df.with_columns(a=pl.col("a").list.to_array(3)).lazy()

    # Smoke check: the same filter must run on the in-memory frame without
    # raising. Its result is intentionally not inspected.
    array_lf.filter(pl.col("a") != [1, 2, 3]).collect()

    # Write into a throwaway directory rather than the current working
    # directory, so no "test.parquet" is left behind and concurrent test runs
    # cannot clobber each other's file.
    with tempfile.TemporaryDirectory() as tmp_dir:
        file_path = Path(tmp_dir) / "test.parquet"
        array_lf.sink_parquet(file_path)
        # Collect eagerly inside the block so the file is fully read before
        # the directory is removed.
        result = (
            pl.scan_parquet(file_path).filter(pl.col("a") != [1, 2, 3]).collect()
        )

    assert result.to_dict(as_series=False) == {
        "a": [[4, 5, 6], [7, 8, 9]],
        "b": [2, 3],
    }

0 comments on commit 37d14e7

Please sign in to comment.