Skip to content

Commit

Permalink
feat: stats and partition value handling
Browse files: browse the repository at this point in the history
  • Loading branch information
roeap committed Jan 15, 2024
1 parent 5415073 commit f072a00
Show file tree
Hide file tree
Showing 40 changed files with 1,251 additions and 1,017 deletions.
2 changes: 1 addition & 1 deletion crates/benchmarks/src/bin/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ async fn benchmark_merge_tpcds(
merge: fn(DataFrame, DeltaTable) -> Result<MergeBuilder, DeltaTableError>,
) -> Result<(core::time::Duration, MergeMetrics), DataFusionError> {
let table = DeltaTableBuilder::from_uri(path).load().await?;
- let file_count = table.snapshot()?.files()?.len();
+ let file_count = table.snapshot()?.file_actions()?.len();

let provider = DeltaTableProvider::try_new(
table.snapshot()?.clone(),
Expand Down
10 changes: 5 additions & 5 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ impl<'a> DeltaScanBuilder<'a> {
PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?;
let files_to_prune = pruning_predicate.prune(self.snapshot)?;
self.snapshot
- .files()?
+ .file_actions()?
.iter()
.zip(files_to_prune.into_iter())
.filter_map(
Expand All @@ -348,7 +348,7 @@ impl<'a> DeltaScanBuilder<'a> {
)
.collect()
} else {
- self.snapshot.files()?
+ self.snapshot.file_actions()?
}
}
};
Expand Down Expand Up @@ -1186,7 +1186,7 @@ pub(crate) async fn find_files_scan<'a>(
expression: Expr,
) -> DeltaResult<Vec<Add>> {
let candidate_map: HashMap<String, Add> = snapshot
- .files()?
+ .file_actions()?
.iter()
.map(|add| (add.path.clone(), add.to_owned()))
.collect();
Expand Down Expand Up @@ -1246,7 +1246,7 @@ pub(crate) async fn scan_memory_table(
snapshot: &DeltaTableState,
predicate: &Expr,
) -> DeltaResult<Vec<Add>> {
- let actions = snapshot.files()?;
+ let actions = snapshot.file_actions()?;

let batch = snapshot.add_actions_table(true)?;
let mut arrays = Vec::new();
Expand Down Expand Up @@ -1334,7 +1334,7 @@ pub async fn find_files<'a>(
}
}
None => Ok(FindFiles {
- candidates: snapshot.files()?,
+ candidates: snapshot.file_actions()?,
partition_scan: true,
}),
}
Expand Down
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/kernel/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ impl TryFrom<&ArrowDataType> for DataType {
ArrowDataType::UInt64 => Ok(DataType::Primitive(PrimitiveType::Long)), // undocumented type
ArrowDataType::UInt32 => Ok(DataType::Primitive(PrimitiveType::Integer)),
ArrowDataType::UInt16 => Ok(DataType::Primitive(PrimitiveType::Short)),
- ArrowDataType::UInt8 => Ok(DataType::Primitive(PrimitiveType::Boolean)),
+ ArrowDataType::UInt8 => Ok(DataType::Primitive(PrimitiveType::Byte)),
ArrowDataType::Float32 => Ok(DataType::Primitive(PrimitiveType::Float)),
ArrowDataType::Float64 => Ok(DataType::Primitive(PrimitiveType::Double)),
ArrowDataType::Boolean => Ok(DataType::Primitive(PrimitiveType::Boolean)),
Expand Down
17 changes: 16 additions & 1 deletion crates/deltalake-core/src/kernel/client/expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use arrow_array::{
StructArray, TimestampMicrosecondArray,
};
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
- use arrow_schema::{ArrowError, Schema as ArrowSchema};
+ use arrow_schema::{ArrowError, Field as ArrowField, Schema as ArrowSchema};
use arrow_select::nullif::nullif;

use crate::kernel::error::{DeltaResult, Error};
Expand Down Expand Up @@ -78,6 +78,21 @@ impl Scalar {
DataType::Map { .. } => unimplemented!(),
DataType::Struct { .. } => unimplemented!(),
},
+ Struct(values, fields) => {
+ let mut columns = Vec::with_capacity(values.len());
+ for val in values {
+ columns.push(val.to_array(num_rows)?);
+ }
+ Arc::new(StructArray::try_new(
+ fields
+ .iter()
+ .map(TryInto::<ArrowField>::try_into)
+ .collect::<Result<Vec<_>, _>>()?
+ .into(),
+ columns,
+ None,
+ )?)
+ }
};
Ok(arr)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/kernel/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::fmt::{Display, Formatter};

use itertools::Itertools;

- use self::scalars::Scalar;
+ pub use self::scalars::*;

pub mod scalars;

Expand Down
Loading

0 comments on commit f072a00

Please sign in to comment.