diff --git a/Cargo.toml b/Cargo.toml index a884ff5413..1e9f311693 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ debug = "line-tables-only" [workspace.dependencies] # arrow arrow = { version = "48.0.1" } +arrow-arith = { version = "48.0.1" } arrow-array = { version = "48.0.1" } arrow-buffer = { version = "48.0.1" } arrow-cast = { version = "48.0.1" } diff --git a/crates/deltalake-core/Cargo.toml b/crates/deltalake-core/Cargo.toml index 348b597761..3c64cdda78 100644 --- a/crates/deltalake-core/Cargo.toml +++ b/crates/deltalake-core/Cargo.toml @@ -20,6 +20,7 @@ features = ["azure", "datafusion", "gcs", "glue", "hdfs", "json", "python", "s3" [dependencies] # arrow arrow = { workspace = true, optional = true } +arrow-arith = { workspace = true, optional = true } arrow-array = { workspace = true, optional = true } arrow-buffer = { workspace = true, optional = true } arrow-cast = { workspace = true, optional = true } @@ -136,6 +137,7 @@ criterion = "0.5" azure = ["object_store/azure"] arrow = [ "dep:arrow", + "arrow-arith", "arrow-array", "arrow-cast", "arrow-ord", diff --git a/crates/deltalake-core/src/kernel/client/expressions.rs b/crates/deltalake-core/src/kernel/client/expressions.rs new file mode 100644 index 0000000000..c18fb5e8de --- /dev/null +++ b/crates/deltalake-core/src/kernel/client/expressions.rs @@ -0,0 +1,320 @@ +//! Default Expression handler. +//! +//! Expression handling based on arrow-rs compute kernels. + +use std::sync::Arc; + +use arrow_arith::boolean::{and, is_null, not, or}; +use arrow_arith::numeric::{add, div, mul, sub}; +use arrow_array::{ + Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, + Int32Array, RecordBatch, StringArray, TimestampMicrosecondArray, +}; +use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq}; + +use crate::kernel::error::{DeltaResult, Error}; +use crate::kernel::expressions::{scalars::Scalar, Expression}; +use crate::kernel::expressions::{BinaryOperator, UnaryOperator}; + +// TODO leverage scalars / Datum + +impl Scalar { + /// Convert scalar to arrow array. + pub fn to_array(&self, num_rows: usize) -> ArrayRef { + use Scalar::*; + match self { + Integer(val) => Arc::new(Int32Array::from(vec![*val; num_rows])), + Float(val) => Arc::new(Float32Array::from(vec![*val; num_rows])), + String(val) => Arc::new(StringArray::from(vec![val.clone(); num_rows])), + Boolean(val) => Arc::new(BooleanArray::from(vec![*val; num_rows])), + Timestamp(val) => Arc::new(TimestampMicrosecondArray::from(vec![*val; num_rows])), + Date(val) => Arc::new(Date32Array::from(vec![*val; num_rows])), + Binary(val) => Arc::new(BinaryArray::from(vec![val.as_slice(); num_rows])), + Decimal(val, precision, scale) => Arc::new( + Decimal128Array::from(vec![*val; num_rows]) + .with_precision_and_scale(*precision, *scale) + .unwrap(), + ), + Null(_) => todo!(), + } + } +} + +pub(crate) fn evaluate_expression( + expression: &Expression, + batch: &RecordBatch, +) -> DeltaResult { + match expression { + Expression::Literal(scalar) => Ok(scalar.to_array(batch.num_rows())), + Expression::Column(name) => batch + .column_by_name(name) + .ok_or(Error::MissingColumn(name.clone())) + .cloned(), + Expression::UnaryOperation { op, expr } => { + let arr = evaluate_expression(expr.as_ref(), batch)?; + match op { + UnaryOperator::Not => { + let arr = arr + .as_any() + .downcast_ref::() + .ok_or(Error::Generic("expected boolean array".to_string()))?; + let result = not(arr)?; + Ok(Arc::new(result)) + } + UnaryOperator::IsNull => { + let result = is_null(&arr)?; + Ok(Arc::new(result)) + } + } + } + Expression::BinaryOperation { op, left, right } => { + let left_arr = evaluate_expression(left.as_ref(), batch)?; + let right_arr = evaluate_expression(right.as_ref(), batch)?; + match op { + BinaryOperator::Plus => { + add(&left_arr, &right_arr).map_err(|err| Error::GenericError { + source: Box::new(err), + }) + } + BinaryOperator::Minus => { + sub(&left_arr, &right_arr).map_err(|err| Error::GenericError { + source: Box::new(err), + }) + } + BinaryOperator::Multiply => { + mul(&left_arr, &right_arr).map_err(|err| Error::GenericError { + source: Box::new(err), + }) + } + BinaryOperator::Divide => { + div(&left_arr, &right_arr).map_err(|err| Error::GenericError { + source: Box::new(err), + }) + } + BinaryOperator::LessThan => { + let result = lt(&left_arr, &right_arr).map_err(|err| Error::GenericError { + source: Box::new(err), + })?; + Ok(Arc::new(result)) + } + BinaryOperator::LessThanOrEqual => { + let result = + lt_eq(&left_arr, &right_arr).map_err(|err| Error::GenericError { + source: Box::new(err), + })?; + Ok(Arc::new(result)) + } + BinaryOperator::GreaterThan => { + let result = gt(&left_arr, &right_arr).map_err(|err| Error::GenericError { + source: Box::new(err), + })?; + Ok(Arc::new(result)) + } + BinaryOperator::GreaterThanOrEqual => { + let result = + gt_eq(&left_arr, &right_arr).map_err(|err| Error::GenericError { + source: Box::new(err), + })?; + Ok(Arc::new(result)) + } + BinaryOperator::Equal => { + let result = eq(&left_arr, &right_arr).map_err(|err| Error::GenericError { + source: Box::new(err), + })?; + Ok(Arc::new(result)) + } + BinaryOperator::NotEqual => { + let result = neq(&left_arr, &right_arr).map_err(|err| Error::GenericError { + source: Box::new(err), + })?; + Ok(Arc::new(result)) + } + BinaryOperator::And => { + let left_arr = evaluate_expression(left.as_ref(), batch)?; + let left_arr = left_arr + .as_any() + .downcast_ref::() + .ok_or(Error::Generic("expected boolean array".to_string()))?; + let right_arr = evaluate_expression(right.as_ref(), batch)?; + let right_arr = right_arr + .as_any() + .downcast_ref::() + .ok_or(Error::Generic("expected boolean array".to_string()))?; + let result = and(left_arr, right_arr).map_err(|err| Error::GenericError { + source: Box::new(err), + })?; + Ok(Arc::new(result)) + } + BinaryOperator::Or => { + let left_arr = evaluate_expression(left.as_ref(), batch)?; + let left_arr = left_arr + .as_any() + .downcast_ref::() + .ok_or(Error::Generic("expected boolean array".to_string()))?; + let right_arr = evaluate_expression(right.as_ref(), batch)?; + let right_arr = right_arr + .as_any() + .downcast_ref::() + .ok_or(Error::Generic("expected boolean array".to_string()))?; + let result = or(left_arr, right_arr).map_err(|err| Error::GenericError { + source: Box::new(err), + })?; + Ok(Arc::new(result)) + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow_array::Int32Array; + use arrow_schema::{DataType, Field, Schema}; + use std::ops::{Add, Div, Mul, Sub}; + + #[test] + fn test_binary_op_scalar() { + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let values = Int32Array::from(vec![1, 2, 3]); + let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(values)]).unwrap(); + let column = Expression::Column("a".to_string()); + + let expression = Box::new(column.clone().add(Expression::Literal(Scalar::Integer(1)))); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(Int32Array::from(vec![2, 3, 4])); + assert_eq!(results.as_ref(), expected.as_ref()); + + let expression = Box::new(column.clone().sub(Expression::Literal(Scalar::Integer(1)))); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(Int32Array::from(vec![0, 1, 2])); + assert_eq!(results.as_ref(), expected.as_ref()); + + let expression = Box::new(column.clone().mul(Expression::Literal(Scalar::Integer(2)))); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(Int32Array::from(vec![2, 4, 6])); + assert_eq!(results.as_ref(), expected.as_ref()); + + // TODO handle type casting + let expression = Box::new(column.div(Expression::Literal(Scalar::Integer(1)))); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(Int32Array::from(vec![1, 2, 3])); + assert_eq!(results.as_ref(), expected.as_ref()) + } + + #[test] + fn test_binary_op() { + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ]); + let values = Int32Array::from(vec![1, 2, 3]); + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(values.clone()), Arc::new(values)], + ) + .unwrap(); + let column_a = Expression::Column("a".to_string()); + let column_b = Expression::Column("b".to_string()); + + let expression = Box::new(column_a.clone().add(column_b.clone())); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(Int32Array::from(vec![2, 4, 6])); + assert_eq!(results.as_ref(), expected.as_ref()); + + let expression = Box::new(column_a.clone().sub(column_b.clone())); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(Int32Array::from(vec![0, 0, 0])); + assert_eq!(results.as_ref(), expected.as_ref()); + + let expression = Box::new(column_a.clone().mul(column_b)); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(Int32Array::from(vec![1, 4, 9])); + assert_eq!(results.as_ref(), expected.as_ref()); + } + + #[test] + fn test_binary_cmp() { + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let values = Int32Array::from(vec![1, 2, 3]); + let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(values)]).unwrap(); + let column = Expression::Column("a".to_string()); + let lit = Expression::Literal(Scalar::Integer(2)); + + let expression = Box::new(column.clone().lt(lit.clone())); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(BooleanArray::from(vec![true, false, false])); + assert_eq!(results.as_ref(), expected.as_ref()); + + let expression = Box::new(column.clone().lt_eq(lit.clone())); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(BooleanArray::from(vec![true, true, false])); + assert_eq!(results.as_ref(), expected.as_ref()); + + let expression = Box::new(column.clone().gt(lit.clone())); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(BooleanArray::from(vec![false, false, true])); + assert_eq!(results.as_ref(), expected.as_ref()); + + let expression = Box::new(column.clone().gt_eq(lit.clone())); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(BooleanArray::from(vec![false, true, true])); + assert_eq!(results.as_ref(), expected.as_ref()); + + let expression = Box::new(column.clone().eq(lit.clone())); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(BooleanArray::from(vec![false, true, false])); + assert_eq!(results.as_ref(), expected.as_ref()); + + let expression = Box::new(column.clone().ne(lit.clone())); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(BooleanArray::from(vec![true, false, true])); + assert_eq!(results.as_ref(), expected.as_ref()); + } + + #[test] + fn test_logical() { + let schema = Schema::new(vec![ + Field::new("a", DataType::Boolean, false), + Field::new("b", DataType::Boolean, false), + ]); + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![ + Arc::new(BooleanArray::from(vec![true, false])), + Arc::new(BooleanArray::from(vec![false, true])), + ], + ) + .unwrap(); + let column_a = Expression::Column("a".to_string()); + let column_b = Expression::Column("b".to_string()); + + let expression = Box::new(column_a.clone().and(column_b.clone())); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(BooleanArray::from(vec![false, false])); + assert_eq!(results.as_ref(), expected.as_ref()); + + let expression = Box::new( + column_a + .clone() + .and(Expression::literal(Scalar::Boolean(true))), + ); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(BooleanArray::from(vec![true, false])); + assert_eq!(results.as_ref(), expected.as_ref()); + + let expression = Box::new(column_a.clone().or(column_b)); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(BooleanArray::from(vec![true, true])); + assert_eq!(results.as_ref(), expected.as_ref()); + + let expression = Box::new( + column_a + .clone() + .or(Expression::literal(Scalar::Boolean(false))), + ); + let results = evaluate_expression(&expression, &batch).unwrap(); + let expected = Arc::new(BooleanArray::from(vec![true, false])); + assert_eq!(results.as_ref(), expected.as_ref()); + } +} diff --git a/crates/deltalake-core/src/kernel/client/mod.rs b/crates/deltalake-core/src/kernel/client/mod.rs new file mode 100644 index 0000000000..038a51d794 --- /dev/null +++ b/crates/deltalake-core/src/kernel/client/mod.rs @@ -0,0 +1,40 @@ +//! Delta kernel client implementation. +use std::sync::Arc; + +use arrow_array::RecordBatch; + +use self::expressions::evaluate_expression; +use crate::kernel::error::DeltaResult; +use crate::kernel::expressions::Expression; +use crate::kernel::schema::SchemaRef; + +pub mod expressions; + +/// Interface for implementing an Expression evaluator. +/// +/// It contains one Expression which can be evaluated on multiple ColumnarBatches. +/// Connectors can implement this interface to optimize the evaluation using the +/// connector specific capabilities. +pub trait ExpressionEvaluator { + /// Evaluate the expression on given ColumnarBatch data. + /// + /// Contains one value for each row of the input. + /// The data type of the output is same as the type output of the expression this evaluator is using. + fn evaluate(&self, batch: &RecordBatch, output_schema: SchemaRef) -> DeltaResult; +} + +#[derive(Debug)] +/// Expression evaluator based on arrow compute kernels. +pub struct ArrowExpressionEvaluator { + _input_schema: SchemaRef, + expression: Box, +} + +impl ExpressionEvaluator for ArrowExpressionEvaluator { + fn evaluate(&self, batch: &RecordBatch, output_schema: SchemaRef) -> DeltaResult { + Ok(RecordBatch::try_new( + Arc::new(output_schema.as_ref().try_into()?), + vec![evaluate_expression(&self.expression, batch)?], + )?) + } +} diff --git a/crates/deltalake-core/src/kernel/expressions/mod.rs b/crates/deltalake-core/src/kernel/expressions/mod.rs new file mode 100644 index 0000000000..80c0a72cf3 --- /dev/null +++ b/crates/deltalake-core/src/kernel/expressions/mod.rs @@ -0,0 +1,283 @@ +//! expressions. + +use std::{ + collections::HashSet, + fmt::{Display, Formatter}, +}; + +use self::scalars::Scalar; + +pub mod scalars; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +/// A binary operator. +pub enum BinaryOperator { + /// Logical And + And, + /// Logical Or + Or, + /// Arithmetic Plus + Plus, + /// Arithmetic Minus + Minus, + /// Arithmetic Multiply + Multiply, + /// Arithmetic Divide + Divide, + /// Comparison Less Than + LessThan, + /// Comparison Less Than Or Equal + LessThanOrEqual, + /// Comparison Greater Than + GreaterThan, + /// Comparison Greater Than Or Equal + GreaterThanOrEqual, + /// Comparison Equal + Equal, + /// Comparison Not Equal + NotEqual, +} + +impl Display for BinaryOperator { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::And => write!(f, "AND"), + Self::Or => write!(f, "OR"), + Self::Plus => write!(f, "+"), + Self::Minus => write!(f, "-"), + Self::Multiply => write!(f, "*"), + Self::Divide => write!(f, "/"), + Self::LessThan => write!(f, "<"), + Self::LessThanOrEqual => write!(f, "<="), + Self::GreaterThan => write!(f, ">"), + Self::GreaterThanOrEqual => write!(f, ">="), + Self::Equal => write!(f, "="), + Self::NotEqual => write!(f, "!="), + } + } +} + +#[derive(Debug, Clone, PartialEq)] +/// A unary operator. +pub enum UnaryOperator { + /// Unary Not + Not, + /// Unary Is Null + IsNull, +} + +/// A SQL expression. +/// +/// These expressions do not track or validate data types, other than the type +/// of literals. It is up to the expression evaluator to validate the +/// expression against a schema and add appropriate casts as required. +#[derive(Debug, Clone, PartialEq)] +pub enum Expression { + /// A literal value. + Literal(Scalar), + /// A column reference by name. + Column(String), + /// A binary operation. + BinaryOperation { + /// The operator. + op: BinaryOperator, + /// The left-hand side of the operation. + left: Box, + /// The right-hand side of the operation. + right: Box, + }, + /// A unary operation. + UnaryOperation { + /// The operator. + op: UnaryOperator, + /// The expression. + expr: Box, + }, + // TODO: support more expressions, such as IS IN, LIKE, etc. +} + +impl Display for Expression { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::Literal(l) => write!(f, "{}", l), + Self::Column(name) => write!(f, "Column({})", name), + Self::BinaryOperation { op, left, right } => { + match op { + // OR requires parentheses + BinaryOperator::Or => write!(f, "({} OR {})", left, right), + _ => write!(f, "{} {} {}", left, op, right), + } + } + Self::UnaryOperation { op, expr } => match op { + UnaryOperator::Not => write!(f, "NOT {}", expr), + UnaryOperator::IsNull => write!(f, "{} IS NULL", expr), + }, + } + } +} + +impl Expression { + /// Returns a set of columns referenced by this expression. + pub fn references(&self) -> HashSet<&str> { + let mut set = HashSet::new(); + + for expr in self.walk() { + if let Self::Column(name) = expr { + set.insert(name.as_str()); + } + } + + set + } + + /// Create an new expression for a column reference + pub fn column(name: impl Into) -> Self { + Self::Column(name.into()) + } + + /// Create a new expression for a literal value + pub fn literal(value: impl Into) -> Self { + Self::Literal(value.into()) + } + + fn binary_op_impl(self, other: Self, op: BinaryOperator) -> Self { + Self::BinaryOperation { + op, + left: Box::new(self), + right: Box::new(other), + } + } + + /// Create a new expression `self == other` + pub fn eq(self, other: Self) -> Self { + self.binary_op_impl(other, BinaryOperator::Equal) + } + + /// Create a new expression `self != other` + pub fn ne(self, other: Self) -> Self { + self.binary_op_impl(other, BinaryOperator::NotEqual) + } + + /// Create a new expression `self < other` + pub fn lt(self, other: Self) -> Self { + self.binary_op_impl(other, BinaryOperator::LessThan) + } + + /// Create a new expression `self > other` + pub fn gt(self, other: Self) -> Self { + self.binary_op_impl(other, BinaryOperator::GreaterThan) + } + + /// Create a new expression `self >= other` + pub fn gt_eq(self, other: Self) -> Self { + self.binary_op_impl(other, BinaryOperator::GreaterThanOrEqual) + } + + /// Create a new expression `self <= other` + pub fn lt_eq(self, other: Self) -> Self { + self.binary_op_impl(other, BinaryOperator::LessThanOrEqual) + } + + /// Create a new expression `self AND other` + pub fn and(self, other: Self) -> Self { + self.binary_op_impl(other, BinaryOperator::And) + } + + /// Create a new expression `self OR other` + pub fn or(self, other: Self) -> Self { + self.binary_op_impl(other, BinaryOperator::Or) + } + + fn walk(&self) -> impl Iterator + '_ { + let mut stack = vec![self]; + std::iter::from_fn(move || { + let expr = stack.pop()?; + match expr { + Self::Literal(_) => {} + Self::Column { .. } => {} + Self::BinaryOperation { left, right, .. } => { + stack.push(left); + stack.push(right); + } + Self::UnaryOperation { expr, .. } => { + stack.push(expr); + } + } + Some(expr) + }) + } +} + +impl std::ops::Add for Expression { + type Output = Self; + + fn add(self, rhs: Expression) -> Self::Output { + self.binary_op_impl(rhs, BinaryOperator::Plus) + } +} + +impl std::ops::Sub for Expression { + type Output = Self; + + fn sub(self, rhs: Expression) -> Self::Output { + self.binary_op_impl(rhs, BinaryOperator::Minus) + } +} + +impl std::ops::Mul for Expression { + type Output = Self; + + fn mul(self, rhs: Expression) -> Self::Output { + self.binary_op_impl(rhs, BinaryOperator::Multiply) + } +} + +impl std::ops::Div for Expression { + type Output = Self; + + fn div(self, rhs: Expression) -> Self::Output { + self.binary_op_impl(rhs, BinaryOperator::Divide) + } +} + +#[cfg(test)] +mod tests { + use super::Expression as Expr; + + #[test] + fn test_expression_format() { + let col_ref = Expr::column("x"); + let cases = [ + (col_ref.clone(), "Column(x)"), + (col_ref.clone().eq(Expr::literal(2)), "Column(x) = 2"), + ( + col_ref + .clone() + .gt_eq(Expr::literal(2)) + .and(col_ref.clone().lt_eq(Expr::literal(10))), + "Column(x) >= 2 AND Column(x) <= 10", + ), + ( + col_ref + .clone() + .gt(Expr::literal(2)) + .or(col_ref.clone().lt(Expr::literal(10))), + "(Column(x) > 2 OR Column(x) < 10)", + ), + ( + (col_ref.clone() - Expr::literal(4)).lt(Expr::literal(10)), + "Column(x) - 4 < 10", + ), + ( + (col_ref.clone() + Expr::literal(4)) / Expr::literal(10) * Expr::literal(42), + "Column(x) + 4 / 10 * 42", + ), + (col_ref.eq(Expr::literal("foo")), "Column(x) = 'foo'"), + ]; + + for (expr, expected) in cases { + let result = format!("{}", expr); + assert_eq!(result, expected); + } + } +} diff --git a/crates/deltalake-core/src/kernel/expressions/scalars.rs b/crates/deltalake-core/src/kernel/expressions/scalars.rs new file mode 100644 index 0000000000..175470e19e --- /dev/null +++ b/crates/deltalake-core/src/kernel/expressions/scalars.rs @@ -0,0 +1,135 @@ +//! Scalar values for use in expressions. + +use std::{ + cmp::Ordering, + fmt::{Display, Formatter}, +}; + +use crate::kernel::schema::{DataType, PrimitiveType}; + +/// A single value, which can be null. Used for representing literal values +/// in [Expressions][crate::kernel::expressions::Expression]. +#[derive(Debug, Clone, PartialEq)] +pub enum Scalar { + /// A 32-bit integer. + Integer(i32), + /// A 64-bit floating point number. + Float(f32), + /// A string. + String(String), + /// A boolean. + Boolean(bool), + /// A timestamp. + Timestamp(i64), + /// A date. + Date(i32), + /// A binary value. + Binary(Vec), + /// A decimal value. + Decimal(i128, u8, i8), + /// A null value. + Null(DataType), +} + +impl Scalar { + /// Returns the [DataType] of the scalar. + pub fn data_type(&self) -> DataType { + match self { + Self::Integer(_) => DataType::Primitive(PrimitiveType::Integer), + Self::Float(_) => DataType::Primitive(PrimitiveType::Float), + Self::String(_) => DataType::Primitive(PrimitiveType::String), + Self::Boolean(_) => DataType::Primitive(PrimitiveType::Boolean), + Self::Timestamp(_) => DataType::Primitive(PrimitiveType::Timestamp), + Self::Date(_) => DataType::Primitive(PrimitiveType::Date), + Self::Binary(_) => DataType::Primitive(PrimitiveType::Binary), + Self::Decimal(_, precision, scale) => { + DataType::decimal(*precision as usize, *scale as usize) + } + Self::Null(data_type) => data_type.clone(), + } + } +} + +impl Display for Scalar { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::Integer(i) => write!(f, "{}", i), + Self::Float(fl) => write!(f, "{}", fl), + Self::String(s) => write!(f, "'{}'", s), + Self::Boolean(b) => write!(f, "{}", b), + Self::Timestamp(ts) => write!(f, "{}", ts), + Self::Date(d) => write!(f, "{}", d), + Self::Binary(b) => write!(f, "{:?}", b), + Self::Decimal(value, _, scale) => match scale.cmp(&0) { + Ordering::Equal => { + write!(f, "{}", value) + } + Ordering::Greater => { + let scalar_multiple = 10_i128.pow(*scale as u32); + write!(f, "{}", value / scalar_multiple)?; + write!(f, ".")?; + write!( + f, + "{:0>scale$}", + value % scalar_multiple, + scale = *scale as usize + ) + } + Ordering::Less => { + write!(f, "{}", value)?; + for _ in 0..(scale.abs()) { + write!(f, "0")?; + } + Ok(()) + } + }, + Self::Null(_) => write!(f, "null"), + } + } +} + +impl From for Scalar { + fn from(i: i32) -> Self { + Self::Integer(i) + } +} + +impl From for Scalar { + fn from(b: bool) -> Self { + Self::Boolean(b) + } +} + +impl From<&str> for Scalar { + fn from(s: &str) -> Self { + Self::String(s.into()) + } +} + +impl From for Scalar { + fn from(value: String) -> Self { + Self::String(value) + } +} + +// TODO: add more From impls + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_decimal_display() { + let s = Scalar::Decimal(123456789, 9, 2); + assert_eq!(s.to_string(), "1234567.89"); + + let s = Scalar::Decimal(123456789, 9, 0); + assert_eq!(s.to_string(), "123456789"); + + let s = Scalar::Decimal(123456789, 9, 9); + assert_eq!(s.to_string(), "0.123456789"); + + let s = Scalar::Decimal(123, 9, -3); + assert_eq!(s.to_string(), "123000"); + } +} diff --git a/crates/deltalake-core/src/kernel/mod.rs b/crates/deltalake-core/src/kernel/mod.rs index 54f742c3fb..c8d01c138d 100644 --- a/crates/deltalake-core/src/kernel/mod.rs +++ b/crates/deltalake-core/src/kernel/mod.rs @@ -3,9 +3,13 @@ pub mod actions; #[cfg(all(feature = "arrow", feature = "parquet"))] pub mod arrow; +#[cfg(feature = "arrow")] +pub mod client; pub mod error; +pub mod expressions; pub mod schema; pub use actions::*; pub use error::*; +pub use expressions::*; pub use schema::*; diff --git a/crates/deltalake-sql/src/parser.rs b/crates/deltalake-sql/src/parser.rs index c76cced9bd..3287c87215 100644 --- a/crates/deltalake-sql/src/parser.rs +++ b/crates/deltalake-sql/src/parser.rs @@ -63,10 +63,10 @@ impl fmt::Display for Statement { } } -/// Delta Lake SQL Parser based on [`sqlparser`] +/// Delta Lake SQL Parser based on [`sqlparser`](https://crates.io/crates/sqlparser) /// /// This parser handles Delta Lake specific statements, delegating to -/// [`DFParser`](datafusion_sql::parser::DFParser) for other SQL statements. +/// [`DFParser`]for other SQL statements. pub struct DeltaParser<'a> { sql: &'a str, parser: Parser<'a>,