From 623df7d14f73d65ae3ba2716752f537088134d9b Mon Sep 17 00:00:00 2001 From: Dustin Ray <40841027+drcapybara@users.noreply.github.com> Date: Wed, 19 Jun 2024 12:13:30 -0700 Subject: [PATCH] feat!: initial support for timestamp (#12) # Rationale for this change Provides the initial typing support for a postgresql-like ```TimeStamp``` type. The proofs ```TimeStampTZ``` is typed over a custom ```TimeUnit``` and ```TimeZone``` type. The choice is made to type out TimeStamp in this way because arrow's ```DataType::Timestamp``` has a required TimeUnit field and an optional timezone field. Typing out our own TimeStamp with these fields gives us greater control over the arrow type if we want. We can also just default to seconds and UTC if needed, which would simplify this design a bit. ## Typing Rationale The ```arrow::datatypes::DataType::Timestamp``` variant is typed over a ```TimeUnit``` and an optional timezone ```Option<Arc<str>>```. Thus in our application it makes sense to have a mapping of this metadata. Example: ```rust // arrow datatype mapping to our new timestamp type DataType::Timestamp(time_unit, timezone_option) => Ok(ColumnType::TimestampTZ( PoSQLTimeUnit::from(time_unit), PoSQLTimeZone::try_from(timezone_option)?, )), ``` If this becomes burdensome, we could just as easily remove the timezone type and simply default to UTC, and handle any timezone conversion in DML and DDL. We will align with postgresql and store all times as UTC by default. Finally, the ```PoSQLTimeUnit``` type gives us the flexibility to store times in either seconds, milliseconds, nanoseconds, or microseconds for high precision. This type maps directly to ```TimeUnit``` which we alias as ```ArrowTimeUnit``` in this PR. # What changes are included in this PR? 
## Typing updates: - [x] Column - [x] OwnedColumn - [x] CommittableColumn - [x] ColumnBounds - [x] Typed TimeZone - [x] Typed TimeUnit - [x] ```impl ArrayRefExt for ArrayRef -> to_curve_25519_scalar & to_column``` - [x] LiteralValue - [x] owned_and_arrow_conversions - [x] ```impl FromIterator for OwnedColumn``` - [x] ```impl DataAccessor for OwnedTableTestAccessor``` - [x] owned_table_utility - [x] test accessor_utility - [x] multi-linear-extension - [x] Scalar trait bounds - [x] compute_dory_commitment - [x] filter_column_by_index - [x] prover_evaluate - [x] sum_aggregate_column_by_index_counts - [x] compare_indexes_by_columns - [x] impl ProvableQueryResult - [x] to_owned_table - [x] trait ProvableResultColumn - [x] make_empty_query_result - [x] record_batch_dataframe_conversion - [x] impl ToArrow for RecordBatch # Are these changes tested? ## Tests: - [x] TimeUnit Conversions - [x] ColumnBounds - [x] ColumnCommitmentMetadata - [x] arrow_array_to_column_conversion - [x] column - [x] owned_table # Split: - lalrpop grammar update and token parsing - timestamp.now() - timestamp.current_time() --- Cargo.toml | 1 + crates/proof-of-sql/Cargo.toml | 1 + .../src/base/commitment/column_bounds.rs | 54 +++- .../commitment/column_commitment_metadata.rs | 154 ++++++++++- .../src/base/commitment/committable_column.rs | 119 +++++++++ .../arrow_array_to_column_conversion.rs | 243 +++++++++++++++++- .../proof-of-sql/src/base/database/column.rs | 41 ++- .../src/base/database/literal_value.rs | 12 +- .../database/owned_and_arrow_conversions.rs | 84 +++++- .../src/base/database/owned_column.rs | 52 +--- .../src/base/database/owned_table_test.rs | 39 +++ .../database/owned_table_test_accessor.rs | 1 + .../owned_table_test_accessor_test.rs | 17 ++ .../src/base/database/owned_table_utility.rs | 37 ++- .../record_batch_dataframe_conversion.rs | 74 +++++- .../src/base/database/record_batch_utility.rs | 47 ++++ .../base/database/test_accessor_utility.rs | 26 +- 
crates/proof-of-sql/src/base/mod.rs | 2 + .../base/polynomial/multilinear_extension.rs | 4 + crates/proof-of-sql/src/base/time/mod.rs | 2 + .../proof-of-sql/src/base/time/timestamp.rs | 241 +++++++++++++++++ .../dory/dory_commitment_helper_cpu.rs | 3 + .../dory/dory_commitment_helper_gpu.rs | 3 + .../src/sql/ast/dense_filter_util.rs | 5 + .../src/sql/ast/filter_result_expr.rs | 1 + .../proof-of-sql/src/sql/ast/group_by_util.rs | 4 + .../src/sql/proof/provable_query_result.rs | 9 + .../src/sql/proof/provable_result_column.rs | 2 + .../src/sql/proof/verifiable_query_result.rs | 1 + 29 files changed, 1219 insertions(+), 60 deletions(-) create mode 100644 crates/proof-of-sql/src/base/time/mod.rs create mode 100644 crates/proof-of-sql/src/base/time/timestamp.rs diff --git a/Cargo.toml b/Cargo.toml index 952cc0833..025249708 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ bytemuck = {version = "1.14.2" } byte-slice-cast = { version = "1.2.1" } clap = { version = "4.5.4" } criterion = { version = "0.5.1" } +chrono-tz = {version = "0.9.0", features = ["serde"]} curve25519-dalek = { version = "4", features = ["rand_core"] } derive_more = { version = "0.99" } dyn_partial_eq = { version = "0.1.2" } diff --git a/crates/proof-of-sql/Cargo.toml b/crates/proof-of-sql/Cargo.toml index 6535d7f2d..d6a0736e9 100644 --- a/crates/proof-of-sql/Cargo.toml +++ b/crates/proof-of-sql/Cargo.toml @@ -30,6 +30,7 @@ bumpalo = { workspace = true, features = ["collections"] } bytemuck = { workspace = true } byte-slice-cast = { workspace = true } curve25519-dalek = { workspace = true, features = ["serde"] } +chrono-tz = {workspace = true, features = ["serde"]} derive_more = { workspace = true } dyn_partial_eq = { workspace = true } hashbrown = { workspace = true } diff --git a/crates/proof-of-sql/src/base/commitment/column_bounds.rs b/crates/proof-of-sql/src/base/commitment/column_bounds.rs index 055878cce..22106c930 100644 --- a/crates/proof-of-sql/src/base/commitment/column_bounds.rs 
+++ b/crates/proof-of-sql/src/base/commitment/column_bounds.rs @@ -207,6 +207,8 @@ pub enum ColumnBounds { BigInt(Bounds), /// The bounds of an Int128 column. Int128(Bounds), + /// The bounds of a Timestamp column. + TimestampTZ(Bounds), } impl ColumnBounds { @@ -219,6 +221,9 @@ impl ColumnBounds { CommittableColumn::Int(ints) => ColumnBounds::Int(Bounds::from_iter(*ints)), CommittableColumn::BigInt(ints) => ColumnBounds::BigInt(Bounds::from_iter(*ints)), CommittableColumn::Int128(ints) => ColumnBounds::Int128(Bounds::from_iter(*ints)), + CommittableColumn::TimestampTZ(_, _, times) => { + ColumnBounds::TimestampTZ(Bounds::from_iter(*times)) + } CommittableColumn::Boolean(_) | CommittableColumn::Decimal75(_, _, _) | CommittableColumn::Scalar(_) @@ -241,6 +246,9 @@ impl ColumnBounds { (ColumnBounds::BigInt(bounds_a), ColumnBounds::BigInt(bounds_b)) => { Ok(ColumnBounds::BigInt(bounds_a.union(bounds_b))) } + (ColumnBounds::TimestampTZ(bounds_a), ColumnBounds::TimestampTZ(bounds_b)) => { + Ok(ColumnBounds::TimestampTZ(bounds_a.union(bounds_b))) + } (ColumnBounds::Int128(bounds_a), ColumnBounds::Int128(bounds_b)) => { Ok(ColumnBounds::Int128(bounds_a.union(bounds_b))) } @@ -269,7 +277,9 @@ impl ColumnBounds { (ColumnBounds::Int128(bounds_a), ColumnBounds::Int128(bounds_b)) => { Ok(ColumnBounds::Int128(bounds_a.difference(bounds_b))) } - + (ColumnBounds::TimestampTZ(bounds_a), ColumnBounds::TimestampTZ(bounds_b)) => { + Ok(ColumnBounds::TimestampTZ(bounds_a.difference(bounds_b))) + } (_, _) => Err(ColumnBoundsMismatch(Box::new(self), Box::new(other))), } } @@ -278,7 +288,12 @@ impl ColumnBounds { #[cfg(test)] mod tests { use super::*; - use crate::base::{database::OwnedColumn, math::decimal::Precision, scalar::Curve25519Scalar}; + use crate::base::{ + database::OwnedColumn, + math::decimal::Precision, + scalar::Curve25519Scalar, + time::timestamp::{PoSQLTimeUnit, PoSQLTimeZone}, + }; use itertools::Itertools; #[test] @@ -518,8 +533,19 @@ mod tests { ); let 
committable_decimal75_column = CommittableColumn::from(&decimal75_column); let decimal75_column_bounds = ColumnBounds::from_column(&committable_decimal75_column); - assert_eq!(decimal75_column_bounds, ColumnBounds::NoOrder); + + let timestamp_column = OwnedColumn::::TimestampTZ( + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + vec![1_i64, 2, 3, 4], + ); + let committable_timestamp_column = CommittableColumn::from(×tamp_column); + let timestamp_column_bounds = ColumnBounds::from_column(&committable_timestamp_column); + assert_eq!( + timestamp_column_bounds, + ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 1, max: 4 })) + ); } #[test] @@ -561,6 +587,14 @@ mod tests { int128_a.try_union(int128_b).unwrap(), ColumnBounds::Int128(Bounds::Bounded(BoundsInner { min: 1, max: 6 })) ); + + let timestamp_a = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 1, max: 3 })); + let timestamp_b = + ColumnBounds::TimestampTZ(Bounds::Bounded(BoundsInner { min: 4, max: 6 })); + assert_eq!( + timestamp_a.try_union(timestamp_b).unwrap(), + ColumnBounds::TimestampTZ(Bounds::Bounded(BoundsInner { min: 1, max: 6 })) + ); } #[test] @@ -570,6 +604,7 @@ mod tests { let int = ColumnBounds::Int(Bounds::Sharp(BoundsInner { min: -10, max: 10 })); let bigint = ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 })); let int128 = ColumnBounds::Int128(Bounds::Sharp(BoundsInner { min: 4, max: 6 })); + let timestamp = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 4, max: 6 })); let bounds = [ (no_order, "NoOrder"), @@ -577,6 +612,7 @@ mod tests { (int, "Int"), (bigint, "BigInt"), (int128, "Int128"), + (timestamp, "Timestamp"), ]; for ((bound_a, name_a), (bound_b, name_b)) in bounds.iter().tuple_combinations() { @@ -618,6 +654,13 @@ mod tests { int128_a.try_difference(int128_b).unwrap(), ColumnBounds::Int128(Bounds::Bounded(BoundsInner { min: 1, max: 4 })) ); + + let timestamp_a = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 1, max: 4 })); + 
let timestamp_b = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 3, max: 6 })); + assert_eq!( + timestamp_a.try_difference(timestamp_b).unwrap(), + ColumnBounds::TimestampTZ(Bounds::Bounded(BoundsInner { min: 1, max: 4 })) + ); } #[test] @@ -625,6 +668,8 @@ mod tests { let no_order = ColumnBounds::NoOrder; let bigint = ColumnBounds::BigInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 })); let int128 = ColumnBounds::Int128(Bounds::Sharp(BoundsInner { min: 4, max: 6 })); + let timestamp = ColumnBounds::TimestampTZ(Bounds::Sharp(BoundsInner { min: 4, max: 6 })); + let smallint = ColumnBounds::SmallInt(Bounds::Sharp(BoundsInner { min: 1, max: 3 })); assert!(no_order.try_difference(bigint).is_err()); assert!(bigint.try_difference(no_order).is_err()); @@ -634,5 +679,8 @@ mod tests { assert!(bigint.try_difference(int128).is_err()); assert!(int128.try_difference(bigint).is_err()); + + assert!(smallint.try_difference(timestamp).is_err()); + assert!(timestamp.try_difference(smallint).is_err()); } } diff --git a/crates/proof-of-sql/src/base/commitment/column_commitment_metadata.rs b/crates/proof-of-sql/src/base/commitment/column_commitment_metadata.rs index 6210c96d3..9d3fce8cd 100644 --- a/crates/proof-of-sql/src/base/commitment/column_commitment_metadata.rs +++ b/crates/proof-of-sql/src/base/commitment/column_commitment_metadata.rs @@ -40,6 +40,7 @@ impl ColumnCommitmentMetadata { | (ColumnType::Int, ColumnBounds::Int(_)) | (ColumnType::BigInt, ColumnBounds::BigInt(_)) | (ColumnType::Int128, ColumnBounds::Int128(_)) + | (ColumnType::TimestampTZ(_, _), ColumnBounds::TimestampTZ(_)) | ( ColumnType::Boolean | ColumnType::VarChar @@ -72,6 +73,10 @@ impl ColumnCommitmentMetadata { BoundsInner::try_new(i64::MIN, i64::MAX) .expect("i64::MIN and i64::MAX are valid bounds for BigInt"), )), + ColumnType::TimestampTZ(_, _) => ColumnBounds::TimestampTZ(super::Bounds::Bounded( + BoundsInner::try_new(i64::MIN, i64::MAX) + .expect("i64::MIN and i64::MAX are valid bounds for 
TimeStamp"), + )), ColumnType::Int128 => ColumnBounds::Int128(super::Bounds::Bounded( BoundsInner::try_new(i128::MIN, i128::MAX) .expect("i128::MIN and i128::MAX are valid bounds for Int128"), @@ -160,8 +165,11 @@ impl ColumnCommitmentMetadata { mod tests { use super::*; use crate::base::{ - commitment::column_bounds::Bounds, database::OwnedColumn, math::decimal::Precision, + commitment::column_bounds::Bounds, + database::OwnedColumn, + math::decimal::Precision, scalar::Curve25519Scalar, + time::timestamp::{PoSQLTimeUnit, PoSQLTimeZone}, }; #[test] @@ -219,6 +227,18 @@ mod tests { } ); + assert_eq!( + ColumnCommitmentMetadata::try_new( + ColumnType::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC), + ColumnBounds::TimestampTZ(Bounds::Empty), + ) + .unwrap(), + ColumnCommitmentMetadata { + column_type: ColumnType::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC), + bounds: ColumnBounds::TimestampTZ(Bounds::Empty), + } + ); + assert_eq!( ColumnCommitmentMetadata::try_new( ColumnType::Int128, @@ -349,6 +369,26 @@ mod tests { ); assert_eq!(decimal_metadata.bounds(), &ColumnBounds::NoOrder); + let timestamp_column: OwnedColumn = + OwnedColumn::::TimestampTZ( + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + [1i64, 2, 3, 4, 5].to_vec(), + ); + let committable_timestamp_column = CommittableColumn::from(×tamp_column); + let timestamp_metadata = + ColumnCommitmentMetadata::from_column(&committable_timestamp_column); + assert_eq!( + timestamp_metadata.column_type(), + &ColumnType::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC) + ); + if let ColumnBounds::TimestampTZ(Bounds::Sharp(bounds)) = timestamp_metadata.bounds() { + assert_eq!(bounds.min(), &1); + assert_eq!(bounds.max(), &5); + } else { + panic!("Bounds constructed from nonempty TimestampTZ column should be ColumnBounds::BigInt(Bounds::Sharp(_))"); + } + let varchar_column = OwnedColumn::::VarChar( ["Lorem", "ipsum", "dolor", "sit", "amet"] .map(String::from) @@ -484,6 +524,80 @@ mod tests { 
bigint_metadata_a.try_union(bigint_metadata_b).unwrap(), bigint_metadata_c ); + + // Ordered case for TimestampTZ + // Example Unix epoch times + let times = [ + 1_625_072_400, + 1_625_076_000, + 1_625_079_600, + 1_625_072_400, + 1_625_065_000, + ]; + let timezone = PoSQLTimeZone::UTC; + let timeunit = PoSQLTimeUnit::Second; + let timestamp_column_a = CommittableColumn::TimestampTZ(timeunit, timezone, ×[..2]); + let timestamp_metadata_a = ColumnCommitmentMetadata::from_column(×tamp_column_a); + let timestamp_column_b = CommittableColumn::TimestampTZ(timeunit, timezone, ×[2..]); + let timestamp_metadata_b = ColumnCommitmentMetadata::from_column(×tamp_column_b); + let timestamp_column_c = CommittableColumn::TimestampTZ(timeunit, timezone, ×); + let timestamp_metadata_c = ColumnCommitmentMetadata::from_column(×tamp_column_c); + assert_eq!( + timestamp_metadata_a + .try_union(timestamp_metadata_b) + .unwrap(), + timestamp_metadata_c + ); + } + + #[test] + fn we_can_difference_timestamp_tz_matching_metadata() { + // Ordered case + let times = [ + 1_625_072_400, + 1_625_076_000, + 1_625_079_600, + 1_625_072_400, + 1_625_065_000, + ]; + let timezone = PoSQLTimeZone::UTC; + let timeunit = PoSQLTimeUnit::Second; + + let timestamp_column_a = CommittableColumn::TimestampTZ(timeunit, timezone, ×[..2]); + let timestamp_metadata_a = ColumnCommitmentMetadata::from_column(×tamp_column_a); + let timestamp_column_b = CommittableColumn::TimestampTZ(timeunit, timezone, ×); + let timestamp_metadata_b = ColumnCommitmentMetadata::from_column(×tamp_column_b); + + let b_difference_a = timestamp_metadata_b + .try_difference(timestamp_metadata_a) + .unwrap(); + assert_eq!( + b_difference_a.column_type, + ColumnType::TimestampTZ(timeunit, timezone) + ); + if let ColumnBounds::TimestampTZ(Bounds::Bounded(bounds)) = b_difference_a.bounds { + assert_eq!(bounds.min(), &1_625_065_000); + assert_eq!(bounds.max(), &1_625_079_600); + } else { + panic!("difference of overlapping bounds should be 
Bounded"); + } + + let timestamp_column_empty = CommittableColumn::TimestampTZ(timeunit, timezone, &[]); + let timestamp_metadata_empty = + ColumnCommitmentMetadata::from_column(×tamp_column_empty); + + assert_eq!( + timestamp_metadata_b + .try_difference(timestamp_metadata_empty) + .unwrap(), + timestamp_metadata_b + ); + assert_eq!( + timestamp_metadata_empty + .try_difference(timestamp_metadata_b) + .unwrap(), + timestamp_metadata_empty + ); } #[test] @@ -741,5 +855,43 @@ mod tests { assert!(different_decimal75_metadata .try_union(decimal75_metadata) .is_err()); + + let timestamp_tz_metadata_a = ColumnCommitmentMetadata { + column_type: ColumnType::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC), + bounds: ColumnBounds::TimestampTZ(Bounds::Empty), + }; + + let timestamp_tz_metadata_b = ColumnCommitmentMetadata { + column_type: ColumnType::TimestampTZ(PoSQLTimeUnit::Millisecond, PoSQLTimeZone::UTC), + bounds: ColumnBounds::TimestampTZ(Bounds::Empty), + }; + + // Tests for union operations + assert!(timestamp_tz_metadata_a.try_union(varchar_metadata).is_err()); + assert!(varchar_metadata.try_union(timestamp_tz_metadata_a).is_err()); + + // Tests for difference operations + assert!(timestamp_tz_metadata_a + .try_difference(scalar_metadata) + .is_err()); + assert!(scalar_metadata + .try_difference(timestamp_tz_metadata_a) + .is_err()); + + // Tests for different time units within the same type + assert!(timestamp_tz_metadata_a + .try_union(timestamp_tz_metadata_b) + .is_err()); + assert!(timestamp_tz_metadata_b + .try_union(timestamp_tz_metadata_a) + .is_err()); + + // Difference with different time units + assert!(timestamp_tz_metadata_a + .try_difference(timestamp_tz_metadata_b) + .is_err()); + assert!(timestamp_tz_metadata_b + .try_difference(timestamp_tz_metadata_a) + .is_err()); } } diff --git a/crates/proof-of-sql/src/base/commitment/committable_column.rs b/crates/proof-of-sql/src/base/commitment/committable_column.rs index bf83bcfd7..546f303f5 100644 
--- a/crates/proof-of-sql/src/base/commitment/committable_column.rs +++ b/crates/proof-of-sql/src/base/commitment/committable_column.rs @@ -3,6 +3,7 @@ use crate::base::{ math::decimal::Precision, ref_into::RefInto, scalar::Scalar, + time::timestamp::{PoSQLTimeUnit, PoSQLTimeZone}, }; #[cfg(feature = "blitzar")] use blitzar::sequence::Sequence; @@ -37,6 +38,8 @@ pub enum CommittableColumn<'a> { Scalar(Vec<[u64; 4]>), /// Column of limbs for committing to scalars, hashed from a VarChar column. VarChar(Vec<[u64; 4]>), + /// Borrowed Timestamp column with Timezone, mapped to `i64`. + TimestampTZ(PoSQLTimeUnit, PoSQLTimeZone, &'a [i64]), } impl<'a> CommittableColumn<'a> { @@ -51,6 +54,7 @@ impl<'a> CommittableColumn<'a> { CommittableColumn::Scalar(col) => col.len(), CommittableColumn::VarChar(col) => col.len(), CommittableColumn::Boolean(col) => col.len(), + CommittableColumn::TimestampTZ(_, _, col) => col.len(), } } @@ -78,6 +82,7 @@ impl<'a> From<&CommittableColumn<'a>> for ColumnType { CommittableColumn::Scalar(_) => ColumnType::Scalar, CommittableColumn::VarChar(_) => ColumnType::VarChar, CommittableColumn::Boolean(_) => ColumnType::Boolean, + CommittableColumn::TimestampTZ(tu, tz, _) => ColumnType::TimestampTZ(*tu, *tz), } } } @@ -99,6 +104,7 @@ impl<'a, S: Scalar> From<&Column<'a, S>> for CommittableColumn<'a> { let as_limbs: Vec<_> = scalars.iter().map(RefInto::<[u64; 4]>::ref_into).collect(); CommittableColumn::VarChar(as_limbs) } + Column::TimestampTZ(tu, tz, times) => CommittableColumn::TimestampTZ(*tu, *tz, times), } } } @@ -128,6 +134,9 @@ impl<'a, S: Scalar> From<&'a OwnedColumn> for CommittableColumn<'a> { .map(Into::<[u64; 4]>::into) .collect(), ), + OwnedColumn::TimestampTZ(tu, tz, times) => { + CommittableColumn::TimestampTZ(*tu, *tz, times as &[_]) + } } } } @@ -142,11 +151,13 @@ impl<'a> From<&'a [i32]> for CommittableColumn<'a> { CommittableColumn::Int(value) } } + impl<'a> From<&'a [i64]> for CommittableColumn<'a> { fn from(value: &'a [i64]) -> 
Self { CommittableColumn::BigInt(value) } } + impl<'a> From<&'a [i128]> for CommittableColumn<'a> { fn from(value: &'a [i128]) -> Self { CommittableColumn::Int128(value) @@ -175,6 +186,7 @@ impl<'a, 'b> From<&'a CommittableColumn<'b>> for Sequence<'a> { CommittableColumn::Scalar(limbs) => Sequence::from(limbs), CommittableColumn::VarChar(limbs) => Sequence::from(limbs), CommittableColumn::Boolean(bools) => Sequence::from(*bools), + CommittableColumn::TimestampTZ(_, _, times) => Sequence::from(*times), } } } @@ -208,6 +220,31 @@ mod tests { assert_eq!(res_committable_column, test_committable_column) } + #[test] + fn we_can_get_type_and_length_of_timestamp_column() { + // empty case + let smallint_committable_column = + CommittableColumn::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC, &[]); + assert_eq!(smallint_committable_column.len(), 0); + assert!(smallint_committable_column.is_empty()); + assert_eq!( + smallint_committable_column.column_type(), + ColumnType::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC) + ); + + let smallint_committable_column = CommittableColumn::TimestampTZ( + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + &[12, 34, 56], + ); + assert_eq!(smallint_committable_column.len(), 3); + assert!(!smallint_committable_column.is_empty()); + assert_eq!( + smallint_committable_column.column_type(), + ColumnType::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC) + ); + } + #[test] fn we_can_get_type_and_length_of_smallint_column() { // empty case @@ -347,6 +384,34 @@ mod tests { assert_eq!(bool_committable_column.column_type(), ColumnType::Boolean); } + #[test] + fn we_can_convert_from_borrowing_timestamp_column() { + // empty case + let from_borrowed_column = + CommittableColumn::from(&Column::::TimestampTZ( + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + &[], + )); + assert_eq!( + from_borrowed_column, + CommittableColumn::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC, &[]) + ); + + // non-empty case + let timestamps = 
[1625072400, 1625076000, 1625083200]; + let from_borrowed_column = + CommittableColumn::from(&Column::::TimestampTZ( + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + ×tamps, + )); + assert_eq!( + from_borrowed_column, + CommittableColumn::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC, ×tamps) + ); + } + #[test] fn we_can_convert_from_borrowing_bigint_column() { // empty case @@ -501,6 +566,34 @@ mod tests { ); } + #[test] + fn we_can_convert_from_owned_timestamp_column() { + // empty case + let owned_column = OwnedColumn::::TimestampTZ( + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + Vec::new(), + ); + let from_owned_column = CommittableColumn::from(&owned_column); + assert_eq!( + from_owned_column, + CommittableColumn::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC, &[]) + ); + + // non-empty case + let timestamps = vec![1625072400, 1625076000, 1625083200]; + let owned_column = OwnedColumn::::TimestampTZ( + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + timestamps.clone(), + ); + let from_owned_column = CommittableColumn::from(&owned_column); + assert_eq!( + from_owned_column, + CommittableColumn::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC, ×tamps) + ); + } + #[test] fn we_can_convert_from_owned_int_column() { // empty case @@ -779,4 +872,30 @@ mod tests { ); assert_eq!(commitment_buffer[0], commitment_buffer[1]); } + + #[test] + fn we_can_commit_to_timestamp_column_through_committable_column() { + // Empty case + let committable_column = + CommittableColumn::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC, &[]); + let sequence = Sequence::from(&committable_column); + let mut commitment_buffer = [CompressedRistretto::default()]; + compute_curve25519_commitments(&mut commitment_buffer, &[sequence], 0); + assert_eq!(commitment_buffer[0], CompressedRistretto::default()); + + // Non-empty case + let timestamps = [1625072400, 1625076000, 1625083200]; + let committable_column = + CommittableColumn::TimestampTZ(PoSQLTimeUnit::Second, 
PoSQLTimeZone::UTC, ×tamps); + + let sequence_actual = Sequence::from(&committable_column); + let sequence_expected = Sequence::from(timestamps.as_slice()); + let mut commitment_buffer = [CompressedRistretto::default(); 2]; + compute_curve25519_commitments( + &mut commitment_buffer, + &[sequence_actual, sequence_expected], + 0, + ); + assert_eq!(commitment_buffer[0], commitment_buffer[1]); + } } diff --git a/crates/proof-of-sql/src/base/database/arrow_array_to_column_conversion.rs b/crates/proof-of-sql/src/base/database/arrow_array_to_column_conversion.rs index 2c339d8fe..91bb4b6c1 100644 --- a/crates/proof-of-sql/src/base/database/arrow_array_to_column_conversion.rs +++ b/crates/proof-of-sql/src/base/database/arrow_array_to_column_conversion.rs @@ -3,16 +3,18 @@ use crate::{ base::{ database::Column, math::decimal::Precision, - scalar::{Curve25519Scalar, Scalar}, + scalar::Scalar, + time::timestamp::{PoSQLTimeUnit, PoSQLTimeZone}, }, sql::parse::ConversionError, }; use arrow::{ array::{ Array, ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Int16Array, Int32Array, - Int64Array, StringArray, + Int64Array, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampNanosecondArray, TimestampSecondArray, }, - datatypes::{i256, DataType}, + datatypes::{i256, DataType, TimeUnit as ArrowTimeUnit}, }; use bumpalo::Bump; use std::ops::Range; @@ -36,6 +38,9 @@ pub enum ArrowArrayToColumnConversionError { /// Variant for conversion errors #[error("conversion error: {0}")] ConversionError(#[from] ConversionError), + /// Variant for timezone conversion errors, i.e. invalid timezone + #[error("Timezone conversion failed: {0}")] + TimezoneConversionError(String), } /// This trait is used to provide utility functions to convert ArrayRefs into proof types (Column, Scalars, etc.) 
@@ -48,7 +53,7 @@ pub trait ArrayRefExt { #[cfg(feature = "blitzar")] fn to_curve25519_scalars( &self, - ) -> Result, ArrowArrayToColumnConversionError>; + ) -> Result, ArrowArrayToColumnConversionError>; /// Convert an ArrayRef into a Proof of SQL Column type /// @@ -76,7 +81,7 @@ impl ArrayRefExt for ArrayRef { #[cfg(feature = "blitzar")] fn to_curve25519_scalars( &self, - ) -> Result, ArrowArrayToColumnConversionError> { + ) -> Result, ArrowArrayToColumnConversionError> { if self.null_count() != 0 { return Err(ArrowArrayToColumnConversionError::ArrayContainsNulls); } @@ -131,6 +136,24 @@ impl ArrayRefExt for ArrayRef { }) .collect() }), + DataType::Timestamp(time_unit, _) => match time_unit { + ArrowTimeUnit::Second => self + .as_any() + .downcast_ref::() + .map(|array| array.values().iter().map(|v| Ok((*v).into())).collect()), + ArrowTimeUnit::Millisecond => self + .as_any() + .downcast_ref::() + .map(|array| array.values().iter().map(|v| Ok((*v).into())).collect()), + ArrowTimeUnit::Microsecond => self + .as_any() + .downcast_ref::() + .map(|array| array.values().iter().map(|v| Ok((*v).into())).collect()), + ArrowTimeUnit::Nanosecond => self + .as_any() + .downcast_ref::() + .map(|array| array.values().iter().map(|v| Ok((*v).into())).collect()), + }, _ => None, }; @@ -251,6 +274,61 @@ impl ArrayRefExt for ArrayRef { )) } } + // Handle all possible TimeStamp TimeUnit instances + DataType::Timestamp(time_unit, tz) => match time_unit { + ArrowTimeUnit::Second => { + if let Some(array) = self.as_any().downcast_ref::() { + Ok(Column::TimestampTZ( + PoSQLTimeUnit::Second, + PoSQLTimeZone::try_from(tz.clone())?, + &array.values()[range.start..range.end], + )) + } else { + Err(ArrowArrayToColumnConversionError::UnsupportedType( + self.data_type().clone(), + )) + } + } + ArrowTimeUnit::Millisecond => { + if let Some(array) = self.as_any().downcast_ref::() { + Ok(Column::TimestampTZ( + PoSQLTimeUnit::Millisecond, + PoSQLTimeZone::try_from(tz.clone())?, + 
&array.values()[range.start..range.end], + )) + } else { + Err(ArrowArrayToColumnConversionError::UnsupportedType( + self.data_type().clone(), + )) + } + } + ArrowTimeUnit::Microsecond => { + if let Some(array) = self.as_any().downcast_ref::() { + Ok(Column::TimestampTZ( + PoSQLTimeUnit::Microsecond, + PoSQLTimeZone::try_from(tz.clone())?, + &array.values()[range.start..range.end], + )) + } else { + Err(ArrowArrayToColumnConversionError::UnsupportedType( + self.data_type().clone(), + )) + } + } + ArrowTimeUnit::Nanosecond => { + if let Some(array) = self.as_any().downcast_ref::() { + Ok(Column::TimestampTZ( + PoSQLTimeUnit::Nanosecond, + PoSQLTimeZone::try_from(tz.clone())?, + &array.values()[range.start..range.end], + )) + } else { + Err(ArrowArrayToColumnConversionError::UnsupportedType( + self.data_type().clone(), + )) + } + } + }, DataType::Utf8 => { if let Some(array) = self.as_any().downcast_ref::() { let vals = alloc @@ -283,10 +361,92 @@ impl ArrayRefExt for ArrayRef { mod tests { use super::*; - use crate::proof_primitive::dory::DoryScalar; + use crate::{base::scalar::Curve25519Scalar, proof_primitive::dory::DoryScalar}; use arrow::array::Decimal256Builder; use std::{str::FromStr, sync::Arc}; + #[test] + fn we_can_convert_timestamp_array_normal_range() { + let alloc = Bump::new(); + let data = vec![1625072400, 1625076000, 1625083200]; // Example Unix timestamps + let array: ArrayRef = Arc::new(TimestampSecondArray::with_timezone_opt( + data.clone().into(), + Some("UTC"), + )); + + let result = array.to_column::(&alloc, &(1..3), None); + assert_eq!( + result.unwrap(), + Column::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC, &data[1..3]) + ); + } + + #[test] + fn we_can_build_an_empty_column_from_an_empty_range_timestamp() { + let alloc = Bump::new(); + let data = vec![1625072400, 1625076000]; // Example Unix timestamps + let array: ArrayRef = Arc::new(TimestampSecondArray::with_timezone_opt( + data.into(), + Some("UTC"), + )); + + let result = array 
+ .to_column::(&alloc, &(2..2), None) + .unwrap(); + assert_eq!( + result, + Column::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC, &[]) + ); + } + + #[test] + fn we_can_convert_timestamp_array_empty_range() { + let alloc = Bump::new(); + let data = vec![1625072400, 1625076000, 1625083200]; // Example Unix timestamps + let array: ArrayRef = Arc::new(TimestampSecondArray::with_timezone_opt( + data.into(), + Some("UTC"), + )); + + let result = array.to_column::(&alloc, &(1..1), None); + assert_eq!( + result.unwrap(), + Column::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC, &[]) + ); + } + + #[test] + fn we_cannot_convert_timestamp_array_oob_range() { + let alloc = Bump::new(); + let data = vec![1625072400, 1625076000, 1625083200]; + let array: ArrayRef = Arc::new(TimestampSecondArray::with_timezone_opt( + data.into(), + Some("UTC"), + )); + + let result = array.to_column::(&alloc, &(3..5), None); + assert_eq!( + result, + Err(ArrowArrayToColumnConversionError::IndexOutOfBounds(3, 5)) + ); + } + + #[test] + fn we_can_convert_timestamp_array_with_nulls() { + let alloc = Bump::new(); + let data = vec![Some(1625072400), None, Some(1625083200)]; + let array: ArrayRef = Arc::new(TimestampSecondArray::with_timezone_opt( + data.into(), + Some("UTC"), + )); + + let result = array.to_column::(&alloc, &(0..3), None); + assert!(matches!( + result, + Err(ArrowArrayToColumnConversionError::ArrayContainsNulls) + )); + } + #[test] fn we_cannot_convert_utf8_array_oob_range() { let alloc = Bump::new(); @@ -830,6 +990,24 @@ mod tests { ); } + #[test] + fn we_can_convert_valid_timestamp_array_refs_into_valid_columns() { + let alloc = Bump::new(); + let data = vec![1625072400, 1625076000]; // Example Unix timestamps + let array: ArrayRef = Arc::new(TimestampSecondArray::with_timezone_opt( + data.clone().into(), + Some("UTC"), + )); + + let result = array + .to_column::(&alloc, &(0..2), None) + .unwrap(); + assert_eq!( + result, + Column::TimestampTZ(PoSQLTimeUnit::Second, 
PoSQLTimeZone::UTC, &data[..]) + ); + } + #[test] fn we_can_convert_valid_boolean_array_refs_into_valid_columns_using_ranges_smaller_than_arrays() { @@ -873,6 +1051,25 @@ mod tests { ); } + #[test] + fn we_can_convert_valid_timestamp_array_refs_into_valid_columns_using_ranges_smaller_than_arrays( + ) { + let alloc = Bump::new(); + let data = vec![1625072400, 1625076000, 1625083200]; // Example Unix timestamps + let array: ArrayRef = Arc::new(TimestampSecondArray::with_timezone_opt( + data.clone().into(), + Some("UTC"), + )); + + // Test using a range smaller than the array size + assert_eq!( + array + .to_column::(&alloc, &(1..3), None) + .unwrap(), + Column::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC, &data[1..3]) + ); + } + #[test] fn we_can_convert_valid_string_array_refs_into_valid_columns_using_ranges_smaller_than_arrays() { @@ -914,6 +1111,23 @@ mod tests { assert_eq!(result, Column::VarChar((&[], &[]))); } + #[test] + fn we_can_convert_valid_timestamp_array_refs_into_valid_columns_using_ranges_with_zero_size() { + let alloc = Bump::new(); + let data = vec![1625072400, 1625076000]; // Example Unix timestamps + let array: ArrayRef = Arc::new(TimestampSecondArray::with_timezone_opt( + data.clone().into(), + Some("UTC"), + )); + let result = array + .to_column::(&alloc, &(0..0), None) + .unwrap(); + assert_eq!( + result, + Column::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC, &[]) + ); + } + #[test] fn we_can_convert_valid_boolean_array_refs_into_valid_vec_scalars() { let data = vec![false, true]; @@ -927,6 +1141,23 @@ mod tests { ); } + #[test] + fn we_can_convert_valid_timestamp_array_refs_into_valid_vec_scalars() { + let data = vec![1625072400, 1625076000]; // Example Unix timestamps + let array: ArrayRef = Arc::new(TimestampSecondArray::with_timezone_opt( + data.clone().into(), + Some("UTC"), + )); + + assert_eq!( + array.to_curve25519_scalars(), + Ok(data + .iter() + .map(|&v| Curve25519Scalar::from(v)) + .collect::>()) + ); + } + 
#[test] fn we_can_convert_valid_integer_array_refs_into_valid_vec_scalars() { let data = vec![1, -3]; diff --git a/crates/proof-of-sql/src/base/database/column.rs b/crates/proof-of-sql/src/base/database/column.rs index b78ec8873..5bb0fa19d 100644 --- a/crates/proof-of-sql/src/base/database/column.rs +++ b/crates/proof-of-sql/src/base/database/column.rs @@ -2,12 +2,14 @@ use super::{LiteralValue, TableRef}; use crate::base::{ math::decimal::{scale_scalar, Precision}, scalar::Scalar, + time::timestamp::{PoSQLTimeUnit, PoSQLTimeZone}, }; -use arrow::datatypes::{DataType, Field}; +use arrow::datatypes::{DataType, Field, TimeUnit as ArrowTimeUnit}; use bumpalo::Bump; use proof_of_sql_parser::Identifier; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use serde::{Deserialize, Serialize}; +use std::sync::Arc; /// Represents a read-only view of a column in an in-memory, /// column-oriented database. @@ -37,7 +39,13 @@ pub enum Column<'a, S: Scalar> { /// - the first element maps to the str values. /// - the second element maps to the str hashes (see [crate::base::scalar::Scalar]). VarChar((&'a [&'a str], &'a [S])), + /// Timestamp columns with timezone + /// - the first element maps to the stored [`TimeUnit`] + /// - the second element maps to a timezone + /// - the third element maps to columns of timeunits since unix epoch + TimestampTZ(PoSQLTimeUnit, PoSQLTimeZone, &'a [i64]), } + impl<'a, S: Scalar> Column<'a, S> { /// Provides the column type associated with the column pub fn column_type(&self) -> ColumnType { @@ -50,6 +58,9 @@ impl<'a, S: Scalar> Column<'a, S> { Self::Int128(_) => ColumnType::Int128, Self::Scalar(_) => ColumnType::Scalar, Self::Decimal75(precision, scale, _) => ColumnType::Decimal75(*precision, *scale), + Self::TimestampTZ(time_unit, timezone, _) => { + ColumnType::TimestampTZ(*time_unit, *timezone) + } } } /// Returns the length of the column. 
@@ -66,6 +77,7 @@ impl<'a, S: Scalar> Column<'a, S> { Self::Int128(col) => col.len(), Self::Scalar(col) => col.len(), Self::Decimal75(_, _, col) => col.len(), + Self::TimestampTZ(_, _, col) => col.len(), } } /// Returns `true` if the column has no elements. @@ -101,6 +113,9 @@ impl<'a, S: Scalar> Column<'a, S> { *scale, alloc.alloc_slice_fill_copy(length, *value), ), + LiteralValue::TimeStampTZ(tu, tz, value) => { + Column::TimestampTZ(*tu, *tz, alloc.alloc_slice_fill_copy(length, *value)) + } LiteralValue::VarChar((string, scalar)) => Column::VarChar(( alloc.alloc_slice_fill_with(length, |_| alloc.alloc_str(string) as &str), alloc.alloc_slice_fill_copy(length, *scalar), @@ -153,6 +168,10 @@ impl<'a, S: Scalar> Column<'a, S> { .par_iter() .map(|s| *s * scale_factor) .collect::>(), + Self::TimestampTZ(_, _, col) => col + .par_iter() + .map(|i| S::from(i) * scale_factor) + .collect::>(), } } } @@ -194,6 +213,9 @@ pub enum ColumnType { /// Mapped to i256 #[serde(rename = "Decimal75", alias = "DECIMAL75", alias = "decimal75")] Decimal75(Precision, i8), + /// Mapped to i64 + #[serde(alias = "TIMESTAMP", alias = "timestamp")] + TimestampTZ(PoSQLTimeUnit, PoSQLTimeZone), } impl ColumnType { @@ -224,6 +246,7 @@ impl ColumnType { Self::SmallInt => Some(5_u8), Self::Int => Some(10_u8), Self::BigInt => Some(19_u8), + Self::TimestampTZ(_, _) => Some(19_u8), Self::Int128 => Some(39_u8), Self::Decimal75(precision, _) => Some(precision.value()), // Scalars are not in database & are only used for typeless comparisons for testing so we return 0 @@ -256,6 +279,9 @@ impl From<&ColumnType> for DataType { } ColumnType::VarChar => DataType::Utf8, ColumnType::Scalar => unimplemented!("Cannot convert Scalar type to arrow type"), + ColumnType::TimestampTZ(timeunit, timezone) => { + DataType::Timestamp(ArrowTimeUnit::from(*timeunit), Some(Arc::from(timezone))) + } } } } @@ -274,6 +300,10 @@ impl TryFrom for ColumnType { DataType::Decimal256(precision, scale) if precision <= 75 => { 
Ok(ColumnType::Decimal75(Precision::new(precision)?, scale)) } + DataType::Timestamp(time_unit, timezone_option) => Ok(ColumnType::TimestampTZ( + PoSQLTimeUnit::from(time_unit), + PoSQLTimeZone::try_from(timezone_option)?, + )), DataType::Utf8 => Ok(ColumnType::VarChar), _ => Err(format!("Unsupported arrow data type {:?}", data_type)), } @@ -298,6 +328,11 @@ impl std::fmt::Display for ColumnType { } ColumnType::VarChar => write!(f, "VARCHAR"), ColumnType::Scalar => write!(f, "SCALAR"), + ColumnType::TimestampTZ(timeunit, timezone) => write!( + f, + "TIMESTAMP(TIMEUNIT: {timeunit}, TIMEZONE: {:?})", + timezone + ), } } } @@ -381,6 +416,10 @@ mod tests { #[test] fn column_type_serializes_to_string() { + let column_type = ColumnType::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC); + let serialized = serde_json::to_string(&column_type).unwrap(); + assert_eq!(serialized, r#"{"TimestampTZ":["Second","UTC"]}"#); + let column_type = ColumnType::Boolean; let serialized = serde_json::to_string(&column_type).unwrap(); assert_eq!(serialized, r#""Boolean""#); diff --git a/crates/proof-of-sql/src/base/database/literal_value.rs b/crates/proof-of-sql/src/base/database/literal_value.rs index 3d9345685..76bc41865 100644 --- a/crates/proof-of-sql/src/base/database/literal_value.rs +++ b/crates/proof-of-sql/src/base/database/literal_value.rs @@ -1,4 +1,9 @@ -use crate::base::{database::ColumnType, math::decimal::Precision, scalar::Scalar}; +use crate::base::{ + database::ColumnType, + math::decimal::Precision, + scalar::Scalar, + time::timestamp::{PoSQLTimeUnit, PoSQLTimeZone}, +}; use serde::{Deserialize, Serialize}; /// Represents a literal value.
@@ -29,6 +34,9 @@ pub enum LiteralValue { Decimal75(Precision, i8, S), /// Scalar literals Scalar(S), + /// TimeStamp defined over a unit (s, ms, ns, etc) and timezone with backing store + /// mapped to i64, which is time units since unix epoch + TimeStampTZ(PoSQLTimeUnit, PoSQLTimeZone, i64), } impl LiteralValue { @@ -43,6 +51,7 @@ impl LiteralValue { Self::Int128(_) => ColumnType::Int128, Self::Scalar(_) => ColumnType::Scalar, Self::Decimal75(precision, scale, _) => ColumnType::Decimal75(*precision, *scale), + Self::TimeStampTZ(tu, tz, _) => ColumnType::TimestampTZ(*tu, *tz), } } @@ -57,6 +66,7 @@ impl LiteralValue { Self::Int128(i) => i.into(), Self::Decimal75(_, _, s) => *s, Self::Scalar(scalar) => *scalar, + Self::TimeStampTZ(_, _, time) => time.into(), } } } diff --git a/crates/proof-of-sql/src/base/database/owned_and_arrow_conversions.rs b/crates/proof-of-sql/src/base/database/owned_and_arrow_conversions.rs index e9f522a2b..3e551b89e 100644 --- a/crates/proof-of-sql/src/base/database/owned_and_arrow_conversions.rs +++ b/crates/proof-of-sql/src/base/database/owned_and_arrow_conversions.rs @@ -20,13 +20,15 @@ use crate::base::{ }, math::decimal::Precision, scalar::Scalar, + time::timestamp::{PoSQLTimeUnit, PoSQLTimeZone}, }; use arrow::{ array::{ ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Int16Array, Int32Array, - Int64Array, StringArray, + Int64Array, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampNanosecondArray, TimestampSecondArray, }, - datatypes::{i256, DataType, Schema, SchemaRef}, + datatypes::{i256, DataType, Schema, SchemaRef, TimeUnit as ArrowTimeUnit}, error::ArrowError, record_batch::RecordBatch, }; @@ -54,6 +56,12 @@ pub enum OwnedArrowConversionError { /// This error occurs when trying to convert from an Arrow array with nulls. #[error("null values are not supported in OwnedColumn yet")] NullNotSupportedYet, + /// This error occurs when trying to convert from an unsupported timestamp unit. 
+ #[error("unsupported timestamp unit: {0}")] + UnsupportedTimestampUnit(String), + /// This error occurs when trying to convert from an invalid timezone string. + #[error("invalid timezone string: {0}")] + InvalidTimezone(String), // New error variant for timezone strings } impl From> for ArrayRef { @@ -79,6 +87,12 @@ impl From> for ArrayRef { } OwnedColumn::Scalar(_) => unimplemented!("Cannot convert Scalar type to arrow type"), OwnedColumn::VarChar(col) => Arc::new(StringArray::from(col)), + OwnedColumn::TimestampTZ(time_unit, _, col) => match time_unit { + PoSQLTimeUnit::Second => Arc::new(TimestampSecondArray::from(col)), + PoSQLTimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from(col)), + PoSQLTimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from(col)), + PoSQLTimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from(col)), + }, } } } @@ -174,6 +188,72 @@ impl TryFrom<&ArrayRef> for OwnedColumn { .map(|s| s.unwrap().to_string()) .collect(), )), + DataType::Timestamp(time_unit, timezone) => match time_unit { + ArrowTimeUnit::Second => { + let array = value + .as_any() + .downcast_ref::() + .ok_or_else(|| { + OwnedArrowConversionError::UnsupportedTimestampUnit( + "Second".to_string(), + ) + })?; + let timestamps = array.values().iter().copied().collect::>(); + Ok(OwnedColumn::TimestampTZ( + PoSQLTimeUnit::Second, + PoSQLTimeZone::try_from(timezone.clone())?, + timestamps, + )) + } + ArrowTimeUnit::Millisecond => { + let array = value + .as_any() + .downcast_ref::() + .ok_or_else(|| { + OwnedArrowConversionError::UnsupportedTimestampUnit( + "Millisecond".to_string(), + ) + })?; + let timestamps = array.values().iter().copied().collect::>(); + Ok(OwnedColumn::TimestampTZ( + PoSQLTimeUnit::Millisecond, + PoSQLTimeZone::try_from(timezone.clone())?, + timestamps, + )) + } + ArrowTimeUnit::Microsecond => { + let array = value + .as_any() + .downcast_ref::() + .ok_or_else(|| { + OwnedArrowConversionError::UnsupportedTimestampUnit( + 
"Microsecond".to_string(), + ) + })?; + let timestamps = array.values().iter().copied().collect::>(); + Ok(OwnedColumn::TimestampTZ( + PoSQLTimeUnit::Microsecond, + PoSQLTimeZone::try_from(timezone.clone())?, + timestamps, + )) + } + ArrowTimeUnit::Nanosecond => { + let array = value + .as_any() + .downcast_ref::() + .ok_or_else(|| { + OwnedArrowConversionError::UnsupportedTimestampUnit( + "Nanosecond".to_string(), + ) + })?; + let timestamps = array.values().iter().copied().collect::>(); + Ok(OwnedColumn::TimestampTZ( + PoSQLTimeUnit::Nanosecond, + PoSQLTimeZone::try_from(timezone.clone())?, + timestamps, + )) + } + }, &data_type => Err(OwnedArrowConversionError::UnsupportedType( data_type.clone(), )), diff --git a/crates/proof-of-sql/src/base/database/owned_column.rs b/crates/proof-of-sql/src/base/database/owned_column.rs index 03420916e..f6b9fb6a0 100644 --- a/crates/proof-of-sql/src/base/database/owned_column.rs +++ b/crates/proof-of-sql/src/base/database/owned_column.rs @@ -3,7 +3,11 @@ /// converting to the final result in either Arrow format or JSON. /// This is the analog of an arrow Array. use super::ColumnType; -use crate::base::{math::decimal::Precision, scalar::Scalar}; +use crate::base::{ + math::decimal::Precision, + scalar::Scalar, + time::timestamp::{PoSQLTimeUnit, PoSQLTimeZone}, +}; #[derive(Debug, PartialEq, Clone, Eq)] #[non_exhaustive] /// Supported types for OwnedColumn @@ -24,6 +28,8 @@ pub enum OwnedColumn { Decimal75(Precision, i8, Vec), /// Scalar columns Scalar(Vec), + /// Timestamp columns + TimestampTZ(PoSQLTimeUnit, PoSQLTimeZone, Vec), } impl OwnedColumn { @@ -38,6 +44,7 @@ impl OwnedColumn { OwnedColumn::Int128(col) => col.len(), OwnedColumn::Decimal75(_, _, col) => col.len(), OwnedColumn::Scalar(col) => col.len(), + OwnedColumn::TimestampTZ(_, _, col) => col.len(), } } /// Returns true if the column is empty. 
@@ -51,6 +58,7 @@ impl OwnedColumn { OwnedColumn::Int128(col) => col.is_empty(), OwnedColumn::Scalar(col) => col.is_empty(), OwnedColumn::Decimal75(_, _, col) => col.is_empty(), + OwnedColumn::TimestampTZ(_, _, col) => col.is_empty(), } } /// Returns the type of the column. @@ -66,47 +74,7 @@ impl OwnedColumn { OwnedColumn::Decimal75(precision, scale, _) => { ColumnType::Decimal75(*precision, *scale) } + OwnedColumn::TimestampTZ(tu, tz, _) => ColumnType::TimestampTZ(*tu, *tz), } } } - -impl FromIterator for OwnedColumn { - fn from_iter>(iter: T) -> Self { - Self::Boolean(Vec::from_iter(iter)) - } -} -impl FromIterator for OwnedColumn { - fn from_iter>(iter: T) -> Self { - Self::SmallInt(Vec::from_iter(iter)) - } -} -impl FromIterator for OwnedColumn { - fn from_iter>(iter: T) -> Self { - Self::Int(Vec::from_iter(iter)) - } -} -impl FromIterator for OwnedColumn { - fn from_iter>(iter: T) -> Self { - Self::BigInt(Vec::from_iter(iter)) - } -} -impl FromIterator for OwnedColumn { - fn from_iter>(iter: T) -> Self { - Self::Int128(Vec::from_iter(iter)) - } -} -impl FromIterator for OwnedColumn { - fn from_iter>(iter: T) -> Self { - Self::VarChar(Vec::from_iter(iter)) - } -} -impl FromIterator for OwnedColumn { - fn from_iter>(iter: T) -> Self { - Self::Scalar(Vec::from_iter(iter)) - } -} -impl<'a, S: Scalar> FromIterator<&'a str> for OwnedColumn { - fn from_iter>(iter: T) -> Self { - Self::from_iter(iter.into_iter().map(|s| s.to_string())) - } -} diff --git a/crates/proof-of-sql/src/base/database/owned_table_test.rs b/crates/proof-of-sql/src/base/database/owned_table_test.rs index 3a9e3c297..617fda933 100644 --- a/crates/proof-of-sql/src/base/database/owned_table_test.rs +++ b/crates/proof-of-sql/src/base/database/owned_table_test.rs @@ -2,6 +2,7 @@ use crate::{ base::{ database::{owned_table_utility::*, OwnedColumn, OwnedTable, OwnedTableError}, scalar::Curve25519Scalar, + time::timestamp::{PoSQLTimeUnit, PoSQLTimeZone}, }, proof_primitive::dory::DoryScalar, }; @@ -56,8 
+57,22 @@ fn we_can_create_an_owned_table_with_data() { "boolean", [true, false, true, false, true, false, true, false, true], ), + timestamptz( + "timestamp", + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + [0, 1, 2, 3, 4, 5, 6, i64::MIN, i64::MAX], + ), ]); let mut table = IndexMap::new(); + table.insert( + Identifier::try_new("timestamp").unwrap(), + OwnedColumn::TimestampTZ( + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + [0, 1, 2, 3, 4, 5, 6, i64::MIN, i64::MAX].into(), + ), + ); table.insert( Identifier::try_new("bigint").unwrap(), OwnedColumn::BigInt(vec![0_i64, 1, 2, 3, 4, 5, 6, i64::MIN, i64::MAX]), @@ -109,12 +124,24 @@ fn we_get_inequality_between_tables_with_differing_column_order() { int128("b", [0; 0]), varchar("c", ["0"; 0]), boolean("d", [false; 0]), + timestamptz( + "timestamp", + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + [0; 0], + ), ]); let owned_table_b: OwnedTable = owned_table([ boolean("d", [false; 0]), int128("b", [0; 0]), bigint("a", [0; 0]), varchar("c", ["0"; 0]), + timestamptz( + "timestamp", + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + [0; 0], + ), ]); assert_ne!(owned_table_a, owned_table_b); } @@ -125,12 +152,24 @@ fn we_get_inequality_between_tables_with_differing_data() { int128("b", [0]), varchar("c", ["0"]), boolean("d", [true]), + timestamptz( + "timestamp", + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + [1625072400], + ), ]); let owned_table_b: OwnedTable = owned_table([ bigint("a", [1]), int128("b", [0]), varchar("c", ["0"]), boolean("d", [true]), + timestamptz( + "timestamp", + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + [1625076000], + ), ]); assert_ne!(owned_table_a, owned_table_b); } diff --git a/crates/proof-of-sql/src/base/database/owned_table_test_accessor.rs b/crates/proof-of-sql/src/base/database/owned_table_test_accessor.rs index dd790aa25..8b88519d2 100644 --- a/crates/proof-of-sql/src/base/database/owned_table_test_accessor.rs +++ b/crates/proof-of-sql/src/base/database/owned_table_test_accessor.rs 
@@ -95,6 +95,7 @@ impl DataAccessor for OwnedTableTestA .alloc_slice_fill_iter(col.iter().map(|s| (*s).into())); Column::VarChar((col, scals)) } + OwnedColumn::TimestampTZ(tu, tz, col) => Column::TimestampTZ(*tu, *tz, col), } } } diff --git a/crates/proof-of-sql/src/base/database/owned_table_test_accessor_test.rs b/crates/proof-of-sql/src/base/database/owned_table_test_accessor_test.rs index e3c34d906..0c70a1ade 100644 --- a/crates/proof-of-sql/src/base/database/owned_table_test_accessor_test.rs +++ b/crates/proof-of-sql/src/base/database/owned_table_test_accessor_test.rs @@ -5,6 +5,7 @@ use super::{ use crate::base::{ database::owned_table_utility::*, scalar::{compute_commitment_for_testing, Curve25519Scalar}, + time::timestamp::{PoSQLTimeUnit, PoSQLTimeZone}, }; use blitzar::proof::InnerProductProof; @@ -48,6 +49,12 @@ fn we_can_access_the_columns_of_a_table() { varchar("varchar", ["a", "bc", "d", "e"]), scalar("scalar", [1, 2, 3, 4]), boolean("boolean", [true, false, true, false]), + timestamptz( + "time", + PoSQLTimeUnit::Second, + PoSQLTimeZone::UTC, + [4, 5, 6, 5], + ), ]); accessor.add_table(table_ref_2, data2, 0_usize); @@ -99,6 +106,16 @@ fn we_can_access_the_columns_of_a_table() { Column::Boolean(col) => assert_eq!(col.to_vec(), vec![true, false, true, false]), _ => panic!("Invalid column type"), }; + + let column = ColumnRef::new( + table_ref_2, + "time".parse().unwrap(), + ColumnType::TimestampTZ(PoSQLTimeUnit::Second, PoSQLTimeZone::UTC), + ); + match accessor.get_column(column) { + Column::TimestampTZ(_, _, col) => assert_eq!(col.to_vec(), vec![4, 5, 6, 5]), + _ => panic!("Invalid column type"), + }; } #[test] diff --git a/crates/proof-of-sql/src/base/database/owned_table_utility.rs b/crates/proof-of-sql/src/base/database/owned_table_utility.rs index bf36acbbf..0b2131b30 100644 --- a/crates/proof-of-sql/src/base/database/owned_table_utility.rs +++ b/crates/proof-of-sql/src/base/database/owned_table_utility.rs @@ -14,7 +14,10 @@ //! ]); //! 
``` use super::{OwnedColumn, OwnedTable}; -use crate::base::scalar::Scalar; +use crate::base::{ + scalar::Scalar, + time::timestamp::{PoSQLTimeUnit, PoSQLTimeZone}, +}; use core::ops::Deref; use proof_of_sql_parser::Identifier; @@ -195,3 +198,35 @@ pub fn decimal75( ), ) } + +/// Creates a (Identifier, OwnedColumn) pair for a timestamp column. +/// This is primarily intended for use in conjunction with [owned_table]. +/// +/// # Parameters +/// - `name`: The name of the column. +/// - `time_unit`: The time unit of the timestamps. +/// - `timezone`: The timezone for the timestamps. +/// - `data`: The data for the column, provided as an iterator over `i64` values representing time since the unix epoch. +/// +/// # Example +/// ``` +/// use proof_of_sql::base::{database::owned_table_utility::*, +/// scalar::Curve25519Scalar, +/// time::timestamp::{PoSQLTimeUnit, PoSQLTimeZone}}; +/// use chrono_tz::Europe::London; +/// +/// let result = owned_table::([ +/// timestamptz("event_time", PoSQLTimeUnit::Second, PoSQLTimeZone::new(London), vec![1625072400, 1625076000, 1625079600]), +/// ]); +/// ``` +pub fn timestamptz( + name: impl Deref, + time_unit: PoSQLTimeUnit, + timezone: PoSQLTimeZone, + data: impl IntoIterator, +) -> (Identifier, OwnedColumn) { + ( + name.parse().unwrap(), + OwnedColumn::TimestampTZ(time_unit, timezone, data.into_iter().collect()), + ) +} diff --git a/crates/proof-of-sql/src/base/database/record_batch_dataframe_conversion.rs b/crates/proof-of-sql/src/base/database/record_batch_dataframe_conversion.rs index 88ba6d76b..4c49a148d 100644 --- a/crates/proof-of-sql/src/base/database/record_batch_dataframe_conversion.rs +++ b/crates/proof-of-sql/src/base/database/record_batch_dataframe_conversion.rs @@ -1,8 +1,9 @@ use arrow::{ array::{ Array, BooleanArray, Decimal128Array, Int16Array, Int32Array, Int64Array, StringArray, + TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, }, - datatypes::{DataType, Field, Schema}, + 
datatypes::{DataType, Field, Schema, TimeUnit as ArrowTimeUnit}, record_batch::RecordBatch, }; use polars::{ @@ -60,6 +61,7 @@ pub fn record_batch_to_dataframe(record_batch: RecordBatch) -> Option Series::new(f.name(), data) } + arrow::datatypes::DataType::Utf8 => { let data = col .as_any() @@ -81,6 +83,42 @@ pub fn record_batch_to_dataframe(record_batch: RecordBatch) -> Option // Note: we make this unchecked because if record batch has values that overflow 38 digits, so should the data frame. .into_series() } + arrow::datatypes::DataType::Timestamp(time_unit, _timezone_option) => { + match time_unit { + arrow::datatypes::TimeUnit::Second => { + let data = col + .as_any() + .downcast_ref::() + .map(|array| array.values()) + .unwrap(); + Series::new(f.name(), data) + } + arrow::datatypes::TimeUnit::Millisecond => { + let data = col + .as_any() + .downcast_ref::() + .map(|array| array.values()) + .unwrap(); + Series::new(f.name(), data) + } + arrow::datatypes::TimeUnit::Microsecond => { + let data = col + .as_any() + .downcast_ref::() + .map(|array| array.values()) + .unwrap(); + Series::new(f.name(), data) + } + arrow::datatypes::TimeUnit::Nanosecond => { + let data = col + .as_any() + .downcast_ref::() + .map(|array| array.values()) + .unwrap(); + Series::new(f.name(), data) + } + } + } _ => None?, }) }) @@ -170,6 +208,40 @@ pub fn dataframe_to_record_batch(data: DataFrame) -> Option { DataType::Decimal128(38, 0) } + // NOTE: Polars does not support seconds + polars::datatypes::DataType::Datetime(timeunit, timezone) => { + let col = series.i64().unwrap().cont_slice().unwrap(); + let timezone_arc = timezone.as_ref().map(|tz| Arc::from(tz.as_str())); + let arrow_array: Arc = match timeunit { + polars::datatypes::TimeUnit::Nanoseconds => { + Arc::new(TimestampNanosecondArray::with_timezone_opt( + col.to_vec().into(), + timezone_arc, + )) + } + polars::datatypes::TimeUnit::Microseconds => { + Arc::new(TimestampMicrosecondArray::with_timezone_opt( + col.to_vec().into(), 
+ timezone_arc, + )) + } + polars::datatypes::TimeUnit::Milliseconds => { + Arc::new(TimestampMillisecondArray::with_timezone_opt( + col.to_vec().into(), + timezone_arc, + )) + } + }; + columns.push(arrow_array); + DataType::Timestamp( + match timeunit { + polars::datatypes::TimeUnit::Nanoseconds => ArrowTimeUnit::Nanosecond, + polars::datatypes::TimeUnit::Microseconds => ArrowTimeUnit::Microsecond, + polars::datatypes::TimeUnit::Milliseconds => ArrowTimeUnit::Millisecond, + }, + timezone.as_ref().map(|tz| Arc::from(tz.as_str())), + ) + } _ => return None, }; diff --git a/crates/proof-of-sql/src/base/database/record_batch_utility.rs b/crates/proof-of-sql/src/base/database/record_batch_utility.rs index aa461a64f..7c67c8f7c 100644 --- a/crates/proof-of-sql/src/base/database/record_batch_utility.rs +++ b/crates/proof-of-sql/src/base/database/record_batch_utility.rs @@ -1,3 +1,8 @@ +use crate::base::time::timestamp::{PoSQLTimeUnit, Time}; +use arrow::array::{ + TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, + TimestampSecondArray, +}; use std::sync::Arc; /// Extension trait for Vec to convert it to an Arrow array @@ -18,6 +23,48 @@ impl ToArrow for Vec { } } +impl ToArrow for Vec