diff --git a/Cargo.toml b/Cargo.toml index 952cc0833..025249708 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ bytemuck = {version = "1.14.2" } byte-slice-cast = { version = "1.2.1" } clap = { version = "4.5.4" } criterion = { version = "0.5.1" } +chrono-tz = {version = "0.9.0", features = ["serde"]} curve25519-dalek = { version = "4", features = ["rand_core"] } derive_more = { version = "0.99" } dyn_partial_eq = { version = "0.1.2" } diff --git a/crates/proof-of-sql/Cargo.toml b/crates/proof-of-sql/Cargo.toml index 6535d7f2d..d6a0736e9 100644 --- a/crates/proof-of-sql/Cargo.toml +++ b/crates/proof-of-sql/Cargo.toml @@ -30,6 +30,7 @@ bumpalo = { workspace = true, features = ["collections"] } bytemuck = { workspace = true } byte-slice-cast = { workspace = true } curve25519-dalek = { workspace = true, features = ["serde"] } +chrono-tz = {workspace = true, features = ["serde"]} derive_more = { workspace = true } dyn_partial_eq = { workspace = true } hashbrown = { workspace = true } diff --git a/crates/proof-of-sql/src/base/commitment/column_bounds.rs b/crates/proof-of-sql/src/base/commitment/column_bounds.rs index 055878cce..22106c930 100644 --- a/crates/proof-of-sql/src/base/commitment/column_bounds.rs +++ b/crates/proof-of-sql/src/base/commitment/column_bounds.rs @@ -207,6 +207,8 @@ pub enum ColumnBounds { BigInt(Bounds), /// The bounds of an Int128 column. Int128(Bounds), + /// The bounds of a Timestamp column. 
+ TimestampTZ(Bounds), } impl ColumnBounds { @@ -219,6 +221,9 @@ impl ColumnBounds { CommittableColumn::Int(ints) => ColumnBounds::Int(Bounds::from_iter(*ints)), CommittableColumn::BigInt(ints) => ColumnBounds::BigInt(Bounds::from_iter(*ints)), CommittableColumn::Int128(ints) => ColumnBounds::Int128(Bounds::from_iter(*ints)), + CommittableColumn::TimestampTZ(_, _, times) => { + ColumnBounds::TimestampTZ(Bounds::from_iter(*times)) + } CommittableColumn::Boolean(_) | CommittableColumn::Decimal75(_, _, _) | CommittableColumn::Scalar(_) @@ -241,6 +246,9 @@ impl ColumnBounds { (ColumnBounds::BigInt(bounds_a), ColumnBounds::BigInt(bounds_b)) => { Ok(ColumnBounds::BigInt(bounds_a.union(bounds_b))) } + (ColumnBounds::TimestampTZ(bounds_a), ColumnBounds::TimestampTZ(bounds_b)) => { + Ok(ColumnBounds::TimestampTZ(bounds_a.union(bounds_b))) + } (ColumnBounds::Int128(bounds_a), ColumnBounds::Int128(bounds_b)) => { Ok(ColumnBounds::Int128(bounds_a.union(bounds_b))) } @@ -269,7 +277,9 @@ impl ColumnBounds { (ColumnBounds::Int128(bounds_a), ColumnBounds::Int128(bounds_b)) => { Ok(ColumnBounds::Int128(bounds_a.difference(bounds_b))) } - + (ColumnBounds::TimestampTZ(bounds_a), ColumnBounds::TimestampTZ(bounds_b)) => { + Ok(ColumnBounds::TimestampTZ(bounds_a.difference(bounds_b))) + } (_, _) => Err(ColumnBoundsMismatch(Box::new(self), Box::new(other))), } } @@ -278,7 +288,12 @@ impl ColumnBounds { #[cfg(test)] mod tests { use super::*; - use crate::base::{database::OwnedColumn, math::decimal::Precision, scalar::Curve25519Scalar}; + use crate::base::{ + database::OwnedColumn, + math::decimal::Precision, + scalar::Curve25519Scalar, + time::timestamp::{PoSQLTimeUnit, PoSQLTimeZone}, + }; use itertools::Itertools; #[test] @@ -518,8 +533,19 @@ mod tests { ); let committable_decimal75_column = CommittableColumn::from(&decimal75_column); let decimal75_column_bounds = ColumnBounds::from_column(&committable_decimal75_column); - assert_eq!(decimal75_column_bounds, ColumnBounds::NoOrder); + + 
let timestamp_column = OwnedColumn::::TimestampTZ( + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + vec![1_i64, 2, 3, 4], + ); + let committable_timestamp_column = CommittableColumn::from(×tamp_column); + let timestamp_column_bounds = ColumnBounds::from_column(&committable_timestamp_column); + assert_eq!( + timestamp_column_bounds, + ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 1, max: 4 })) + ); } #[test] @@ -561,6 +587,14 @@ mod tests { int128_a.try_union(int128_b).unwrap(), ColumnBounds::Int128(Bounds::Bounded(BoundsInner { min: 1, max: 6 })) ); + + let timestamp_a = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 1, max: 3 })); + let timestamp_b = + ColumnBounds::TimestampTZ(Bounds::Bounded(BoundsInner { min: 4, max: 6 })); + assert_eq!( + timestamp_a.try_union(timestamp_b).unwrap(), + ColumnBounds::TimestampTZ(Bounds::Bounded(BoundsInner { min: 1, max: 6 })) + ); } #[test] @@ -570,6 +604,7 @@ mod tests { let int = ColumnBounds::Int(Bounds::Sharp(BoundsInner { min: -10, max: 10 })); let bigint = ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 })); let int128 = ColumnBounds::Int128(Bounds::Sharp(BoundsInner { min: 4, max: 6 })); + let timestamp = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 4, max: 6 })); let bounds = [ (no_order, "NoOrder"), @@ -577,6 +612,7 @@ mod tests { (int, "Int"), (bigint, "BigInt"), (int128, "Int128"), + (timestamp, "Timestamp"), ]; for ((bound_a, name_a), (bound_b, name_b)) in bounds.iter().tuple_combinations() { @@ -618,6 +654,13 @@ mod tests { int128_a.try_difference(int128_b).unwrap(), ColumnBounds::Int128(Bounds::Bounded(BoundsInner { min: 1, max: 4 })) ); + + let timestamp_a = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 1, max: 4 })); + let timestamp_b = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 3, max: 6 })); + assert_eq!( + timestamp_a.try_difference(timestamp_b).unwrap(), + ColumnBounds::TimestampTZ(Bounds::Bounded(BoundsInner { min: 1, max: 4 
})) + ); } #[test] @@ -625,6 +668,8 @@ mod tests { let no_order = ColumnBounds::NoOrder; let bigint = ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 })); let int128 = ColumnBounds::Int128(Bounds::Sharp(BoundsInner { min: 4, max: 6 })); + let timestamp = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 4, max: 6 })); + let smallint = ColumnBounds::SmallInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 })); assert!(no_order.try_difference(bigint).is_err()); assert!(bigint.try_difference(no_order).is_err()); @@ -634,5 +679,8 @@ mod tests { assert!(bigint.try_difference(int128).is_err()); assert!(int128.try_difference(bigint).is_err()); + + assert!(smallint.try_difference(timestamp).is_err()); + assert!(timestamp.try_difference(smallint).is_err()); } } diff --git a/crates/proof-of-sql/src/base/commitment/column_commitment_metadata.rs b/crates/proof-of-sql/src/base/commitment/column_commitment_metadata.rs index 6210c96d3..9d3fce8cd 100644 --- a/crates/proof-of-sql/src/base/commitment/column_commitment_metadata.rs +++ b/crates/proof-of-sql/src/base/commitment/column_commitment_metadata.rs @@ -40,6 +40,7 @@ impl ColumnCommitmentMetadata { | (ColumnType::Int, ColumnBounds::Int(_)) | (ColumnType::BigInt, ColumnBounds::BigInt(_)) | (ColumnType::Int128, ColumnBounds::Int128(_)) + | (ColumnType::TimestampTZ(_, _), ColumnBounds::TimestampTZ(_)) | ( ColumnType::Boolean | ColumnType::VarChar @@ -72,6 +73,10 @@ impl ColumnCommitmentMetadata { BoundsInner::try_new(i64::MIN, i64::MAX) .expect("i64::MIN and i64::MAX are valid bounds for BigInt"), )), + ColumnType::TimestampTZ(_, _) => ColumnBounds::TimestampTZ(super::Bounds::Bounded( + BoundsInner::try_new(i64::MIN, i64::MAX) + .expect("i64::MIN and i64::MAX are valid bounds for TimeStamp"), + )), ColumnType::Int128 => ColumnBounds::Int128(super::Bounds::Bounded( BoundsInner::try_new(i128::MIN, i128::MAX) .expect("i128::MIN and i128::MAX are valid bounds for Int128"), @@ -160,8 +165,11 @@ impl 
ColumnCommitmentMetadata { mod tests { use super::*; use crate::base::{ - commitment::column_bounds::Bounds, database::OwnedColumn, math::decimal::Precision, + commitment::column_bounds::Bounds, + database::OwnedColumn, + math::decimal::Precision, scalar::Curve25519Scalar, + time::timestamp::{PoSQLTimeUnit, PoSQLTimeZone}, }; #[test] @@ -219,6 +227,18 @@ mod tests { } ); + assert_eq!( + ColumnCommitmentMetadata::try_new( + ColumnType::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC), + ColumnBounds::TimestampTZ(Bounds::Empty), + ) + .unwrap(), + ColumnCommitmentMetadata { + column_type: ColumnType::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC), + bounds: ColumnBounds::TimestampTZ(Bounds::Empty), + } + ); + assert_eq!( ColumnCommitmentMetadata::try_new( ColumnType::Int128, @@ -349,6 +369,26 @@ mod tests { ); assert_eq!(decimal_metadata.bounds(), &ColumnBounds::NoOrder); + let timestamp_column: OwnedColumn<Curve25519Scalar> = + OwnedColumn::<Curve25519Scalar>::TimestampTZ( + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + [1i64, 2, 3, 4, 5].to_vec(), + ); + let committable_timestamp_column = CommittableColumn::from(&timestamp_column); + let timestamp_metadata = + ColumnCommitmentMetadata::from_column(&committable_timestamp_column); + assert_eq!( + timestamp_metadata.column_type(), + &ColumnType::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC) + ); + if let ColumnBounds::TimestampTZ(Bounds::Sharp(bounds)) = timestamp_metadata.bounds() { + assert_eq!(bounds.min(), &1); + assert_eq!(bounds.max(), &5); + } else { + panic!("Bounds constructed from nonempty TimestampTZ column should be ColumnBounds::TimestampTZ(Bounds::Sharp(_))"); + } + let varchar_column = OwnedColumn::<Curve25519Scalar>::VarChar( ["Lorem", "ipsum", "dolor", "sit", "amet"] .map(String::from) @@ -484,6 +524,80 @@ mod tests { bigint_metadata_a.try_union(bigint_metadata_b).unwrap(), bigint_metadata_c ); + + // Ordered case for TimestampTZ + // Example Unix epoch times + let times = [ + 1_625_072_400, + 1_625_076_000, + 1_625_079_600, + 
1_625_072_400, + 1_625_065_000, + ]; + let timezone = PoSQLTimeZone::UTC; + let timeunit = PoSQLTimeUnit::Second; + let timestamp_column_a = CommittableColumn::TimestampTZ(timeunit, timezone, ×[..2]); + let timestamp_metadata_a = ColumnCommitmentMetadata::from_column(×tamp_column_a); + let timestamp_column_b = CommittableColumn::TimestampTZ(timeunit, timezone, ×[2..]); + let timestamp_metadata_b = ColumnCommitmentMetadata::from_column(×tamp_column_b); + let timestamp_column_c = CommittableColumn::TimestampTZ(timeunit, timezone, ×); + let timestamp_metadata_c = ColumnCommitmentMetadata::from_column(×tamp_column_c); + assert_eq!( + timestamp_metadata_a + .try_union(timestamp_metadata_b) + .unwrap(), + timestamp_metadata_c + ); + } + + #[test] + fn we_can_difference_timestamp_tz_matching_metadata() { + // Ordered case + let times = [ + 1_625_072_400, + 1_625_076_000, + 1_625_079_600, + 1_625_072_400, + 1_625_065_000, + ]; + let timezone = PoSQLTimeZone::UTC; + let timeunit = PoSQLTimeUnit::Second; + + let timestamp_column_a = CommittableColumn::TimestampTZ(timeunit, timezone, ×[..2]); + let timestamp_metadata_a = ColumnCommitmentMetadata::from_column(×tamp_column_a); + let timestamp_column_b = CommittableColumn::TimestampTZ(timeunit, timezone, ×); + let timestamp_metadata_b = ColumnCommitmentMetadata::from_column(×tamp_column_b); + + let b_difference_a = timestamp_metadata_b + .try_difference(timestamp_metadata_a) + .unwrap(); + assert_eq!( + b_difference_a.column_type, + ColumnType::TimestampTZ(timeunit, timezone) + ); + if let ColumnBounds::TimestampTZ(Bounds::Bounded(bounds)) = b_difference_a.bounds { + assert_eq!(bounds.min(), &1_625_065_000); + assert_eq!(bounds.max(), &1_625_079_600); + } else { + panic!("difference of overlapping bounds should be Bounded"); + } + + let timestamp_column_empty = CommittableColumn::TimestampTZ(timeunit, timezone, &[]); + let timestamp_metadata_empty = + ColumnCommitmentMetadata::from_column(×tamp_column_empty); + + assert_eq!( + 
timestamp_metadata_b + .try_difference(timestamp_metadata_empty) + .unwrap(), + timestamp_metadata_b + ); + assert_eq!( + timestamp_metadata_empty + .try_difference(timestamp_metadata_b) + .unwrap(), + timestamp_metadata_empty + ); } #[test] @@ -741,5 +855,43 @@ mod tests { assert!(different_decimal75_metadata .try_union(decimal75_metadata) .is_err()); + + let timestamp_tz_metadata_a = ColumnCommitmentMetadata { + column_type: ColumnType::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC), + bounds: ColumnBounds::TimestampTZ(Bounds::Empty), + }; + + let timestamp_tz_metadata_b = ColumnCommitmentMetadata { + column_type: ColumnType::TimestampTZ(PoSQLTimeUnit::Millisecond, PoSQLTimeZone::UTC), + bounds: ColumnBounds::TimestampTZ(Bounds::Empty), + }; + + // Tests for union operations + assert!(timestamp_tz_metadata_a.try_union(varchar_metadata).is_err()); + assert!(varchar_metadata.try_union(timestamp_tz_metadata_a).is_err()); + + // Tests for difference operations + assert!(timestamp_tz_metadata_a + .try_difference(scalar_metadata) + .is_err()); + assert!(scalar_metadata + .try_difference(timestamp_tz_metadata_a) + .is_err()); + + // Tests for different time units within the same type + assert!(timestamp_tz_metadata_a + .try_union(timestamp_tz_metadata_b) + .is_err()); + assert!(timestamp_tz_metadata_b + .try_union(timestamp_tz_metadata_a) + .is_err()); + + // Difference with different time units + assert!(timestamp_tz_metadata_a + .try_difference(timestamp_tz_metadata_b) + .is_err()); + assert!(timestamp_tz_metadata_b + .try_difference(timestamp_tz_metadata_a) + .is_err()); } } diff --git a/crates/proof-of-sql/src/base/commitment/committable_column.rs b/crates/proof-of-sql/src/base/commitment/committable_column.rs index bf83bcfd7..546f303f5 100644 --- a/crates/proof-of-sql/src/base/commitment/committable_column.rs +++ b/crates/proof-of-sql/src/base/commitment/committable_column.rs @@ -3,6 +3,7 @@ use crate::base::{ math::decimal::Precision, ref_into::RefInto, 
scalar::Scalar, + time::timestamp::{PoSQLTimeUnit, PoSQLTimeZone}, }; #[cfg(feature = "blitzar")] use blitzar::sequence::Sequence; @@ -37,6 +38,8 @@ pub enum CommittableColumn<'a> { Scalar(Vec<[u64; 4]>), /// Column of limbs for committing to scalars, hashed from a VarChar column. VarChar(Vec<[u64; 4]>), + /// Borrowed Timestamp column with Timezone, mapped to `i64`. + TimestampTZ(PoSQLTimeUnit, PoSQLTimeZone, &'a [i64]), } impl<'a> CommittableColumn<'a> { @@ -51,6 +54,7 @@ impl<'a> CommittableColumn<'a> { CommittableColumn::Scalar(col) => col.len(), CommittableColumn::VarChar(col) => col.len(), CommittableColumn::Boolean(col) => col.len(), + CommittableColumn::TimestampTZ(_, _, col) => col.len(), } } @@ -78,6 +82,7 @@ impl<'a> From<&CommittableColumn<'a>> for ColumnType { CommittableColumn::Scalar(_) => ColumnType::Scalar, CommittableColumn::VarChar(_) => ColumnType::VarChar, CommittableColumn::Boolean(_) => ColumnType::Boolean, + CommittableColumn::TimestampTZ(tu, tz, _) => ColumnType::TimestampTZ(*tu, *tz), } } } @@ -99,6 +104,7 @@ impl<'a, S: Scalar> From<&Column<'a, S>> for CommittableColumn<'a> { let as_limbs: Vec<_> = scalars.iter().map(RefInto::<[u64; 4]>::ref_into).collect(); CommittableColumn::VarChar(as_limbs) } + Column::TimestampTZ(tu, tz, times) => CommittableColumn::TimestampTZ(*tu, *tz, times), } } } @@ -128,6 +134,9 @@ impl<'a, S: Scalar> From<&'a OwnedColumn> for CommittableColumn<'a> { .map(Into::<[u64; 4]>::into) .collect(), ), + OwnedColumn::TimestampTZ(tu, tz, times) => { + CommittableColumn::TimestampTZ(*tu, *tz, times as &[_]) + } } } } @@ -142,11 +151,13 @@ impl<'a> From<&'a [i32]> for CommittableColumn<'a> { CommittableColumn::Int(value) } } + impl<'a> From<&'a [i64]> for CommittableColumn<'a> { fn from(value: &'a [i64]) -> Self { CommittableColumn::BigInt(value) } } + impl<'a> From<&'a [i128]> for CommittableColumn<'a> { fn from(value: &'a [i128]) -> Self { CommittableColumn::Int128(value) @@ -175,6 +186,7 @@ impl<'a, 'b> From<&'a 
CommittableColumn<'b>> for Sequence<'a> { CommittableColumn::Scalar(limbs) => Sequence::from(limbs), CommittableColumn::VarChar(limbs) => Sequence::from(limbs), CommittableColumn::Boolean(bools) => Sequence::from(*bools), + CommittableColumn::TimestampTZ(_, _, times) => Sequence::from(*times), } } } @@ -208,6 +220,31 @@ mod tests { assert_eq!(res_committable_column, test_committable_column) } + #[test] + fn we_can_get_type_and_length_of_timestamp_column() { + // empty case + let timestamp_committable_column = + CommittableColumn::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC, &[]); + assert_eq!(timestamp_committable_column.len(), 0); + assert!(timestamp_committable_column.is_empty()); + assert_eq!( + timestamp_committable_column.column_type(), + ColumnType::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC) + ); + + let timestamp_committable_column = CommittableColumn::TimestampTZ( + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + &[12, 34, 56], + ); + assert_eq!(timestamp_committable_column.len(), 3); + assert!(!timestamp_committable_column.is_empty()); + assert_eq!( + timestamp_committable_column.column_type(), + ColumnType::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC) + ); + } + #[test] fn we_can_get_type_and_length_of_smallint_column() { // empty case @@ -347,6 +384,34 @@ mod tests { assert_eq!(bool_committable_column.column_type(), ColumnType::Boolean); } + #[test] + fn we_can_convert_from_borrowing_timestamp_column() { + // empty case + let from_borrowed_column = + CommittableColumn::from(&Column::<Curve25519Scalar>::TimestampTZ( + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + &[], + )); + assert_eq!( + from_borrowed_column, + CommittableColumn::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC, &[]) + ); + + // non-empty case + let timestamps = [1625072400, 1625076000, 1625083200]; + let from_borrowed_column = + CommittableColumn::from(&Column::<Curve25519Scalar>::TimestampTZ( + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + &timestamps, + )); + assert_eq!( + from_borrowed_column, 
+ CommittableColumn::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC, ×tamps) + ); + } + #[test] fn we_can_convert_from_borrowing_bigint_column() { // empty case @@ -501,6 +566,34 @@ mod tests { ); } + #[test] + fn we_can_convert_from_owned_timestamp_column() { + // empty case + let owned_column = OwnedColumn::::TimestampTZ( + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + Vec::new(), + ); + let from_owned_column = CommittableColumn::from(&owned_column); + assert_eq!( + from_owned_column, + CommittableColumn::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC, &[]) + ); + + // non-empty case + let timestamps = vec![1625072400, 1625076000, 1625083200]; + let owned_column = OwnedColumn::::TimestampTZ( + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + timestamps.clone(), + ); + let from_owned_column = CommittableColumn::from(&owned_column); + assert_eq!( + from_owned_column, + CommittableColumn::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC, ×tamps) + ); + } + #[test] fn we_can_convert_from_owned_int_column() { // empty case @@ -779,4 +872,30 @@ mod tests { ); assert_eq!(commitment_buffer[0], commitment_buffer[1]); } + + #[test] + fn we_can_commit_to_timestamp_column_through_committable_column() { + // Empty case + let committable_column = + CommittableColumn::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC, &[]); + let sequence = Sequence::from(&committable_column); + let mut commitment_buffer = [CompressedRistretto::default()]; + compute_curve25519_commitments(&mut commitment_buffer, &[sequence], 0); + assert_eq!(commitment_buffer[0], CompressedRistretto::default()); + + // Non-empty case + let timestamps = [1625072400, 1625076000, 1625083200]; + let committable_column = + CommittableColumn::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC, ×tamps); + + let sequence_actual = Sequence::from(&committable_column); + let sequence_expected = Sequence::from(timestamps.as_slice()); + let mut commitment_buffer = [CompressedRistretto::default(); 
2]; + compute_curve25519_commitments( + &mut commitment_buffer, + &[sequence_actual, sequence_expected], + 0, + ); + assert_eq!(commitment_buffer[0], commitment_buffer[1]); + } } diff --git a/crates/proof-of-sql/src/base/database/arrow_array_to_column_conversion.rs b/crates/proof-of-sql/src/base/database/arrow_array_to_column_conversion.rs index 2c339d8fe..91bb4b6c1 100644 --- a/crates/proof-of-sql/src/base/database/arrow_array_to_column_conversion.rs +++ b/crates/proof-of-sql/src/base/database/arrow_array_to_column_conversion.rs @@ -3,16 +3,18 @@ use crate::{ base::{ database::Column, math::decimal::Precision, - scalar::{Curve25519Scalar, Scalar}, + scalar::Scalar, + time::timestamp::{PoSQLTimeUnit, PoSQLTimeZone}, }, sql::parse::ConversionError, }; use arrow::{ array::{ Array, ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Int16Array, Int32Array, - Int64Array, StringArray, + Int64Array, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampNanosecondArray, TimestampSecondArray, }, - datatypes::{i256, DataType}, + datatypes::{i256, DataType, TimeUnit as ArrowTimeUnit}, }; use bumpalo::Bump; use std::ops::Range; @@ -36,6 +38,9 @@ pub enum ArrowArrayToColumnConversionError { /// Variant for conversion errors #[error("conversion error: {0}")] ConversionError(#[from] ConversionError), + /// Variant for timezone conversion errors, i.e. invalid timezone + #[error("Timezone conversion failed: {0}")] + TimezoneConversionError(String), } /// This trait is used to provide utility functions to convert ArrayRefs into proof types (Column, Scalars, etc.) 
@@ -48,7 +53,7 @@ pub trait ArrayRefExt { #[cfg(feature = "blitzar")] fn to_curve25519_scalars( &self, - ) -> Result, ArrowArrayToColumnConversionError>; + ) -> Result, ArrowArrayToColumnConversionError>; /// Convert an ArrayRef into a Proof of SQL Column type /// @@ -76,7 +81,7 @@ impl ArrayRefExt for ArrayRef { #[cfg(feature = "blitzar")] fn to_curve25519_scalars( &self, - ) -> Result, ArrowArrayToColumnConversionError> { + ) -> Result, ArrowArrayToColumnConversionError> { if self.null_count() != 0 { return Err(ArrowArrayToColumnConversionError::ArrayContainsNulls); } @@ -131,6 +136,24 @@ impl ArrayRefExt for ArrayRef { }) .collect() }), + DataType::Timestamp(time_unit, _) => match time_unit { + ArrowTimeUnit::Second => self + .as_any() + .downcast_ref::() + .map(|array| array.values().iter().map(|v| Ok((*v).into())).collect()), + ArrowTimeUnit::Millisecond => self + .as_any() + .downcast_ref::() + .map(|array| array.values().iter().map(|v| Ok((*v).into())).collect()), + ArrowTimeUnit::Microsecond => self + .as_any() + .downcast_ref::() + .map(|array| array.values().iter().map(|v| Ok((*v).into())).collect()), + ArrowTimeUnit::Nanosecond => self + .as_any() + .downcast_ref::() + .map(|array| array.values().iter().map(|v| Ok((*v).into())).collect()), + }, _ => None, }; @@ -251,6 +274,61 @@ impl ArrayRefExt for ArrayRef { )) } } + // Handle all possible TimeStamp TimeUnit instances + DataType::Timestamp(time_unit, tz) => match time_unit { + ArrowTimeUnit::Second => { + if let Some(array) = self.as_any().downcast_ref::() { + Ok(Column::TimestampTZ( + PoSQLTimeUnit::Second, + PoSQLTimeZone::try_from(tz.clone())?, + &array.values()[range.start..range.end], + )) + } else { + Err(ArrowArrayToColumnConversionError::UnsupportedType( + self.data_type().clone(), + )) + } + } + ArrowTimeUnit::Millisecond => { + if let Some(array) = self.as_any().downcast_ref::() { + Ok(Column::TimestampTZ( + PoSQLTimeUnit::Millisecond, + PoSQLTimeZone::try_from(tz.clone())?, + 
&array.values()[range.start..range.end], + )) + } else { + Err(ArrowArrayToColumnConversionError::UnsupportedType( + self.data_type().clone(), + )) + } + } + ArrowTimeUnit::Microsecond => { + if let Some(array) = self.as_any().downcast_ref::() { + Ok(Column::TimestampTZ( + PoSQLTimeUnit::Microsecond, + PoSQLTimeZone::try_from(tz.clone())?, + &array.values()[range.start..range.end], + )) + } else { + Err(ArrowArrayToColumnConversionError::UnsupportedType( + self.data_type().clone(), + )) + } + } + ArrowTimeUnit::Nanosecond => { + if let Some(array) = self.as_any().downcast_ref::() { + Ok(Column::TimestampTZ( + PoSQLTimeUnit::Nanosecond, + PoSQLTimeZone::try_from(tz.clone())?, + &array.values()[range.start..range.end], + )) + } else { + Err(ArrowArrayToColumnConversionError::UnsupportedType( + self.data_type().clone(), + )) + } + } + }, DataType::Utf8 => { if let Some(array) = self.as_any().downcast_ref::() { let vals = alloc @@ -283,10 +361,92 @@ impl ArrayRefExt for ArrayRef { mod tests { use super::*; - use crate::proof_primitive::dory::DoryScalar; + use crate::{base::scalar::Curve25519Scalar, proof_primitive::dory::DoryScalar}; use arrow::array::Decimal256Builder; use std::{str::FromStr, sync::Arc}; + #[test] + fn we_can_convert_timestamp_array_normal_range() { + let alloc = Bump::new(); + let data = vec![1625072400, 1625076000, 1625083200]; // Example Unix timestamps + let array: ArrayRef = Arc::new(TimestampSecondArray::with_timezone_opt( + data.clone().into(), + Some("UTC"), + )); + + let result = array.to_column::(&alloc, &(1..3), None); + assert_eq!( + result.unwrap(), + Column::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC, &data[1..3]) + ); + } + + #[test] + fn we_can_build_an_empty_column_from_an_empty_range_timestamp() { + let alloc = Bump::new(); + let data = vec![1625072400, 1625076000]; // Example Unix timestamps + let array: ArrayRef = Arc::new(TimestampSecondArray::with_timezone_opt( + data.into(), + Some("UTC"), + )); + + let result = array 
+ .to_column::(&alloc, &(2..2), None) + .unwrap(); + assert_eq!( + result, + Column::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC, &[]) + ); + } + + #[test] + fn we_can_convert_timestamp_array_empty_range() { + let alloc = Bump::new(); + let data = vec![1625072400, 1625076000, 1625083200]; // Example Unix timestamps + let array: ArrayRef = Arc::new(TimestampSecondArray::with_timezone_opt( + data.into(), + Some("UTC"), + )); + + let result = array.to_column::(&alloc, &(1..1), None); + assert_eq!( + result.unwrap(), + Column::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC, &[]) + ); + } + + #[test] + fn we_cannot_convert_timestamp_array_oob_range() { + let alloc = Bump::new(); + let data = vec![1625072400, 1625076000, 1625083200]; + let array: ArrayRef = Arc::new(TimestampSecondArray::with_timezone_opt( + data.into(), + Some("UTC"), + )); + + let result = array.to_column::(&alloc, &(3..5), None); + assert_eq!( + result, + Err(ArrowArrayToColumnConversionError::IndexOutOfBounds(3, 5)) + ); + } + + #[test] + fn we_can_convert_timestamp_array_with_nulls() { + let alloc = Bump::new(); + let data = vec![Some(1625072400), None, Some(1625083200)]; + let array: ArrayRef = Arc::new(TimestampSecondArray::with_timezone_opt( + data.into(), + Some("UTC"), + )); + + let result = array.to_column::(&alloc, &(0..3), None); + assert!(matches!( + result, + Err(ArrowArrayToColumnConversionError::ArrayContainsNulls) + )); + } + #[test] fn we_cannot_convert_utf8_array_oob_range() { let alloc = Bump::new(); @@ -830,6 +990,24 @@ mod tests { ); } + #[test] + fn we_can_convert_valid_timestamp_array_refs_into_valid_columns() { + let alloc = Bump::new(); + let data = vec![1625072400, 1625076000]; // Example Unix timestamps + let array: ArrayRef = Arc::new(TimestampSecondArray::with_timezone_opt( + data.clone().into(), + Some("UTC"), + )); + + let result = array + .to_column::(&alloc, &(0..2), None) + .unwrap(); + assert_eq!( + result, + Column::TimestampTZ(PoSQLTimeUnit::Second, 
PoSQLTimeZone::UTC, &data[..]) + ); + } + #[test] fn we_can_convert_valid_boolean_array_refs_into_valid_columns_using_ranges_smaller_than_arrays() { @@ -873,6 +1051,25 @@ mod tests { ); } + #[test] + fn we_can_convert_valid_timestamp_array_refs_into_valid_columns_using_ranges_smaller_than_arrays( + ) { + let alloc = Bump::new(); + let data = vec![1625072400, 1625076000, 1625083200]; // Example Unix timestamps + let array: ArrayRef = Arc::new(TimestampSecondArray::with_timezone_opt( + data.clone().into(), + Some("UTC"), + )); + + // Test using a range smaller than the array size + assert_eq!( + array + .to_column::(&alloc, &(1..3), None) + .unwrap(), + Column::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC, &data[1..3]) + ); + } + #[test] fn we_can_convert_valid_string_array_refs_into_valid_columns_using_ranges_smaller_than_arrays() { @@ -914,6 +1111,23 @@ mod tests { assert_eq!(result, Column::VarChar((&[], &[]))); } + #[test] + fn we_can_convert_valid_timestamp_array_refs_into_valid_columns_using_ranges_with_zero_size() { + let alloc = Bump::new(); + let data = vec![1625072400, 1625076000]; // Example Unix timestamps + let array: ArrayRef = Arc::new(TimestampSecondArray::with_timezone_opt( + data.clone().into(), + Some("UTC"), + )); + let result = array + .to_column::(&alloc, &(0..0), None) + .unwrap(); + assert_eq!( + result, + Column::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC, &[]) + ); + } + #[test] fn we_can_convert_valid_boolean_array_refs_into_valid_vec_scalars() { let data = vec![false, true]; @@ -927,6 +1141,23 @@ mod tests { ); } + #[test] + fn we_can_convert_valid_timestamp_array_refs_into_valid_vec_scalars() { + let data = vec![1625072400, 1625076000]; // Example Unix timestamps + let array: ArrayRef = Arc::new(TimestampSecondArray::with_timezone_opt( + data.clone().into(), + Some("UTC"), + )); + + assert_eq!( + array.to_curve25519_scalars(), + Ok(data + .iter() + .map(|&v| Curve25519Scalar::from(v)) + .collect::>()) + ); + } + 
#[test] fn we_can_convert_valid_integer_array_refs_into_valid_vec_scalars() { let data = vec![1, -3]; diff --git a/crates/proof-of-sql/src/base/database/column.rs b/crates/proof-of-sql/src/base/database/column.rs index b78ec8873..5bb0fa19d 100644 --- a/crates/proof-of-sql/src/base/database/column.rs +++ b/crates/proof-of-sql/src/base/database/column.rs @@ -2,12 +2,14 @@ use super::{LiteralValue, TableRef}; use crate::base::{ math::decimal::{scale_scalar, Precision}, scalar::Scalar, + time::timestamp::{PoSQLTimeUnit, PoSQLTimeZone}, }; -use arrow::datatypes::{DataType, Field}; +use arrow::datatypes::{DataType, Field, TimeUnit as ArrowTimeUnit}; use bumpalo::Bump; use proof_of_sql_parser::Identifier; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use serde::{Deserialize, Serialize}; +use std::sync::Arc; /// Represents a read-only view of a column in an in-memory, /// column-oriented database. @@ -37,7 +39,13 @@ pub enum Column<'a, S: Scalar> { /// - the first element maps to the str values. /// - the second element maps to the str hashes (see [crate::base::scalar::Scalar]). VarChar((&'a [&'a str], &'a [S])), + /// Timestamp columns with timezone + /// - the first element maps to the stored [`PoSQLTimeUnit`] + /// - the second element maps to the stored [`PoSQLTimeZone`] + /// - the third element holds the values, in that time unit since the Unix epoch + TimestampTZ(PoSQLTimeUnit, PoSQLTimeZone, &'a [i64]), } + impl<'a, S: Scalar> Column<'a, S> { /// Provides the column type associated with the column pub fn column_type(&self) -> ColumnType { @@ -50,6 +58,9 @@ impl<'a, S: Scalar> Column<'a, S> { Self::Int128(_) => ColumnType::Int128, Self::Scalar(_) => ColumnType::Scalar, Self::Decimal75(precision, scale, _) => ColumnType::Decimal75(*precision, *scale), + Self::TimestampTZ(time_unit, timezone, _) => { + ColumnType::TimestampTZ(*time_unit, *timezone) + } } } /// Returns the length of the column. 
@@ -66,6 +77,7 @@ impl<'a, S: Scalar> Column<'a, S> { Self::Int128(col) => col.len(), Self::Scalar(col) => col.len(), Self::Decimal75(_, _, col) => col.len(), + Self::TimestampTZ(_, _, col) => col.len(), } } /// Returns `true` if the column has no elements. @@ -101,6 +113,9 @@ impl<'a, S: Scalar> Column<'a, S> { *scale, alloc.alloc_slice_fill_copy(length, *value), ), + LiteralValue::TimeStampTZ(tu, tz, value) => { + Column::TimestampTZ(*tu, *tz, alloc.alloc_slice_fill_copy(length, *value)) + } LiteralValue::VarChar((string, scalar)) => Column::VarChar(( alloc.alloc_slice_fill_with(length, |_| alloc.alloc_str(string) as &str), alloc.alloc_slice_fill_copy(length, *scalar), @@ -153,6 +168,10 @@ impl<'a, S: Scalar> Column<'a, S> { .par_iter() .map(|s| *s * scale_factor) .collect::>(), + Self::TimestampTZ(_, _, col) => col + .par_iter() + .map(|i| S::from(i) * scale_factor) + .collect::>(), } } } @@ -194,6 +213,9 @@ pub enum ColumnType { /// Mapped to i256 #[serde(rename = "Decimal75", alias = "DECIMAL75", alias = "decimal75")] Decimal75(Precision, i8), + /// Mapped to i64 + #[serde(alias = "TIMESTAMP", alias = "timestamp")] + TimestampTZ(PoSQLTimeUnit, PoSQLTimeZone), } impl ColumnType { @@ -224,6 +246,7 @@ impl ColumnType { Self::SmallInt => Some(5_u8), Self::Int => Some(10_u8), Self::BigInt => Some(19_u8), + Self::TimestampTZ(_, _) => Some(19_u8), Self::Int128 => Some(39_u8), Self::Decimal75(precision, _) => Some(precision.value()), // Scalars are not in database & are only used for typeless comparisons for testing so we return 0 @@ -256,6 +279,9 @@ impl From<&ColumnType> for DataType { } ColumnType::VarChar => DataType::Utf8, ColumnType::Scalar => unimplemented!("Cannot convert Scalar type to arrow type"), + ColumnType::TimestampTZ(timeunit, timezone) => { + DataType::Timestamp(ArrowTimeUnit::from(*timeunit), Some(Arc::from(timezone))) + } } } } @@ -274,6 +300,10 @@ impl TryFrom for ColumnType { DataType::Decimal256(precision, scale) if precision <= 75 => { 
Ok(ColumnType::Decimal75(Precision::new(precision)?, scale)) } + DataType::Timestamp(time_unit, timezone_option) => Ok(ColumnType::TimestampTZ( + PoSQLTimeUnit::from(time_unit), + PoSQLTimeZone::try_from(timezone_option)?, + )), DataType::Utf8 => Ok(ColumnType::VarChar), _ => Err(format!("Unsupported arrow data type {:?}", data_type)), } @@ -298,6 +328,11 @@ impl std::fmt::Display for ColumnType { } ColumnType::VarChar => write!(f, "VARCHAR"), ColumnType::Scalar => write!(f, "SCALAR"), + ColumnType::TimestampTZ(timeunit, timezone) => write!( + f, + "TIMESTAMP(TIMEUNIT: {timeunit}, TIMEZONE: {:?})", + timezone + ), } } } @@ -381,6 +416,10 @@ mod tests { #[test] fn column_type_serializes_to_string() { + let column_type = ColumnType::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC); + let serialized = serde_json::to_string(&column_type).unwrap(); + assert_eq!(serialized, r#"{"TimestampTZ":["Second","UTC"]}"#); + let column_type = ColumnType::Boolean; let serialized = serde_json::to_string(&column_type).unwrap(); assert_eq!(serialized, r#""Boolean""#); diff --git a/crates/proof-of-sql/src/base/database/literal_value.rs b/crates/proof-of-sql/src/base/database/literal_value.rs index 3d9345685..76bc41865 100644 --- a/crates/proof-of-sql/src/base/database/literal_value.rs +++ b/crates/proof-of-sql/src/base/database/literal_value.rs @@ -1,4 +1,9 @@ -use crate::base::{database::ColumnType, math::decimal::Precision, scalar::Scalar}; +use crate::base::{ + database::ColumnType, + math::decimal::Precision, + scalar::Scalar, + time::timestamp::{PoSQLTimeUnit, PoSQLTimeZone}, +}; use serde::{Deserialize, Serialize}; /// Represents a literal value. 
@@ -29,6 +34,9 @@ pub enum LiteralValue { Decimal75(Precision, i8, S), /// Scalar literals Scalar(S), + /// TimeStamp defined over a unit (s, ms, ns, etc) and timezone with backing store + /// mapped to i64, which is time units since unix epoch + TimeStampTZ(PoSQLTimeUnit, PoSQLTimeZone, i64), } impl LiteralValue { @@ -43,6 +51,7 @@ impl LiteralValue { Self::Int128(_) => ColumnType::Int128, Self::Scalar(_) => ColumnType::Scalar, Self::Decimal75(precision, scale, _) => ColumnType::Decimal75(*precision, *scale), + Self::TimeStampTZ(tu, tz, _) => ColumnType::TimestampTZ(*tu, *tz), } } @@ -57,6 +66,7 @@ impl LiteralValue { Self::Int128(i) => i.into(), Self::Decimal75(_, _, s) => *s, Self::Scalar(scalar) => *scalar, + Self::TimeStampTZ(_, _, time) => time.into(), } } } diff --git a/crates/proof-of-sql/src/base/database/owned_and_arrow_conversions.rs b/crates/proof-of-sql/src/base/database/owned_and_arrow_conversions.rs index e9f522a2b..3e551b89e 100644 --- a/crates/proof-of-sql/src/base/database/owned_and_arrow_conversions.rs +++ b/crates/proof-of-sql/src/base/database/owned_and_arrow_conversions.rs @@ -20,13 +20,15 @@ use crate::base::{ }, math::decimal::Precision, scalar::Scalar, + time::timestamp::{PoSQLTimeUnit, PoSQLTimeZone}, }; use arrow::{ array::{ ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Int16Array, Int32Array, - Int64Array, StringArray, + Int64Array, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampNanosecondArray, TimestampSecondArray, }, - datatypes::{i256, DataType, Schema, SchemaRef}, + datatypes::{i256, DataType, Schema, SchemaRef, TimeUnit as ArrowTimeUnit}, error::ArrowError, record_batch::RecordBatch, }; @@ -54,6 +56,12 @@ pub enum OwnedArrowConversionError { /// This error occurs when trying to convert from an Arrow array with nulls. #[error("null values are not supported in OwnedColumn yet")] NullNotSupportedYet, + /// This error occurs when trying to convert from an unsupported timestamp unit. 
+ #[error("unsupported timestamp unit: {0}")] + UnsupportedTimestampUnit(String), + /// This error occurs when trying to convert from an invalid timezone string. + #[error("invalid timezone string: {0}")] + InvalidTimezone(String), // New error variant for timezone strings } impl From> for ArrayRef { @@ -79,6 +87,12 @@ impl From> for ArrayRef { } OwnedColumn::Scalar(_) => unimplemented!("Cannot convert Scalar type to arrow type"), OwnedColumn::VarChar(col) => Arc::new(StringArray::from(col)), + OwnedColumn::TimestampTZ(time_unit, _, col) => match time_unit { + PoSQLTimeUnit::Second => Arc::new(TimestampSecondArray::from(col)), + PoSQLTimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from(col)), + PoSQLTimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from(col)), + PoSQLTimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from(col)), + }, } } } @@ -174,6 +188,72 @@ impl TryFrom<&ArrayRef> for OwnedColumn { .map(|s| s.unwrap().to_string()) .collect(), )), + DataType::Timestamp(time_unit, timezone) => match time_unit { + ArrowTimeUnit::Second => { + let array = value + .as_any() + .downcast_ref::() + .ok_or_else(|| { + OwnedArrowConversionError::UnsupportedTimestampUnit( + "Second".to_string(), + ) + })?; + let timestamps = array.values().iter().copied().collect::>(); + Ok(OwnedColumn::TimestampTZ( + PoSQLTimeUnit::Second, + PoSQLTimeZone::try_from(timezone.clone())?, + timestamps, + )) + } + ArrowTimeUnit::Millisecond => { + let array = value + .as_any() + .downcast_ref::() + .ok_or_else(|| { + OwnedArrowConversionError::UnsupportedTimestampUnit( + "Millisecond".to_string(), + ) + })?; + let timestamps = array.values().iter().copied().collect::>(); + Ok(OwnedColumn::TimestampTZ( + PoSQLTimeUnit::Millisecond, + PoSQLTimeZone::try_from(timezone.clone())?, + timestamps, + )) + } + ArrowTimeUnit::Microsecond => { + let array = value + .as_any() + .downcast_ref::() + .ok_or_else(|| { + OwnedArrowConversionError::UnsupportedTimestampUnit( + 
"Microsecond".to_string(), + ) + })?; + let timestamps = array.values().iter().copied().collect::>(); + Ok(OwnedColumn::TimestampTZ( + PoSQLTimeUnit::Microsecond, + PoSQLTimeZone::try_from(timezone.clone())?, + timestamps, + )) + } + ArrowTimeUnit::Nanosecond => { + let array = value + .as_any() + .downcast_ref::() + .ok_or_else(|| { + OwnedArrowConversionError::UnsupportedTimestampUnit( + "Nanosecond".to_string(), + ) + })?; + let timestamps = array.values().iter().copied().collect::>(); + Ok(OwnedColumn::TimestampTZ( + PoSQLTimeUnit::Nanosecond, + PoSQLTimeZone::try_from(timezone.clone())?, + timestamps, + )) + } + }, &data_type => Err(OwnedArrowConversionError::UnsupportedType( data_type.clone(), )), diff --git a/crates/proof-of-sql/src/base/database/owned_column.rs b/crates/proof-of-sql/src/base/database/owned_column.rs index 03420916e..f6b9fb6a0 100644 --- a/crates/proof-of-sql/src/base/database/owned_column.rs +++ b/crates/proof-of-sql/src/base/database/owned_column.rs @@ -3,7 +3,11 @@ /// converting to the final result in either Arrow format or JSON. /// This is the analog of an arrow Array. use super::ColumnType; -use crate::base::{math::decimal::Precision, scalar::Scalar}; +use crate::base::{ + math::decimal::Precision, + scalar::Scalar, + time::timestamp::{PoSQLTimeUnit, PoSQLTimeZone}, +}; #[derive(Debug, PartialEq, Clone, Eq)] #[non_exhaustive] /// Supported types for OwnedColumn @@ -24,6 +28,8 @@ pub enum OwnedColumn { Decimal75(Precision, i8, Vec), /// Scalar columns Scalar(Vec), + /// Timestamp columns + TimestampTZ(PoSQLTimeUnit, PoSQLTimeZone, Vec), } impl OwnedColumn { @@ -38,6 +44,7 @@ impl OwnedColumn { OwnedColumn::Int128(col) => col.len(), OwnedColumn::Decimal75(_, _, col) => col.len(), OwnedColumn::Scalar(col) => col.len(), + OwnedColumn::TimestampTZ(_, _, col) => col.len(), } } /// Returns true if the column is empty. 
@@ -51,6 +58,7 @@ impl OwnedColumn { OwnedColumn::Int128(col) => col.is_empty(), OwnedColumn::Scalar(col) => col.is_empty(), OwnedColumn::Decimal75(_, _, col) => col.is_empty(), + OwnedColumn::TimestampTZ(_, _, col) => col.is_empty(), } } /// Returns the type of the column. @@ -66,47 +74,7 @@ impl OwnedColumn { OwnedColumn::Decimal75(precision, scale, _) => { ColumnType::Decimal75(*precision, *scale) } + OwnedColumn::TimestampTZ(tu, tz, _) => ColumnType::TimestampTZ(*tu, *tz), } } } - -impl FromIterator for OwnedColumn { - fn from_iter>(iter: T) -> Self { - Self::Boolean(Vec::from_iter(iter)) - } -} -impl FromIterator for OwnedColumn { - fn from_iter>(iter: T) -> Self { - Self::SmallInt(Vec::from_iter(iter)) - } -} -impl FromIterator for OwnedColumn { - fn from_iter>(iter: T) -> Self { - Self::Int(Vec::from_iter(iter)) - } -} -impl FromIterator for OwnedColumn { - fn from_iter>(iter: T) -> Self { - Self::BigInt(Vec::from_iter(iter)) - } -} -impl FromIterator for OwnedColumn { - fn from_iter>(iter: T) -> Self { - Self::Int128(Vec::from_iter(iter)) - } -} -impl FromIterator for OwnedColumn { - fn from_iter>(iter: T) -> Self { - Self::VarChar(Vec::from_iter(iter)) - } -} -impl FromIterator for OwnedColumn { - fn from_iter>(iter: T) -> Self { - Self::Scalar(Vec::from_iter(iter)) - } -} -impl<'a, S: Scalar> FromIterator<&'a str> for OwnedColumn { - fn from_iter>(iter: T) -> Self { - Self::from_iter(iter.into_iter().map(|s| s.to_string())) - } -} diff --git a/crates/proof-of-sql/src/base/database/owned_table_test.rs b/crates/proof-of-sql/src/base/database/owned_table_test.rs index 3a9e3c297..617fda933 100644 --- a/crates/proof-of-sql/src/base/database/owned_table_test.rs +++ b/crates/proof-of-sql/src/base/database/owned_table_test.rs @@ -2,6 +2,7 @@ use crate::{ base::{ database::{owned_table_utility::*, OwnedColumn, OwnedTable, OwnedTableError}, scalar::Curve25519Scalar, + time::timestamp::{PoSQLTimeUnit, PoSQLTimeZone}, }, proof_primitive::dory::DoryScalar, }; @@ -56,8 
+57,22 @@ fn we_can_create_an_owned_table_with_data() { "boolean", [true, false, true, false, true, false, true, false, true], ), + timestamptz( + "timestamp", + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + [0, 1, 2, 3, 4, 5, 6, i64::MIN, i64::MAX], + ), ]); let mut table = IndexMap::new(); + table.insert( + Identifier::try_new("timestamp").unwrap(), + OwnedColumn::TimestampTZ( + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + [0, 1, 2, 3, 4, 5, 6, i64::MIN, i64::MAX].into(), + ), + ); table.insert( Identifier::try_new("bigint").unwrap(), OwnedColumn::BigInt(vec![0_i64, 1, 2, 3, 4, 5, 6, i64::MIN, i64::MAX]), @@ -109,12 +124,24 @@ fn we_get_inequality_between_tables_with_differing_column_order() { int128("b", [0; 0]), varchar("c", ["0"; 0]), boolean("d", [false; 0]), + timestamptz( + "timestamp", + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + [0; 0], + ), ]); let owned_table_b: OwnedTable = owned_table([ boolean("d", [false; 0]), int128("b", [0; 0]), bigint("a", [0; 0]), varchar("c", ["0"; 0]), + timestamptz( + "timestamp", + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + [0; 0], + ), ]); assert_ne!(owned_table_a, owned_table_b); } @@ -125,12 +152,24 @@ fn we_get_inequality_between_tables_with_differing_data() { int128("b", [0]), varchar("c", ["0"]), boolean("d", [true]), + timestamptz( + "timestamp", + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + [1625072400], + ), ]); let owned_table_b: OwnedTable = owned_table([ bigint("a", [1]), int128("b", [0]), varchar("c", ["0"]), boolean("d", [true]), + timestamptz( + "timestamp", + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + [1625076000], + ), ]); assert_ne!(owned_table_a, owned_table_b); } diff --git a/crates/proof-of-sql/src/base/database/owned_table_test_accessor.rs b/crates/proof-of-sql/src/base/database/owned_table_test_accessor.rs index dd790aa25..8b88519d2 100644 --- a/crates/proof-of-sql/src/base/database/owned_table_test_accessor.rs +++ b/crates/proof-of-sql/src/base/database/owned_table_test_accessor.rs 
@@ -95,6 +95,7 @@ impl DataAccessor for OwnedTableTestA .alloc_slice_fill_iter(col.iter().map(|s| (*s).into())); Column::VarChar((col, scals)) } + OwnedColumn::TimestampTZ(tu, tz, col) => Column::TimestampTZ(*tu, *tz, col), } } } diff --git a/crates/proof-of-sql/src/base/database/owned_table_test_accessor_test.rs b/crates/proof-of-sql/src/base/database/owned_table_test_accessor_test.rs index e3c34d906..0c70a1ade 100644 --- a/crates/proof-of-sql/src/base/database/owned_table_test_accessor_test.rs +++ b/crates/proof-of-sql/src/base/database/owned_table_test_accessor_test.rs @@ -5,6 +5,7 @@ use super::{ use crate::base::{ database::owned_table_utility::*, scalar::{compute_commitment_for_testing, Curve25519Scalar}, + time::timestamp::{PoSQLTimeUnit, PoSQLTimeZone}, }; use blitzar::proof::InnerProductProof; @@ -48,6 +49,12 @@ fn we_can_access_the_columns_of_a_table() { varchar("varchar", ["a", "bc", "d", "e"]), scalar("scalar", [1, 2, 3, 4]), boolean("boolean", [true, false, true, false]), + timestamptz( + "time", + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + [4, 5, 6, 5], + ), ]); accessor.add_table(table_ref_2, data2, 0_usize); @@ -99,6 +106,16 @@ fn we_can_access_the_columns_of_a_table() { Column::Boolean(col) => assert_eq!(col.to_vec(), vec![true, false, true, false]), _ => panic!("Invalid column type"), }; + + let column = ColumnRef::new( + table_ref_2, + "time".parse().unwrap(), + ColumnType::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC), + ); + match accessor.get_column(column) { + Column::TimestampTZ(_, _, col) => assert_eq!(col.to_vec(), vec![4, 5, 6, 5]), + _ => panic!("Invalid column type"), + }; } #[test] diff --git a/crates/proof-of-sql/src/base/database/owned_table_utility.rs b/crates/proof-of-sql/src/base/database/owned_table_utility.rs index bf36acbbf..0b2131b30 100644 --- a/crates/proof-of-sql/src/base/database/owned_table_utility.rs +++ b/crates/proof-of-sql/src/base/database/owned_table_utility.rs @@ -14,7 +14,10 @@ //! ]); //! 
``` use super::{OwnedColumn, OwnedTable}; -use crate::base::scalar::Scalar; +use crate::base::{ + scalar::Scalar, + time::timestamp::{PoSQLTimeUnit, PoSQLTimeZone}, +}; use core::ops::Deref; use proof_of_sql_parser::Identifier; @@ -195,3 +198,35 @@ pub fn decimal75( ), ) } + +/// Creates a (Identifier, OwnedColumn) pair for a timestamp column. +/// This is primarily intended for use in conjunction with [owned_table]. +/// +/// # Parameters +/// - `name`: The name of the column. +/// - `time_unit`: The time unit of the timestamps. +/// - `timezone`: The timezone for the timestamps. +/// - `data`: The data for the column, provided as an iterator over `i64` values representing time since the unix epoch. +/// +/// # Example +/// ``` +/// use proof_of_sql::base::{database::owned_table_utility::*, +/// scalar::Curve25519Scalar, +/// time::timestamp::{PoSQLTimeUnit, PoSQLTimeZone}}; +/// use chrono_tz::Europe::London; +/// +/// let result = owned_table::([ +/// timestamptz("event_time", PoSQLTimeUnit::Second, PoSQLTimeZone::new(London), vec![1625072400, 1625076000, 1625079600]), +/// ]); +/// ``` +pub fn timestamptz( + name: impl Deref, + time_unit: PoSQLTimeUnit, + timezone: PoSQLTimeZone, + data: impl IntoIterator, +) -> (Identifier, OwnedColumn) { + ( + name.parse().unwrap(), + OwnedColumn::TimestampTZ(time_unit, timezone, data.into_iter().collect()), + ) +} diff --git a/crates/proof-of-sql/src/base/database/record_batch_dataframe_conversion.rs b/crates/proof-of-sql/src/base/database/record_batch_dataframe_conversion.rs index 88ba6d76b..4c49a148d 100644 --- a/crates/proof-of-sql/src/base/database/record_batch_dataframe_conversion.rs +++ b/crates/proof-of-sql/src/base/database/record_batch_dataframe_conversion.rs @@ -1,8 +1,9 @@ use arrow::{ array::{ Array, BooleanArray, Decimal128Array, Int16Array, Int32Array, Int64Array, StringArray, + TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, }, - datatypes::{DataType, Field, Schema}, + 
datatypes::{DataType, Field, Schema, TimeUnit as ArrowTimeUnit}, record_batch::RecordBatch, }; use polars::{ @@ -60,6 +61,7 @@ pub fn record_batch_to_dataframe(record_batch: RecordBatch) -> Option Series::new(f.name(), data) } + arrow::datatypes::DataType::Utf8 => { let data = col .as_any() @@ -81,6 +83,42 @@ pub fn record_batch_to_dataframe(record_batch: RecordBatch) -> Option // Note: we make this unchecked because if record batch has values that overflow 38 digits, so should the data frame. .into_series() } + arrow::datatypes::DataType::Timestamp(time_unit, _timezone_option) => { + match time_unit { + arrow::datatypes::TimeUnit::Second => { + let data = col + .as_any() + .downcast_ref::() + .map(|array| array.values()) + .unwrap(); + Series::new(f.name(), data) + } + arrow::datatypes::TimeUnit::Millisecond => { + let data = col + .as_any() + .downcast_ref::() + .map(|array| array.values()) + .unwrap(); + Series::new(f.name(), data) + } + arrow::datatypes::TimeUnit::Microsecond => { + let data = col + .as_any() + .downcast_ref::() + .map(|array| array.values()) + .unwrap(); + Series::new(f.name(), data) + } + arrow::datatypes::TimeUnit::Nanosecond => { + let data = col + .as_any() + .downcast_ref::() + .map(|array| array.values()) + .unwrap(); + Series::new(f.name(), data) + } + } + } _ => None?, }) }) @@ -170,6 +208,40 @@ pub fn dataframe_to_record_batch(data: DataFrame) -> Option { DataType::Decimal128(38, 0) } + // NOTE: Polars does not support seconds + polars::datatypes::DataType::Datetime(timeunit, timezone) => { + let col = series.i64().unwrap().cont_slice().unwrap(); + let timezone_arc = timezone.as_ref().map(|tz| Arc::from(tz.as_str())); + let arrow_array: Arc = match timeunit { + polars::datatypes::TimeUnit::Nanoseconds => { + Arc::new(TimestampNanosecondArray::with_timezone_opt( + col.to_vec().into(), + timezone_arc, + )) + } + polars::datatypes::TimeUnit::Microseconds => { + Arc::new(TimestampMicrosecondArray::with_timezone_opt( + col.to_vec().into(), 
+ timezone_arc, + )) + } + polars::datatypes::TimeUnit::Milliseconds => { + Arc::new(TimestampMillisecondArray::with_timezone_opt( + col.to_vec().into(), + timezone_arc, + )) + } + }; + columns.push(arrow_array); + DataType::Timestamp( + match timeunit { + polars::datatypes::TimeUnit::Nanoseconds => ArrowTimeUnit::Nanosecond, + polars::datatypes::TimeUnit::Microseconds => ArrowTimeUnit::Microsecond, + polars::datatypes::TimeUnit::Milliseconds => ArrowTimeUnit::Millisecond, + }, + timezone.as_ref().map(|tz| Arc::from(tz.as_str())), + ) + } _ => return None, }; diff --git a/crates/proof-of-sql/src/base/database/record_batch_utility.rs b/crates/proof-of-sql/src/base/database/record_batch_utility.rs index aa461a64f..7c67c8f7c 100644 --- a/crates/proof-of-sql/src/base/database/record_batch_utility.rs +++ b/crates/proof-of-sql/src/base/database/record_batch_utility.rs @@ -1,3 +1,8 @@ +use crate::base::time::timestamp::{PoSQLTimeUnit, Time}; +use arrow::array::{ + TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, + TimestampSecondArray, +}; use std::sync::Arc; /// Extension trait for Vec to convert it to an Arrow array @@ -18,6 +23,48 @@ impl ToArrow for Vec { } } +impl ToArrow for Vec