chore: bump kernel + python version
ion-elgreco committed Dec 1, 2024
1 parent ba671c9 commit 8e6a8b7
Showing 10 changed files with 33 additions and 39 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -26,7 +26,7 @@ debug = true
 debug = "line-tables-only"
 
 [workspace.dependencies]
-delta_kernel = { version = "0.4.1", features = ["sync-engine"] }
+delta_kernel = { version = "0.5.0", features = ["sync-engine"] }
 #delta_kernel = { path = "../delta-kernel-rs/kernel", features = ["sync-engine"] }
 
 # arrow

1 change: 1 addition & 0 deletions crates/aws/src/lib.rs
@@ -154,6 +154,7 @@ impl std::fmt::Debug for DynamoDbLockClient {
 
 impl DynamoDbLockClient {
     /// Creates a new DynamoDbLockClient from the supplied storage options.
+    #[allow(clippy::too_many_arguments)]
     pub fn try_new(
         sdk_config: &SdkConfig,
         lock_table_name: Option<String>,

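The only change to the AWS crate is an added lint allowance. For context, a minimal standalone sketch (hypothetical function, not repository code) of what `clippy::too_many_arguments` flags and how the attribute silences it:

```rust
// Clippy warns once a function takes more than seven parameters
// (the default threshold for clippy::too_many_arguments); the
// attribute opts this single item out of the lint.
#[allow(clippy::too_many_arguments)]
fn connect(
    host: &str,
    port: u16,
    user: &str,
    database: &str,
    use_tls: bool,
    retries: u8,
    timeout_secs: u64,
    read_only: bool,
) -> String {
    format!("{user}@{host}:{port}/{database} tls={use_tls} retries={retries} timeout={timeout_secs}s ro={read_only}")
}

fn main() {
    println!("{}", connect("localhost", 5432, "svc", "delta", true, 3, 30, false));
}
```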
27 changes: 15 additions & 12 deletions crates/core/src/kernel/snapshot/log_data.rs
@@ -486,6 +486,9 @@ mod datafusion {
     use std::collections::HashSet;
     use std::sync::Arc;
 
+    use super::*;
+    use crate::kernel::arrow::extract::{extract_and_cast_opt, extract_column};
+    use crate::kernel::ARROW_HANDLER;
     use ::datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
     use ::datafusion::physical_optimizer::pruning::PruningStatistics;
     use ::datafusion::physical_plan::Accumulator;
@@ -499,12 +502,9 @@ mod datafusion {
     use delta_kernel::engine::arrow_data::ArrowEngineData;
     use delta_kernel::expressions::Expression;
     use delta_kernel::schema::{DataType, PrimitiveType};
+    use delta_kernel::AsAny;
     use delta_kernel::{ExpressionEvaluator, ExpressionHandler};
 
-    use super::*;
-    use crate::kernel::arrow::extract::{extract_and_cast_opt, extract_column};
-    use crate::kernel::ARROW_HANDLER;
-
     #[derive(Debug, Default, Clone)]
     enum AccumulatorType {
         Min,
@@ -725,9 +725,12 @@ mod datafusion {
             return None;
         }
         let expression = if self.metadata.partition_columns.contains(&column.name) {
-            Expression::Column(format!("add.partitionValues_parsed.{}", column.name))
+            Expression::column(vec![format!("add.partitionValues_parsed.{}", column.name)])
         } else {
-            Expression::Column(format!("add.stats_parsed.{}.{}", stats_field, column.name))
+            Expression::column(vec![format!(
+                "add.stats_parsed.{}.{}",
+                stats_field, column.name
+            )])
         };
         let evaluator = ARROW_HANDLER.get_evaluator(
             crate::kernel::models::fields::log_schema_ref().clone(),
@@ -737,9 +740,9 @@
         let mut results = Vec::with_capacity(self.data.len());
         for batch in self.data.iter() {
             let engine = ArrowEngineData::new(batch.clone());
-            let result = evaluator.evaluate(&engine).ok()?;
+            let result = Arc::new(evaluator.evaluate(&engine).ok()?);
             let result = result
-                .as_any()
+                .any_ref()
                 .downcast_ref::<ArrowEngineData>()
                 .ok_or(DeltaTableError::generic(
                     "failed to downcast evaluator result to ArrowEngineData.",
@@ -748,7 +751,7 @@
             results.push(result.record_batch().clone());
         }
         let batch = concat_batches(results[0].schema_ref(), &results).ok()?;
-        batch.column_by_name("output").map(|c| c.clone())
+        batch.column_by_name("output").cloned()
     }
 }
 
@@ -803,16 +806,16 @@ mod datafusion {
         lazy_static::lazy_static! {
             static ref ROW_COUNTS_EVAL: Arc<dyn ExpressionEvaluator> = ARROW_HANDLER.get_evaluator(
                 crate::kernel::models::fields::log_schema_ref().clone(),
-                Expression::column("add.stats_parsed.numRecords"),
+                Expression::column(vec!["add.stats_parsed.numRecords"].into_iter()),
                 DataType::Primitive(PrimitiveType::Long),
             );
         }
         let mut results = Vec::with_capacity(self.data.len());
         for batch in self.data.iter() {
             let engine = ArrowEngineData::new(batch.clone());
-            let result = ROW_COUNTS_EVAL.evaluate(&engine).ok()?;
+            let result = Arc::new(ROW_COUNTS_EVAL.evaluate(&engine).ok()?);
             let result = result
-                .as_any()
+                .any_ref()
                 .downcast_ref::<ArrowEngineData>()
                 .ok_or(DeltaTableError::generic(
                     "failed to downcast evaluator result to ArrowEngineData.",

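Most of the churn in this file tracks two delta_kernel API changes visible in the hunks above: `Expression::column` is now a constructor that takes an iterable of column-name parts rather than the old `Expression::Column(String)` variant, and the evaluator result is wrapped in an `Arc` and downcast through `AsAny::any_ref()` instead of `as_any()`. The downcast pattern, sketched with plain `std::any::Any` stand-ins rather than delta_kernel's actual traits (illustrative only, hypothetical types):

```rust
use std::any::Any;
use std::sync::Arc;

// Stand-ins for delta_kernel's EngineData / ArrowEngineData types.
trait EngineData {
    // Plays the role of AsAny::any_ref(): expose &dyn Any for downcasting.
    fn any_ref(&self) -> &dyn Any;
}

struct ArrowEngineData {
    num_rows: usize,
}

impl EngineData for ArrowEngineData {
    fn any_ref(&self) -> &dyn Any {
        self
    }
}

// Mimics an evaluator returning boxed, type-erased engine data.
fn evaluate() -> Box<dyn EngineData> {
    Box::new(ArrowEngineData { num_rows: 42 })
}

fn main() {
    // As in the hunks above: wrap the result in an Arc, then downcast via any_ref().
    let result = Arc::new(evaluate());
    let data = result
        .any_ref()
        .downcast_ref::<ArrowEngineData>()
        .expect("expected ArrowEngineData");
    println!("{} rows", data.num_rows);
}
```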
6 changes: 3 additions & 3 deletions crates/core/src/kernel/snapshot/mod.rs
@@ -523,7 +523,7 @@ impl EagerSnapshot {
 
     /// Get the table config which is loaded with of the snapshot
     pub fn load_config(&self) -> &DeltaTableConfig {
-        &self.snapshot.load_config()
+        self.snapshot.load_config()
     }
 
     /// Well known table configuration
@@ -696,7 +696,7 @@ fn stats_schema(schema: &StructType, config: TableConfig<'_>) -> DeltaResult<Str
 
 pub(crate) fn partitions_schema(
     schema: &StructType,
-    partition_columns: &Vec<String>,
+    partition_columns: &[String],
 ) -> DeltaResult<Option<StructType>> {
     if partition_columns.is_empty() {
         return Ok(None);
@@ -705,7 +705,7 @@ pub(crate) fn partitions_schema(
     partition_columns
         .iter()
         .map(|col| {
-            schema.field(col).map(|field| field.clone()).ok_or_else(|| {
+            schema.field(col).cloned().ok_or_else(|| {
                 DeltaTableError::Generic(format!(
                     "Partition column {} not found in schema",
                     col

6 changes: 2 additions & 4 deletions crates/core/src/kernel/snapshot/replay.rs
@@ -54,7 +54,7 @@ impl<'a, S> ReplayStream<'a, S> {
         visitors: &'a mut Vec<Box<dyn ReplayVisitor>>,
     ) -> DeltaResult<Self> {
         let stats_schema = Arc::new((&snapshot.stats_schema(None)?).try_into()?);
-        let partitions_schema = snapshot.partitions_schema(None)?.map(|s| Arc::new(s));
+        let partitions_schema = snapshot.partitions_schema(None)?.map(Arc::new);
         let mapper = Arc::new(LogMapper {
             stats_schema,
             partitions_schema,
@@ -83,9 +83,7 @@ impl LogMapper {
     ) -> DeltaResult<Self> {
         Ok(Self {
             stats_schema: Arc::new((&snapshot.stats_schema(table_schema)?).try_into()?),
-            partitions_schema: snapshot
-                .partitions_schema(table_schema)?
-                .map(|s| Arc::new(s)),
+            partitions_schema: snapshot.partitions_schema(table_schema)?.map(Arc::new),
             config: snapshot.config.clone(),
         })
     }

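The two snapshot hunks above are Clippy-style cleanups rather than kernel API changes: borrow a slice instead of `&Vec<String>`, use `Option::cloned` instead of `.map(|x| x.clone())`, and pass `Arc::new` to `map` directly instead of wrapping it in a closure. A self-contained sketch of the same idioms (hypothetical names, not repository code):

```rust
use std::sync::Arc;

// Taking &[String] instead of &Vec<String> lets callers pass vectors,
// arrays, or slices without changing the function.
fn first_partition(partition_columns: &[String]) -> Option<String> {
    // `.cloned()` replaces `.map(|col| col.clone())` on an Option<&String>.
    partition_columns.first().cloned()
}

fn main() {
    let cols = vec!["year".to_string(), "month".to_string()];
    // A function path like `Arc::new` can be passed to `map` directly,
    // replacing the closure `|s| Arc::new(s)`.
    let shared: Option<Arc<String>> = first_partition(&cols).map(Arc::new);
    println!("{shared:?}");
}
```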
2 changes: 1 addition & 1 deletion crates/core/src/operations/update.rs
@@ -247,7 +247,7 @@ async fn execute(
     // [here](https://github.com/delta-io/delta-rs/pull/2886#issuecomment-2481550560>
     let rules: Vec<Arc<dyn datafusion::optimizer::OptimizerRule + Send + Sync>> = state
         .optimizers()
-        .into_iter()
+        .iter()
         .filter(|rule| {
             rule.name() != "optimize_projections" && rule.name() != "simplify_expressions"
         })

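Same theme here: `state.optimizers()` is iterated by reference, so Clippy prefers the explicit `.iter()` over calling `.into_iter()` on a borrow, which yields the same references anyway. A standalone illustration (hypothetical rule list, not repository code):

```rust
fn main() {
    let rules: Vec<String> = vec![
        "optimize_projections".to_string(),
        "simplify_expressions".to_string(),
        "push_down_filter".to_string(),
    ];
    // `.iter()` makes the borrow explicit; `.into_iter()` on `&Vec<T>`
    // would also yield `&T`, which is why Clippy flags it as misleading.
    let kept: Vec<String> = rules
        .iter()
        .filter(|rule| {
            rule.as_str() != "optimize_projections" && rule.as_str() != "simplify_expressions"
        })
        .cloned()
        .collect();
    println!("{kept:?}");
}
```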
2 changes: 1 addition & 1 deletion crates/core/src/table/config.rs
@@ -2,7 +2,7 @@
 use std::time::Duration;
 use std::{collections::HashMap, str::FromStr};
 
-use delta_kernel::features::ColumnMappingMode;
+use delta_kernel::table_features::ColumnMappingMode;
 use lazy_static::lazy_static;
 use serde::{Deserialize, Serialize};
 

2 changes: 1 addition & 1 deletion crates/core/src/table/state_arrow.rs
@@ -14,7 +14,7 @@ use arrow_array::{
 use arrow_cast::cast;
 use arrow_cast::parse::Parser;
 use arrow_schema::{DataType, Field, Fields, TimeUnit};
-use delta_kernel::features::ColumnMappingMode;
+use delta_kernel::table_features::ColumnMappingMode;
 use itertools::Itertools;
 
 use super::state::DeltaTableState;

14 changes: 6 additions & 8 deletions crates/core/src/writer/stats.rs
@@ -135,21 +135,19 @@ fn stats_from_metadata(
 
     let idx_to_iterate = if let Some(stats_cols) = stats_columns {
         let stats_cols = stats_cols
-            .into_iter()
+            .iter()
             .map(|v| {
                 match sqlparser::parser::Parser::new(&dialect)
                     .try_with_sql(v.as_ref())
                     .map_err(|e| DeltaTableError::generic(e.to_string()))?
                     .parse_multipart_identifier()
                 {
                     Ok(parts) => Ok(parts.into_iter().map(|v| v.value).join(".")),
-                    Err(e) => {
-                        return Err(DeltaWriterError::DeltaTable(
-                            DeltaTableError::GenericError {
-                                source: Box::new(e),
-                            },
-                        ))
-                    }
+                    Err(e) => Err(DeltaWriterError::DeltaTable(
+                        DeltaTableError::GenericError {
+                            source: Box::new(e),
+                        },
+                    )),
                 }
             })
             .collect::<Result<Vec<String>, DeltaWriterError>>()?;

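The writer change is the same kind of tidy-up: the `Err` arm now produces its value as an expression instead of an explicit `return` inside a block, so both match arms read symmetrically. A minimal illustration of the pattern (hypothetical parsing function, not repository code):

```rust
// Both arms are plain expressions; no `return` or extra braces needed,
// because the match itself is the value of the function body.
fn parse_port(raw: &str) -> Result<u16, String> {
    match raw.trim().parse::<u16>() {
        Ok(port) => Ok(port),
        Err(e) => Err(format!("invalid port {raw:?}: {e}")),
    }
}

fn main() {
    println!("{:?}", parse_port(" 8080 "));
    println!("{:?}", parse_port("delta"));
}
```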
10 changes: 2 additions & 8 deletions python/src/lib.rs
@@ -21,18 +21,12 @@ use delta_kernel::expressions::Scalar;
 use delta_kernel::schema::StructField;
 use deltalake::arrow::compute::concat_batches;
 use deltalake::arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
-use deltalake::arrow::pyarrow::ToPyArrow;
 use deltalake::arrow::record_batch::{RecordBatch, RecordBatchIterator};
 use deltalake::arrow::{self, datatypes::Schema as ArrowSchema};
 use deltalake::checkpoints::{cleanup_metadata, create_checkpoint};
-use deltalake::datafusion::datasource::provider_as_source;
-use deltalake::datafusion::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE};
 use deltalake::datafusion::physical_plan::ExecutionPlan;
-use deltalake::datafusion::prelude::{DataFrame, SessionContext};
-use deltalake::delta_datafusion::{
-    DataFusionMixins, DeltaDataChecker, DeltaScanConfigBuilder, DeltaSessionConfig,
-    DeltaTableProvider,
-};
+use deltalake::datafusion::prelude::SessionContext;
+use deltalake::delta_datafusion::DeltaDataChecker;
 use deltalake::errors::DeltaTableError;
 use deltalake::kernel::{
     scalars::ScalarExt, Action, Add, Invariant, LogicalFile, Remove, StructType, Transaction,

