From f69d36f800cb822afa531ec604d124d60b4bdfdf Mon Sep 17 00:00:00 2001 From: Robert Pack <42610831+roeap@users.noreply.github.com> Date: Mon, 11 Dec 2023 18:38:48 +0100 Subject: [PATCH] feat: add kernel ExpressionEvaluator (#1829) ~~based on #1807~~ # Description In the effort to advance protocol support and move our internal APIs closer to the kernel library, it is advantageous to leverage the expression handling logic from kernel specifically for filtering actions etc. This PR just adds the expression definitions and evaluation logic. Integrating it with our current codebase and basing the existing partition handling logic on this is left for follow up PRs to keep things review-able. related: #1894, #1776 --- Cargo.toml | 1 + crates/deltalake-core/Cargo.toml | 2 + .../src/kernel/client/expressions.rs | 320 ++++++++++++++++++ .../deltalake-core/src/kernel/client/mod.rs | 40 +++ .../src/kernel/expressions/mod.rs | 283 ++++++++++++++++ .../src/kernel/expressions/scalars.rs | 135 ++++++++ crates/deltalake-core/src/kernel/mod.rs | 4 + crates/deltalake-sql/src/parser.rs | 4 +- 8 files changed, 787 insertions(+), 2 deletions(-) create mode 100644 crates/deltalake-core/src/kernel/client/expressions.rs create mode 100644 crates/deltalake-core/src/kernel/client/mod.rs create mode 100644 crates/deltalake-core/src/kernel/expressions/mod.rs create mode 100644 crates/deltalake-core/src/kernel/expressions/scalars.rs diff --git a/Cargo.toml b/Cargo.toml index a884ff5413..1e9f311693 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ debug = "line-tables-only" [workspace.dependencies] # arrow arrow = { version = "48.0.1" } +arrow-arith = { version = "48.0.1" } arrow-array = { version = "48.0.1" } arrow-buffer = { version = "48.0.1" } arrow-cast = { version = "48.0.1" } diff --git a/crates/deltalake-core/Cargo.toml b/crates/deltalake-core/Cargo.toml index 348b597761..3c64cdda78 100644 --- a/crates/deltalake-core/Cargo.toml +++ b/crates/deltalake-core/Cargo.toml @@ -20,6 +20,7 
@@ features = ["azure", "datafusion", "gcs", "glue", "hdfs", "json", "python", "s3" [dependencies] # arrow arrow = { workspace = true, optional = true } +arrow-arith = { workspace = true, optional = true } arrow-array = { workspace = true, optional = true } arrow-buffer = { workspace = true, optional = true } arrow-cast = { workspace = true, optional = true } @@ -136,6 +137,7 @@ criterion = "0.5" azure = ["object_store/azure"] arrow = [ "dep:arrow", + "arrow-arith", "arrow-array", "arrow-cast", "arrow-ord", diff --git a/crates/deltalake-core/src/kernel/client/expressions.rs b/crates/deltalake-core/src/kernel/client/expressions.rs new file mode 100644 index 0000000000..c18fb5e8de --- /dev/null +++ b/crates/deltalake-core/src/kernel/client/expressions.rs @@ -0,0 +1,320 @@ +//! Default Expression handler. +//! +//! Expression handling based on arrow-rs compute kernels. + +use std::sync::Arc; + +use arrow_arith::boolean::{and, is_null, not, or}; +use arrow_arith::numeric::{add, div, mul, sub}; +use arrow_array::{ + Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, + Int32Array, RecordBatch, StringArray, TimestampMicrosecondArray, +}; +use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq}; + +use crate::kernel::error::{DeltaResult, Error}; +use crate::kernel::expressions::{scalars::Scalar, Expression}; +use crate::kernel::expressions::{BinaryOperator, UnaryOperator}; + +// TODO leverage scalars / Datum + +impl Scalar { + /// Convert scalar to arrow array. 
+ pub fn to_array(&self, num_rows: usize) -> ArrayRef { + use Scalar::*; + match self { + Integer(val) => Arc::new(Int32Array::from(vec![*val; num_rows])), + Float(val) => Arc::new(Float32Array::from(vec![*val; num_rows])), + String(val) => Arc::new(StringArray::from(vec![val.clone(); num_rows])), + Boolean(val) => Arc::new(BooleanArray::from(vec![*val; num_rows])), + Timestamp(val) => Arc::new(TimestampMicrosecondArray::from(vec![*val; num_rows])), + Date(val) => Arc::new(Date32Array::from(vec![*val; num_rows])), + Binary(val) => Arc::new(BinaryArray::from(vec![val.as_slice(); num_rows])), + Decimal(val, precision, scale) => Arc::new( + Decimal128Array::from(vec![*val; num_rows]) + .with_precision_and_scale(*precision, *scale) + .unwrap(), + ), + Null(_) => todo!(), + } + } +} + +pub(crate) fn evaluate_expression( + expression: &Expression, + batch: &RecordBatch, +) -> DeltaResult<ArrayRef> { + match expression { + Expression::Literal(scalar) => Ok(scalar.to_array(batch.num_rows())), + Expression::Column(name) => batch + .column_by_name(name) + .ok_or(Error::MissingColumn(name.clone())) + .cloned(), + Expression::UnaryOperation { op, expr } => { + let arr = evaluate_expression(expr.as_ref(), batch)?; + match op { + UnaryOperator::Not => { + let arr = arr + .as_any() + .downcast_ref::<BooleanArray>() + .ok_or(Error::Generic("expected boolean array".to_string()))?; + let result = not(arr)?; + Ok(Arc::new(result)) + } + UnaryOperator::IsNull => { + let result = is_null(&arr)?; + Ok(Arc::new(result)) + } + } + } + Expression::BinaryOperation { op, left, right } => { + let left_arr = evaluate_expression(left.as_ref(), batch)?; + let right_arr = evaluate_expression(right.as_ref(), batch)?; + match op { + BinaryOperator::Plus => { + add(&left_arr, &right_arr).map_err(|err| Error::GenericError { + source: Box::new(err), + }) + } + BinaryOperator::Minus => { + sub(&left_arr, &right_arr).map_err(|err| Error::GenericError { + source: Box::new(err), + }) + } + BinaryOperator::Multiply => { + 
mul(&left_arr, &right_arr).map_err(|err| Error::GenericError { + source: Box::new(err), + }) + } + BinaryOperator::Divide => { + div(&left_arr, &right_arr).map_err(|err| Error::GenericError { + source: Box::new(err), + }) + } + BinaryOperator::LessThan => { + let result = lt(&left_arr, &right_arr).map_err(|err| Error::GenericError { + source: Box::new(err), + })?; + Ok(Arc::new(result)) + } + BinaryOperator::LessThanOrEqual => { + let result = + lt_eq(&left_arr, &right_arr).map_err(|err| Error::GenericError { + source: Box::new(err), + })?; + Ok(Arc::new(result)) + } + BinaryOperator::GreaterThan => { + let result = gt(&left_arr, &right_arr).map_err(|err| Error::GenericError { + source: Box::new(err), + })?; + Ok(Arc::new(result)) + } + BinaryOperator::GreaterThanOrEqual => { + let result = + gt_eq(&left_arr, &right_arr).map_err(|err| Error::GenericError { + source: Box::new(err), + })?; + Ok(Arc::new(result)) + } + BinaryOperator::Equal => { + let result = eq(&left_arr, &right_arr).map_err(|err| Error::GenericError { + source: Box::new(err), + })?; + Ok(Arc::new(result)) + } + BinaryOperator::NotEqual => { + let result = neq(&left_arr, &right_arr).map_err(|err| Error::GenericError { + source: Box::new(err), + })?; + Ok(Arc::new(result)) + } + BinaryOperator::And => { + let left_arr = evaluate_expression(left.as_ref(), batch)?; + let left_arr = left_arr + .as_any() + .downcast_ref::<BooleanArray>() + .ok_or(Error::Generic("expected boolean array".to_string()))?; + let right_arr = evaluate_expression(right.as_ref(), batch)?; + let right_arr = right_arr + .as_any() + .downcast_ref::<BooleanArray>() + .ok_or(Error::Generic("expected boolean array".to_string()))?; + let result = and(left_arr, right_arr).map_err(|err| Error::GenericError { + source: Box::new(err), + })?; + Ok(Arc::new(result)) + } + BinaryOperator::Or => { + let left_arr = evaluate_expression(left.as_ref(), batch)?; + let left_arr = left_arr + .as_any() + .downcast_ref::<BooleanArray>() + .ok_or(Error::Generic("expected boolean 
array".to_string()))?; + let right_arr = evaluate_expression(right.as_ref(), batch)?; + let right_arr = right_arr + .as_any() + .downcast_ref::<BooleanArray>() + .ok_or(Error::Generic("expected boolean array".to_string()))?; + let result = or(left_arr, right_arr).map_err(|err| Error::GenericError { + source: Box::new(err), + })?; + Ok(Arc::new(result)) + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow_array::Int32Array; + use arrow_schema::{DataType, Field, Schema}; + use std::ops::{Add, Div, Mul, Sub}; + + #[test] + fn test_binary_op_scalar() { + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let values = Int32Array::from(vec![1, 2, 3]); + let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(values)]).unwrap(); + let column = Expression::Column("a".to_string()); + + let expression = Box::new(column.clone().add(Expression::Literal(Scalar::Integer(1)))); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(Int32Array::from(vec![2, 3, 4])); + assert_eq!(results.as_ref(), expected.as_ref()); + + let expression = Box::new(column.clone().sub(Expression::Literal(Scalar::Integer(1)))); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(Int32Array::from(vec![0, 1, 2])); + assert_eq!(results.as_ref(), expected.as_ref()); + + let expression = Box::new(column.clone().mul(Expression::Literal(Scalar::Integer(2)))); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(Int32Array::from(vec![2, 4, 6])); + assert_eq!(results.as_ref(), expected.as_ref()); + + // TODO handle type casting + let expression = Box::new(column.div(Expression::Literal(Scalar::Integer(1)))); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(Int32Array::from(vec![1, 2, 3])); + assert_eq!(results.as_ref(), expected.as_ref()) + } + + #[test] + fn test_binary_op() { + let schema = 
Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ]); + let values = Int32Array::from(vec![1, 2, 3]); + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(values.clone()), Arc::new(values)], + ) + .unwrap(); + let column_a = Expression::Column("a".to_string()); + let column_b = Expression::Column("b".to_string()); + + let expression = Box::new(column_a.clone().add(column_b.clone())); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(Int32Array::from(vec![2, 4, 6])); + assert_eq!(results.as_ref(), expected.as_ref()); + + let expression = Box::new(column_a.clone().sub(column_b.clone())); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(Int32Array::from(vec![0, 0, 0])); + assert_eq!(results.as_ref(), expected.as_ref()); + + let expression = Box::new(column_a.clone().mul(column_b)); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(Int32Array::from(vec![1, 4, 9])); + assert_eq!(results.as_ref(), expected.as_ref()); + } + + #[test] + fn test_binary_cmp() { + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let values = Int32Array::from(vec![1, 2, 3]); + let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(values)]).unwrap(); + let column = Expression::Column("a".to_string()); + let lit = Expression::Literal(Scalar::Integer(2)); + + let expression = Box::new(column.clone().lt(lit.clone())); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(BooleanArray::from(vec![true, false, false])); + assert_eq!(results.as_ref(), expected.as_ref()); + + let expression = Box::new(column.clone().lt_eq(lit.clone())); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(BooleanArray::from(vec![true, true, false])); + assert_eq!(results.as_ref(), 
expected.as_ref()); + + let expression = Box::new(column.clone().gt(lit.clone())); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(BooleanArray::from(vec![false, false, true])); + assert_eq!(results.as_ref(), expected.as_ref()); + + let expression = Box::new(column.clone().gt_eq(lit.clone())); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(BooleanArray::from(vec![false, true, true])); + assert_eq!(results.as_ref(), expected.as_ref()); + + let expression = Box::new(column.clone().eq(lit.clone())); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(BooleanArray::from(vec![false, true, false])); + assert_eq!(results.as_ref(), expected.as_ref()); + + let expression = Box::new(column.clone().ne(lit.clone())); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(BooleanArray::from(vec![true, false, true])); + assert_eq!(results.as_ref(), expected.as_ref()); + } + + #[test] + fn test_logical() { + let schema = Schema::new(vec![ + Field::new("a", DataType::Boolean, false), + Field::new("b", DataType::Boolean, false), + ]); + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![ + Arc::new(BooleanArray::from(vec![true, false])), + Arc::new(BooleanArray::from(vec![false, true])), + ], + ) + .unwrap(); + let column_a = Expression::Column("a".to_string()); + let column_b = Expression::Column("b".to_string()); + + let expression = Box::new(column_a.clone().and(column_b.clone())); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(BooleanArray::from(vec![false, false])); + assert_eq!(results.as_ref(), expected.as_ref()); + + let expression = Box::new( + column_a + .clone() + .and(Expression::literal(Scalar::Boolean(true))), + ); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(BooleanArray::from(vec![true, 
false])); + assert_eq!(results.as_ref(), expected.as_ref()); + + let expression = Box::new(column_a.clone().or(column_b)); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(BooleanArray::from(vec![true, true])); + assert_eq!(results.as_ref(), expected.as_ref()); + + let expression = Box::new( + column_a + .clone() + .or(Expression::literal(Scalar::Boolean(false))), + ); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(BooleanArray::from(vec![true, false])); + assert_eq!(results.as_ref(), expected.as_ref()); + } +} diff --git a/crates/deltalake-core/src/kernel/client/mod.rs b/crates/deltalake-core/src/kernel/client/mod.rs new file mode 100644 index 0000000000..038a51d794 --- /dev/null +++ b/crates/deltalake-core/src/kernel/client/mod.rs @@ -0,0 +1,40 @@ +//! Delta kernel client implementation. +use std::sync::Arc; + +use arrow_array::RecordBatch; + +use self::expressions::evaluate_expression; +use crate::kernel::error::DeltaResult; +use crate::kernel::expressions::Expression; +use crate::kernel::schema::SchemaRef; + +pub mod expressions; + +/// Interface for implementing an Expression evaluator. +/// +/// It contains one Expression which can be evaluated on multiple ColumnarBatches. +/// Connectors can implement this interface to optimize the evaluation using the +/// connector specific capabilities. +pub trait ExpressionEvaluator { + /// Evaluate the expression on given ColumnarBatch data. + /// + /// Contains one value for each row of the input. + /// The data type of the output is same as the type output of the expression this evaluator is using. + fn evaluate(&self, batch: &RecordBatch, output_schema: SchemaRef) -> DeltaResult<RecordBatch>; +} + +#[derive(Debug)] +/// Expression evaluator based on arrow compute kernels. 
+pub struct ArrowExpressionEvaluator { + _input_schema: SchemaRef, + expression: Box<Expression>, +} + +impl ExpressionEvaluator for ArrowExpressionEvaluator { + fn evaluate(&self, batch: &RecordBatch, output_schema: SchemaRef) -> DeltaResult<RecordBatch> { + Ok(RecordBatch::try_new( + Arc::new(output_schema.as_ref().try_into()?), + vec![evaluate_expression(&self.expression, batch)?], + )?) + } +} diff --git a/crates/deltalake-core/src/kernel/expressions/mod.rs b/crates/deltalake-core/src/kernel/expressions/mod.rs new file mode 100644 index 0000000000..80c0a72cf3 --- /dev/null +++ b/crates/deltalake-core/src/kernel/expressions/mod.rs @@ -0,0 +1,283 @@ +//! expressions. + +use std::{ + collections::HashSet, + fmt::{Display, Formatter}, +}; + +use self::scalars::Scalar; + +pub mod scalars; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +/// A binary operator. +pub enum BinaryOperator { + /// Logical And + And, + /// Logical Or + Or, + /// Arithmetic Plus + Plus, + /// Arithmetic Minus + Minus, + /// Arithmetic Multiply + Multiply, + /// Arithmetic Divide + Divide, + /// Comparison Less Than + LessThan, + /// Comparison Less Than Or Equal + LessThanOrEqual, + /// Comparison Greater Than + GreaterThan, + /// Comparison Greater Than Or Equal + GreaterThanOrEqual, + /// Comparison Equal + Equal, + /// Comparison Not Equal + NotEqual, +} + +impl Display for BinaryOperator { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::And => write!(f, "AND"), + Self::Or => write!(f, "OR"), + Self::Plus => write!(f, "+"), + Self::Minus => write!(f, "-"), + Self::Multiply => write!(f, "*"), + Self::Divide => write!(f, "/"), + Self::LessThan => write!(f, "<"), + Self::LessThanOrEqual => write!(f, "<="), + Self::GreaterThan => write!(f, ">"), + Self::GreaterThanOrEqual => write!(f, ">="), + Self::Equal => write!(f, "="), + Self::NotEqual => write!(f, "!="), + } + } +} + +#[derive(Debug, Clone, PartialEq)] +/// A unary operator. 
+pub enum UnaryOperator { + /// Unary Not + Not, + /// Unary Is Null + IsNull, +} + +/// A SQL expression. +/// +/// These expressions do not track or validate data types, other than the type +/// of literals. It is up to the expression evaluator to validate the +/// expression against a schema and add appropriate casts as required. +#[derive(Debug, Clone, PartialEq)] +pub enum Expression { + /// A literal value. + Literal(Scalar), + /// A column reference by name. + Column(String), + /// A binary operation. + BinaryOperation { + /// The operator. + op: BinaryOperator, + /// The left-hand side of the operation. + left: Box<Expression>, + /// The right-hand side of the operation. + right: Box<Expression>, + }, + /// A unary operation. + UnaryOperation { + /// The operator. + op: UnaryOperator, + /// The expression. + expr: Box<Expression>, + }, + // TODO: support more expressions, such as IS IN, LIKE, etc. +} + +impl Display for Expression { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::Literal(l) => write!(f, "{}", l), + Self::Column(name) => write!(f, "Column({})", name), + Self::BinaryOperation { op, left, right } => { + match op { + // OR requires parentheses + BinaryOperator::Or => write!(f, "({} OR {})", left, right), + _ => write!(f, "{} {} {}", left, op, right), + } + } + Self::UnaryOperation { op, expr } => match op { + UnaryOperator::Not => write!(f, "NOT {}", expr), + UnaryOperator::IsNull => write!(f, "{} IS NULL", expr), + }, + } + } +} + +impl Expression { + /// Returns a set of columns referenced by this expression. 
+ pub fn references(&self) -> HashSet<&str> { + let mut set = HashSet::new(); + + for expr in self.walk() { + if let Self::Column(name) = expr { + set.insert(name.as_str()); + } + } + + set + } + + /// Create a new expression for a column reference + pub fn column(name: impl Into<String>) -> Self { + Self::Column(name.into()) + } + + /// Create a new expression for a literal value + pub fn literal(value: impl Into<Scalar>) -> Self { + Self::Literal(value.into()) + } + + fn binary_op_impl(self, other: Self, op: BinaryOperator) -> Self { + Self::BinaryOperation { + op, + left: Box::new(self), + right: Box::new(other), + } + } + + /// Create a new expression `self == other` + pub fn eq(self, other: Self) -> Self { + self.binary_op_impl(other, BinaryOperator::Equal) + } + + /// Create a new expression `self != other` + pub fn ne(self, other: Self) -> Self { + self.binary_op_impl(other, BinaryOperator::NotEqual) + } + + /// Create a new expression `self < other` + pub fn lt(self, other: Self) -> Self { + self.binary_op_impl(other, BinaryOperator::LessThan) + } + + /// Create a new expression `self > other` + pub fn gt(self, other: Self) -> Self { + self.binary_op_impl(other, BinaryOperator::GreaterThan) + } + + /// Create a new expression `self >= other` + pub fn gt_eq(self, other: Self) -> Self { + self.binary_op_impl(other, BinaryOperator::GreaterThanOrEqual) + } + + /// Create a new expression `self <= other` + pub fn lt_eq(self, other: Self) -> Self { + self.binary_op_impl(other, BinaryOperator::LessThanOrEqual) + } + + /// Create a new expression `self AND other` + pub fn and(self, other: Self) -> Self { + self.binary_op_impl(other, BinaryOperator::And) + } + + /// Create a new expression `self OR other` + pub fn or(self, other: Self) -> Self { + self.binary_op_impl(other, BinaryOperator::Or) + } + + fn walk(&self) -> impl Iterator<Item = &Self> + '_ { + let mut stack = vec![self]; + std::iter::from_fn(move || { + let expr = stack.pop()?; + match expr { + Self::Literal(_) => {} + Self::Column 
{ .. } => {} + Self::BinaryOperation { left, right, .. } => { + stack.push(left); + stack.push(right); + } + Self::UnaryOperation { expr, .. } => { + stack.push(expr); + } + } + Some(expr) + }) + } +} + +impl std::ops::Add for Expression { + type Output = Self; + + fn add(self, rhs: Expression) -> Self::Output { + self.binary_op_impl(rhs, BinaryOperator::Plus) + } +} + +impl std::ops::Sub for Expression { + type Output = Self; + + fn sub(self, rhs: Expression) -> Self::Output { + self.binary_op_impl(rhs, BinaryOperator::Minus) + } +} + +impl std::ops::Mul for Expression { + type Output = Self; + + fn mul(self, rhs: Expression) -> Self::Output { + self.binary_op_impl(rhs, BinaryOperator::Multiply) + } +} + +impl std::ops::Div for Expression { + type Output = Self; + + fn div(self, rhs: Expression) -> Self::Output { + self.binary_op_impl(rhs, BinaryOperator::Divide) + } +} + +#[cfg(test)] +mod tests { + use super::Expression as Expr; + + #[test] + fn test_expression_format() { + let col_ref = Expr::column("x"); + let cases = [ + (col_ref.clone(), "Column(x)"), + (col_ref.clone().eq(Expr::literal(2)), "Column(x) = 2"), + ( + col_ref + .clone() + .gt_eq(Expr::literal(2)) + .and(col_ref.clone().lt_eq(Expr::literal(10))), + "Column(x) >= 2 AND Column(x) <= 10", + ), + ( + col_ref + .clone() + .gt(Expr::literal(2)) + .or(col_ref.clone().lt(Expr::literal(10))), + "(Column(x) > 2 OR Column(x) < 10)", + ), + ( + (col_ref.clone() - Expr::literal(4)).lt(Expr::literal(10)), + "Column(x) - 4 < 10", + ), + ( + (col_ref.clone() + Expr::literal(4)) / Expr::literal(10) * Expr::literal(42), + "Column(x) + 4 / 10 * 42", + ), + (col_ref.eq(Expr::literal("foo")), "Column(x) = 'foo'"), + ]; + + for (expr, expected) in cases { + let result = format!("{}", expr); + assert_eq!(result, expected); + } + } +} diff --git a/crates/deltalake-core/src/kernel/expressions/scalars.rs b/crates/deltalake-core/src/kernel/expressions/scalars.rs new file mode 100644 index 0000000000..175470e19e --- 
/dev/null +++ b/crates/deltalake-core/src/kernel/expressions/scalars.rs @@ -0,0 +1,135 @@ +//! Scalar values for use in expressions. + +use std::{ + cmp::Ordering, + fmt::{Display, Formatter}, +}; + +use crate::kernel::schema::{DataType, PrimitiveType}; + +/// A single value, which can be null. Used for representing literal values +/// in [Expressions][crate::kernel::expressions::Expression]. +#[derive(Debug, Clone, PartialEq)] +pub enum Scalar { + /// A 32-bit integer. + Integer(i32), + /// A 32-bit floating point number. + Float(f32), + /// A string. + String(String), + /// A boolean. + Boolean(bool), + /// A timestamp. + Timestamp(i64), + /// A date. + Date(i32), + /// A binary value. + Binary(Vec<u8>), + /// A decimal value. + Decimal(i128, u8, i8), + /// A null value. + Null(DataType), +} + +impl Scalar { + /// Returns the [DataType] of the scalar. + pub fn data_type(&self) -> DataType { + match self { + Self::Integer(_) => DataType::Primitive(PrimitiveType::Integer), + Self::Float(_) => DataType::Primitive(PrimitiveType::Float), + Self::String(_) => DataType::Primitive(PrimitiveType::String), + Self::Boolean(_) => DataType::Primitive(PrimitiveType::Boolean), + Self::Timestamp(_) => DataType::Primitive(PrimitiveType::Timestamp), + Self::Date(_) => DataType::Primitive(PrimitiveType::Date), + Self::Binary(_) => DataType::Primitive(PrimitiveType::Binary), + Self::Decimal(_, precision, scale) => { + DataType::decimal(*precision as usize, *scale as usize) + } + Self::Null(data_type) => data_type.clone(), + } + } +} + +impl Display for Scalar { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::Integer(i) => write!(f, "{}", i), + Self::Float(fl) => write!(f, "{}", fl), + Self::String(s) => write!(f, "'{}'", s), + Self::Boolean(b) => write!(f, "{}", b), + Self::Timestamp(ts) => write!(f, "{}", ts), + Self::Date(d) => write!(f, "{}", d), + Self::Binary(b) => write!(f, "{:?}", b), + Self::Decimal(value, _, scale) => match scale.cmp(&0) { + 
Ordering::Equal => { + write!(f, "{}", value) + } + Ordering::Greater => { + let scalar_multiple = 10_i128.pow(*scale as u32); + write!(f, "{}", value / scalar_multiple)?; + write!(f, ".")?; + write!( + f, + "{:0>scale$}", + value % scalar_multiple, + scale = *scale as usize + ) + } + Ordering::Less => { + write!(f, "{}", value)?; + for _ in 0..(scale.abs()) { + write!(f, "0")?; + } + Ok(()) + } + }, + Self::Null(_) => write!(f, "null"), + } + } +} + +impl From<i32> for Scalar { + fn from(i: i32) -> Self { + Self::Integer(i) + } +} + +impl From<bool> for Scalar { + fn from(b: bool) -> Self { + Self::Boolean(b) + } +} + +impl From<&str> for Scalar { + fn from(s: &str) -> Self { + Self::String(s.into()) + } +} + +impl From<String> for Scalar { + fn from(value: String) -> Self { + Self::String(value) + } +} + +// TODO: add more From impls + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_decimal_display() { + let s = Scalar::Decimal(123456789, 9, 2); + assert_eq!(s.to_string(), "1234567.89"); + + let s = Scalar::Decimal(123456789, 9, 0); + assert_eq!(s.to_string(), "123456789"); + + let s = Scalar::Decimal(123456789, 9, 9); + assert_eq!(s.to_string(), "0.123456789"); + + let s = Scalar::Decimal(123, 9, -3); + assert_eq!(s.to_string(), "123000"); + } +} diff --git a/crates/deltalake-core/src/kernel/mod.rs b/crates/deltalake-core/src/kernel/mod.rs index 54f742c3fb..c8d01c138d 100644 --- a/crates/deltalake-core/src/kernel/mod.rs +++ b/crates/deltalake-core/src/kernel/mod.rs @@ -3,9 +3,13 @@ pub mod actions; #[cfg(all(feature = "arrow", feature = "parquet"))] pub mod arrow; +#[cfg(feature = "arrow")] +pub mod client; pub mod error; +pub mod expressions; pub mod schema; pub use actions::*; pub use error::*; +pub use expressions::*; pub use schema::*; diff --git a/crates/deltalake-sql/src/parser.rs b/crates/deltalake-sql/src/parser.rs index c76cced9bd..3287c87215 100644 --- a/crates/deltalake-sql/src/parser.rs +++ b/crates/deltalake-sql/src/parser.rs @@ -63,10 +63,10 @@ impl 
fmt::Display for Statement { } } -/// Delta Lake SQL Parser based on [`sqlparser`] +/// Delta Lake SQL Parser based on [`sqlparser`](https://crates.io/crates/sqlparser) /// /// This parser handles Delta Lake specific statements, delegating to -/// [`DFParser`](datafusion_sql::parser::DFParser) for other SQL statements. +/// [`DFParser`] for other SQL statements. pub struct DeltaParser<'a> { sql: &'a str, parser: Parser<'a>,