Commit

refactor: simplify module structure
roeap committed Jan 12, 2024
1 parent ae7d162 commit 2836a1e
Showing 16 changed files with 321 additions and 460 deletions.
114 changes: 2 additions & 112 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
@@ -25,7 +25,6 @@ use std::collections::{HashMap, HashSet};
 use std::fmt::{self, Debug};
 use std::sync::Arc;
 
-use arrow::array::ArrayRef;
 use arrow::compute::{cast_with_options, CastOptions};
 use arrow::datatypes::DataType;
 use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef, TimeUnit};
@@ -48,7 +47,7 @@ use datafusion::execution::context::{SessionConfig, SessionContext, SessionState};
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::execution::FunctionRegistry;
 use datafusion::physical_expr::PhysicalSortExpr;
-use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
+use datafusion::physical_optimizer::pruning::PruningPredicate;
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::limit::LocalLimitExec;
 use datafusion::physical_plan::{
@@ -77,7 +76,7 @@ use tracing::error;
 use url::Url;
 
 use crate::errors::{DeltaResult, DeltaTableError};
-use crate::kernel::{Add, DataCheck, DataType as DeltaDataType, Invariant, PrimitiveType};
+use crate::kernel::{Add, DataCheck, Invariant};
 use crate::logstore::LogStoreRef;
 use crate::protocol::{ColumnCountStat, ColumnValueStat};
 use crate::table::builder::ensure_table_uri;
@@ -255,115 +254,6 @@ impl DeltaTableState {
     }
 }
 
-// TODO: Collapse with operations/transaction/state.rs method of same name
-fn get_prune_stats(table: &DeltaTable, column: &Column, get_max: bool) -> Option<ArrayRef> {
-    let field = table
-        .get_schema()
-        .ok()
-        .map(|s| s.field_with_name(&column.name).ok())??;
-
-    // See issue 1214. Binary type does not support natural order which is required for Datafusion to prune
-    if let DeltaDataType::Primitive(PrimitiveType::Binary) = &field.data_type() {
-        return None;
-    }
-
-    let data_type = field.data_type().try_into().ok()?;
-    let partition_columns = &table.metadata().ok()?.partition_columns;
-    let files = table.snapshot().ok()?.files().ok()?;
-    let values = files.iter().map(|add| {
-        if partition_columns.contains(&column.name) {
-            let value = add.partition_values.get(&column.name).unwrap();
-            let value = match value {
-                Some(v) => serde_json::Value::String(v.to_string()),
-                None => serde_json::Value::Null,
-            };
-            to_correct_scalar_value(&value, &data_type)
-                .ok()
-                .flatten()
-                .unwrap_or(
-                    get_null_of_arrow_type(&data_type).expect("Could not determine null type"),
-                )
-        } else if let Ok(Some(statistics)) = add.get_stats() {
-            let values = if get_max {
-                statistics.max_values
-            } else {
-                statistics.min_values
-            };
-
-            values
-                .get(&column.name)
-                .and_then(|f| {
-                    to_correct_scalar_value(f.as_value()?, &data_type)
-                        .ok()
-                        .flatten()
-                })
-                .unwrap_or(
-                    get_null_of_arrow_type(&data_type).expect("Could not determine null type"),
-                )
-        } else {
-            // No statistics available
-            get_null_of_arrow_type(&data_type).expect("Could not determine null type")
-        }
-    });
-    ScalarValue::iter_to_array(values).ok()
-}
-
-// TODO only implement this for Snapshot, not for DeltaTable
-impl PruningStatistics for DeltaTable {
-    /// return the minimum values for the named column, if known.
-    /// Note: the returned array must contain `num_containers()` rows
-    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
-        get_prune_stats(self, column, false)
-    }
-
-    /// return the maximum values for the named column, if known.
-    /// Note: the returned array must contain `num_containers()` rows.
-    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
-        get_prune_stats(self, column, true)
-    }
-
-    /// return the number of containers (e.g. row groups) being
-    /// pruned with these statistics
-    fn num_containers(&self) -> usize {
-        self.get_state().unwrap().files().unwrap().len()
-    }
-
-    /// return the number of null values for the named column as an
-    /// `Option<UInt64Array>`.
-    ///
-    /// Note: the returned array must contain `num_containers()` rows.
-    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
-        let partition_columns = &self.metadata().ok()?.partition_columns;
-        let files = self.snapshot().ok()?.files().ok()?;
-        let values = files.iter().map(|add| {
-            if let Ok(Some(statistics)) = add.get_stats() {
-                if partition_columns.contains(&column.name) {
-                    let value = add.partition_values.get(&column.name).unwrap();
-                    match value {
-                        Some(_) => ScalarValue::UInt64(Some(0)),
-                        None => ScalarValue::UInt64(Some(statistics.num_records as u64)),
-                    }
-                } else {
-                    statistics
-                        .null_count
-                        .get(&column.name)
-                        .map(|f| ScalarValue::UInt64(f.as_value().map(|val| val as u64)))
-                        .unwrap_or(ScalarValue::UInt64(None))
-                }
-            } else if partition_columns.contains(&column.name) {
-                let value = add.partition_values.get(&column.name).unwrap();
-                match value {
-                    Some(_) => ScalarValue::UInt64(Some(0)),
-                    None => ScalarValue::UInt64(None),
-                }
-            } else {
-                ScalarValue::UInt64(None)
-            }
-        });
-        ScalarValue::iter_to_array(values).ok()
-    }
-}
-
 // each delta table must register a specific object store, since paths are internally
 // handled relative to the table root.
 pub(crate) fn register_store(store: LogStoreRef, env: Arc<RuntimeEnv>) {
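Context for the removal above: `get_prune_stats` built one scalar per file, so DataFusion's pruning saw an array with exactly `num_containers()` rows, and files without statistics were mapped to null. The sketch below is a minimal, self-contained toy of that shape, not delta-rs code; the `FileStats` type and `prune_stats` helper are invented names for illustration.

```rust
use std::sync::Arc;

use arrow_array::{Array, ArrayRef, Int64Array};

/// Toy per-file statistics, standing in for what an `Add` action carries.
struct FileStats {
    min: Option<i64>,
    max: Option<i64>,
}

/// Build one array entry per file ("container"); missing stats become null,
/// so pruning can still reason about the remaining files.
fn prune_stats(files: &[FileStats], get_max: bool) -> ArrayRef {
    let values = files.iter().map(|f| if get_max { f.max } else { f.min });
    Arc::new(Int64Array::from_iter(values))
}

fn main() {
    let files = vec![
        FileStats { min: Some(1), max: Some(10) },
        FileStats { min: None, max: None }, // file without stats -> null slot
    ];
    let mins = prune_stats(&files, false);
    assert_eq!(mins.len(), 2); // must equal num_containers()
    assert!(mins.is_null(1));
}
```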
10 changes: 5 additions & 5 deletions crates/deltalake-core/src/kernel/actions/types.rs
@@ -75,21 +75,21 @@ pub struct Metadata {
 
 impl Metadata {
     /// Create a new metadata action
-    pub fn new(
+    pub fn try_new(
         schema: StructType,
         partition_columns: impl IntoIterator<Item = impl Into<String>>,
         configuration: HashMap<String, Option<String>>,
-    ) -> Self {
-        Self {
+    ) -> DeltaResult<Self> {
+        Ok(Self {
             id: uuid::Uuid::new_v4().to_string(),
             format: Default::default(),
-            schema_string: serde_json::to_string(&schema).unwrap(),
+            schema_string: serde_json::to_string(&schema)?,
             partition_columns: partition_columns.into_iter().map(|c| c.into()).collect(),
             configuration,
             name: None,
             description: None,
             created_time: None,
-        }
+        })
     }
 
     /// set the table name in the metadata action
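The change above swaps a potential panic (`serde_json::to_string(...).unwrap()`) for a propagated error. A hedged caller-side sketch of adapting to the fallible constructor; the `deltalake_core` module paths and the example arguments are assumptions for illustration, not taken from this commit.

```rust
use std::collections::HashMap;

use deltalake_core::kernel::{Metadata, StructType};
use deltalake_core::DeltaResult;

fn make_metadata(schema: StructType) -> DeltaResult<Metadata> {
    // Before: Metadata::new(..) returned Self and unwrapped the schema
    // serialization internally. After: a failure bubbles up via `?`
    // instead of panicking.
    Metadata::try_new(
        schema,
        vec!["date".to_string()], // partition columns
        HashMap::new(),           // table configuration
    )
}
```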
50 changes: 20 additions & 30 deletions crates/deltalake-core/src/kernel/snapshot/extract.rs
@@ -1,3 +1,5 @@
+//! Utilities to extract columns from a record batch or nested / complex arrays.
+
 use std::sync::Arc;
 
 use arrow_array::{
@@ -8,6 +10,7 @@ use arrow_schema::{ArrowError, DataType};
 
 use crate::{DeltaResult, DeltaTableError};
 
+/// Trait to extract a column by name from a record batch or nested / complex array.
 pub(crate) trait ProvidesColumnByName {
     fn column_by_name(&self, name: &str) -> Option<&Arc<dyn Array>>;
 }
@@ -24,6 +27,9 @@ impl ProvidesColumnByName for StructArray {
     }
 }
 
+/// Extracts a column by name and casts it to the given array type `T`.
+///
+/// Returns an error if the column does not exist or if the column is not of type `T`.
 pub(super) fn extract_and_cast<'a, T: Array + 'static>(
     arr: &'a dyn ProvidesColumnByName,
     name: &'a str,
@@ -34,6 +40,9 @@ pub(super) fn extract_and_cast<'a, T: Array + 'static>(
     )))
 }
 
+/// Extracts a column by name and casts it to the given array type `T`.
+///
+/// Returns `None` if the column does not exist or if the column is not of type `T`.
 pub(super) fn extract_and_cast_opt<'a, T: Array + 'static>(
     array: &'a dyn ProvidesColumnByName,
     name: &'a str,
@@ -63,7 +72,7 @@ pub(super) fn extract_column<'a>(
         DataType::Map(_, _) => {
             // NOTE a map has exactly one child, but we want to be agnostic of its name,
             // so we cast the current array as a map and use the entries accessor.
-            let maparr = column_as_map(path_step, &Some(child))?;
+            let maparr = cast_column_as::<MapArray>(path_step, &Some(child))?;
             if let Some(next_path) = remaining_path_steps.next() {
                 extract_column(maparr.entries(), next_path, remaining_path_steps)
             } else {
@@ -83,10 +92,10 @@
             }
         }
         DataType::List(_) => {
-            let listarr = column_as_list(path_step, &Some(child))?;
+            let listarr = cast_column_as::<ListArray>(path_step, &Some(child))?;
             if let Some(next_path) = remaining_path_steps.next() {
                 extract_column(
-                    column_as_struct(next_path_step, &Some(listarr.values()))?,
+                    cast_column_as::<StructArray>(next_path_step, &Some(listarr.values()))?,
                     next_path,
                     remaining_path_steps,
                 )
@@ -95,7 +104,7 @@
             }
         }
         _ => extract_column(
-            column_as_struct(path_step, &Some(child))?,
+            cast_column_as::<StructArray>(path_step, &Some(child))?,
             next_path_step,
             remaining_path_steps,
         ),
@@ -105,37 +114,18 @@
     }
 }
 
-fn column_as_struct<'a>(
+fn cast_column_as<'a, T: Array + 'static>(
     name: &str,
     column: &Option<&'a Arc<dyn Array>>,
-) -> Result<&'a StructArray, ArrowError> {
+) -> Result<&'a T, ArrowError> {
     column
         .ok_or(ArrowError::SchemaError(format!("No such column: {}", name)))?
         .as_any()
-        .downcast_ref::<StructArray>()
-        .ok_or(ArrowError::SchemaError(format!("{} is not a struct", name)))
-}
-
-fn column_as_map<'a>(
-    name: &str,
-    column: &Option<&'a Arc<dyn Array>>,
-) -> Result<&'a MapArray, ArrowError> {
-    column
-        .ok_or(ArrowError::SchemaError(format!("No such column: {}", name)))?
-        .as_any()
-        .downcast_ref::<MapArray>()
-        .ok_or(ArrowError::SchemaError(format!("{} is not a map", name)))
-}
-
-fn column_as_list<'a>(
-    name: &str,
-    column: &Option<&'a Arc<dyn Array>>,
-) -> Result<&'a ListArray, ArrowError> {
-    column
-        .ok_or(ArrowError::SchemaError(format!("No such column: {}", name)))?
-        .as_any()
-        .downcast_ref::<ListArray>()
-        .ok_or(ArrowError::SchemaError(format!("{} is not a map", name)))
+        .downcast_ref::<T>()
+        .ok_or(ArrowError::SchemaError(format!(
+            "{} is not of the expected type.",
+            name
+        )))
 }
 
 #[inline]
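The last hunk collapses three near-identical helpers into a single generic `cast_column_as::<T>`. Below is a self-contained sketch of the same pattern written directly against `arrow_array`; the `downcast_as` name is illustrative, not from this commit.

```rust
use std::sync::Arc;

use arrow_array::{Array, Int32Array, StringArray};
use arrow_schema::ArrowError;

/// Generic downcast helper in the spirit of `cast_column_as` above:
/// one function serves StructArray, MapArray, ListArray, or any other
/// concrete arrow type via the `T` parameter, instead of one helper per type.
fn downcast_as<'a, T: Array + 'static>(
    name: &str,
    column: Option<&'a Arc<dyn Array>>,
) -> Result<&'a T, ArrowError> {
    column
        .ok_or(ArrowError::SchemaError(format!("No such column: {}", name)))?
        .as_any()
        .downcast_ref::<T>()
        .ok_or(ArrowError::SchemaError(format!(
            "{} is not of the expected type",
            name
        )))
}

fn main() -> Result<(), ArrowError> {
    let col: Arc<dyn Array> = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let ints = downcast_as::<Int32Array>("a", Some(&col))?;
    assert_eq!(ints.len(), 3);
    // Requesting the wrong type yields a SchemaError instead of a panic.
    assert!(downcast_as::<StringArray>("a", Some(&col)).is_err());
    Ok(())
}
```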