Skip to content

Commit

Permalink
chore: cleanup expression handler
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed Nov 9, 2023
1 parent 6fde545 commit fab4a26
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 1,069 deletions.
22 changes: 4 additions & 18 deletions crates/deltalake-core/src/kernel/client/expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,15 @@ use std::sync::Arc;

use arrow_arith::boolean::{and, is_null, not, or};
use arrow_arith::numeric::{add, div, mul, sub};
use arrow_array::RecordBatch as ColumnarBatch;
use arrow_array::{
Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array,
Int32Array, RecordBatch, StringArray, TimestampMicrosecondArray,
};
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};

use super::ExpressionEvaluator;
use crate::kernel::error::{DeltaResult, Error};
use crate::kernel::expressions::{scalars::Scalar, Expression};
use crate::kernel::expressions::{BinaryOperator, UnaryOperator};
use crate::kernel::schema::SchemaRef;

// TODO leverage scalars / Datum

Expand All @@ -43,7 +40,10 @@ impl Scalar {
}
}

fn evaluate_expression(expression: &Expression, batch: &RecordBatch) -> DeltaResult<ArrayRef> {
pub(crate) fn evaluate_expression(
expression: &Expression,
batch: &RecordBatch,
) -> DeltaResult<ArrayRef> {
match expression {
Expression::Literal(scalar) => Ok(scalar.to_array(batch.num_rows())),
Expression::Column(name) => batch
Expand Down Expand Up @@ -166,20 +166,6 @@ fn evaluate_expression(expression: &Expression, batch: &RecordBatch) -> DeltaRes
}
}

#[derive(Debug)]
/// Expression evaluator based on arrow compute kernels.
///
/// Holds one boxed [`Expression`], which the [`ExpressionEvaluator`] implementation
/// evaluates against each batch it is given.
pub struct ArrowExpressionEvaluator {
/// Schema of the input batches.
///
/// NOTE(review): the leading underscore suggests this field is intentionally unused
/// for now; the visible `evaluate` implementation never reads it.
_input_schema: SchemaRef,
/// The expression this evaluator applies to every input batch.
expression: Box<Expression>,
}

impl ExpressionEvaluator for ArrowExpressionEvaluator {
/// Evaluates the stored expression against `batch`.
///
/// NOTE(review): turning the evaluated array into a `ColumnarBatch` is not implemented
/// yet; the method ends in `todo!()`.
///
/// # Errors
///
/// Propagates any error returned by `evaluate_expression`.
///
/// # Panics
///
/// Panics via `todo!()` whenever the expression itself evaluates successfully.
fn evaluate(&self, batch: &ColumnarBatch) -> DeltaResult<ColumnarBatch> {
// The array is computed but discarded for now; only a failure is propagated.
let _result = evaluate_expression(&self.expression, batch)?;
todo!()
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
24 changes: 22 additions & 2 deletions crates/deltalake-core/src/kernel/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
//! Delta kernel client implementation.
use std::sync::Arc;

use arrow_array::RecordBatch;

use super::error::DeltaResult;
use self::expressions::evaluate_expression;
use crate::kernel::error::DeltaResult;
use crate::kernel::expressions::Expression;
use crate::kernel::schema::SchemaRef;

pub mod expressions;

Expand All @@ -16,5 +20,21 @@ pub trait ExpressionEvaluator {
///
/// Contains one value for each row of the input.
/// The data type of the output is the same as the output type of the expression this evaluator is using.
fn evaluate(&self, batch: &RecordBatch) -> DeltaResult<RecordBatch>;
fn evaluate(&self, batch: &RecordBatch, output_schema: SchemaRef) -> DeltaResult<RecordBatch>;
}

#[derive(Debug)]
/// Expression evaluator based on arrow compute kernels.
///
/// Holds one boxed [`Expression`], which the [`ExpressionEvaluator`] implementation
/// evaluates against each batch it is given.
pub struct ArrowExpressionEvaluator {
/// Schema of the input batches.
///
/// NOTE(review): the leading underscore suggests this field is intentionally unused
/// for now; the visible `evaluate` implementation never reads it.
_input_schema: SchemaRef,
/// The expression this evaluator applies to every input batch.
expression: Box<Expression>,
}

impl ExpressionEvaluator for ArrowExpressionEvaluator {
/// Evaluates the stored expression against `batch` and wraps the resulting array in a
/// single-column [`RecordBatch`] built with `output_schema`.
///
/// NOTE(review): because exactly one column is supplied, `output_schema` presumably has
/// to describe exactly one field of the expression's output type — verify against callers.
///
/// # Errors
///
/// Returns an error if `output_schema` cannot be converted with `try_into`, if
/// `evaluate_expression` fails, or if `RecordBatch::try_new` fails.
fn evaluate(&self, batch: &RecordBatch, output_schema: SchemaRef) -> DeltaResult<RecordBatch> {
Ok(RecordBatch::try_new(
// Convert the kernel schema into the schema type `RecordBatch::try_new` expects;
// the target type is inferred from this argument position.
Arc::new(output_schema.as_ref().try_into()?),
// The expression yields a single array, which becomes the only column of the batch.
vec![evaluate_expression(&self.expression, batch)?],
)?)
}
}
1 change: 1 addition & 0 deletions crates/deltalake-core/src/kernel/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Kernel module
pub mod actions;
#[cfg(feature = "arrow")]
pub mod client;
pub mod error;
pub mod expressions;
Expand Down
Loading

0 comments on commit fab4a26

Please sign in to comment.