Skip to content

Commit

Permalink
feat: implement SliceExpr in postprocessing (#44)
Browse files Browse the repository at this point in the history
# Rationale for this change
We want to implement `PostprocessingStep` for `SliceExpr` in order to
phase out polars-based operations.
<!--
Why are you proposing this change? If this is already explained clearly
in the linked Jira ticket then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->

# What changes are included in this PR?
- define `OwnedTablePostprocessing` and `PostprocessingStep`
- define `SliceExpr` and implement `PostprocessingStep` for `SliceExpr`
<!--
There is no need to duplicate the description in the ticket here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->

# Are these changes tested?
Yes
<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code

If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
  • Loading branch information
iajoiner authored Jul 9, 2024
1 parent fb1b98a commit 0d1437f
Show file tree
Hide file tree
Showing 9 changed files with 267 additions and 0 deletions.
19 changes: 19 additions & 0 deletions crates/proof-of-sql/src/base/database/owned_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,25 @@ impl<S: Scalar> OwnedColumn<S> {
OwnedColumn::TimestampTZ(_, _, col) => col.len(),
}
}

/// Returns the sliced column.
pub fn slice(&self, start: usize, end: usize) -> Self {
match self {
OwnedColumn::Boolean(col) => OwnedColumn::Boolean(col[start..end].to_vec()),
OwnedColumn::SmallInt(col) => OwnedColumn::SmallInt(col[start..end].to_vec()),
OwnedColumn::Int(col) => OwnedColumn::Int(col[start..end].to_vec()),
OwnedColumn::BigInt(col) => OwnedColumn::BigInt(col[start..end].to_vec()),
OwnedColumn::VarChar(col) => OwnedColumn::VarChar(col[start..end].to_vec()),
OwnedColumn::Int128(col) => OwnedColumn::Int128(col[start..end].to_vec()),
OwnedColumn::Decimal75(precision, scale, col) => {
OwnedColumn::Decimal75(*precision, *scale, col[start..end].to_vec())
}
OwnedColumn::Scalar(col) => OwnedColumn::Scalar(col[start..end].to_vec()),
OwnedColumn::TimestampTZ(tu, tz, col) => {
OwnedColumn::TimestampTZ(*tu, *tz, col[start..end].to_vec())
}
}
}
/// Returns true if the column is empty.
pub fn is_empty(&self) -> bool {
match self {
Expand Down
1 change: 1 addition & 0 deletions crates/proof-of-sql/src/sql/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! This module contains the main logic for Proof of SQL.
pub mod ast;
pub mod parse;
pub mod postprocessing;
pub mod proof;
pub mod transform;
12 changes: 12 additions & 0 deletions crates/proof-of-sql/src/sql/postprocessing/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use thiserror::Error;

/// Errors in postprocessing
#[derive(Error, Debug, PartialEq, Eq)]
pub enum PostprocessingError {
/// Error in slicing due to slice index beyond usize
#[error("Error in slicing due to slice index beyond usize {0}")]
InvalidSliceIndex(i128),
}

/// Result type for postprocessing
pub type PostprocessingResult<T> = core::result::Result<T, PostprocessingError>;
18 changes: 18 additions & 0 deletions crates/proof-of-sql/src/sql/postprocessing/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
//! This module contains new lightweight postprocessing for non-provable components.
mod error;
#[allow(unused_imports)]
pub use error::{PostprocessingError, PostprocessingResult};
mod owned_table_postprocessing;

mod postprocessing_step;
#[allow(unused_imports)]
pub use owned_table_postprocessing::{apply_postprocessing_steps, OwnedTablePostprocessing};
pub use postprocessing_step::PostprocessingStep;
#[cfg(test)]
pub mod test_utility;

mod slice_expr;
pub use slice_expr::SliceExpr;

#[cfg(test)]
mod slice_expr_test;
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use super::{PostprocessingResult, PostprocessingStep, SliceExpr};
use crate::base::{database::OwnedTable, scalar::Scalar};

/// An enum for nodes that can apply postprocessing to a `OwnedTable`.
#[derive(Debug, Clone)]
pub enum OwnedTablePostprocessing<S: Scalar> {
/// Slice the `OwnedTable` with the given `SliceExpr`.
Slice(SliceExpr<S>),
}

impl<S: Scalar> PostprocessingStep<S> for OwnedTablePostprocessing<S> {
/// Apply the postprocessing step to the `OwnedTable` and return the result.
fn apply(&self, owned_table: OwnedTable<S>) -> PostprocessingResult<OwnedTable<S>> {
match self {
OwnedTablePostprocessing::Slice(slice_expr) => slice_expr.apply(owned_table),
}
}
}

impl<S: Scalar> OwnedTablePostprocessing<S> {
/// Create a new `OwnedTablePostprocessing` with the given `SliceExpr`.
pub fn new_slice(slice_expr: SliceExpr<S>) -> Self {
Self::Slice(slice_expr)
}
}

/// Apply a list of postprocessing steps to an `OwnedTable`.
pub fn apply_postprocessing_steps<S: Scalar>(
owned_table: OwnedTable<S>,
postprocessing_steps: &[OwnedTablePostprocessing<S>],
) -> PostprocessingResult<OwnedTable<S>> {
// Sadly try_fold() only works on Options
let mut current_table = owned_table;
for step in postprocessing_steps {
current_table = step.apply(current_table)?;
}
Ok(current_table)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use super::PostprocessingResult;
use crate::base::{database::OwnedTable, scalar::Scalar};
use core::fmt::Debug;

/// A trait for postprocessing steps that can be applied to an `OwnedTable`.
pub trait PostprocessingStep<S: Scalar>: Debug + Send + Sync {
/// Apply the postprocessing step to the `OwnedTable` and return the result.
fn apply(&self, owned_table: OwnedTable<S>) -> PostprocessingResult<OwnedTable<S>>;
}
63 changes: 63 additions & 0 deletions crates/proof-of-sql/src/sql/postprocessing/slice_expr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use super::{PostprocessingError, PostprocessingResult, PostprocessingStep};
use crate::base::{database::OwnedTable, scalar::Scalar};
use serde::{Deserialize, Serialize};

/// A `SliceExpr` represents a slice of a `LazyFrame`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SliceExpr<S: Scalar> {
/// number of rows to return
///
/// - if None, specify all rows
number_rows: Option<u64>,

/// number of rows to skip
///
/// - if None, specify the first row as starting point
/// - if Some(nonnegative), specify the offset from the beginning
/// - if Some(negative), specify the offset from the end
/// (e.g. -1 is the last row, -2 is the second to last row, etc.)
offset_value: Option<i64>,

/// Phantom
_phantom: core::marker::PhantomData<S>,
}

impl<S: Scalar> SliceExpr<S> {
/// Create a new `SliceExpr` with the given `number_rows` and `offset`.
pub fn new(number_rows: Option<u64>, offset_value: Option<i64>) -> Self {
Self {
number_rows,
offset_value,
_phantom: core::marker::PhantomData,
}
}
}

impl<S: Scalar> PostprocessingStep<S> for SliceExpr<S> {
/// Apply the slice transformation to the given `OwnedTable`.
fn apply(&self, owned_table: OwnedTable<S>) -> PostprocessingResult<OwnedTable<S>> {
let num_rows = owned_table.num_rows();
let limit = self.number_rows.unwrap_or(num_rows as u64);
let offset = self.offset_value.unwrap_or(0);
// Be permissive with data types at first so that computation can be done.
// If the conversion fails, we will return None.
let possible_starting_row = if offset < 0 {
num_rows as i128 + offset as i128
} else {
offset as i128
};
// The `possible_ending_row` is NOT inclusive.
let possible_ending_row = (possible_starting_row + limit as i128).min(num_rows as i128);
let starting_row = usize::try_from(possible_starting_row)
.map_err(|_| PostprocessingError::InvalidSliceIndex(possible_starting_row))?;
let ending_row = usize::try_from(possible_ending_row)
.map_err(|_| PostprocessingError::InvalidSliceIndex(possible_ending_row))?;
Ok(OwnedTable::<S>::try_from_iter(
owned_table
.into_inner()
.into_iter()
.map(|(identifier, column)| (identifier, column.slice(starting_row, ending_row))),
)
.expect("Sliced columns of an existing table should have equal length"))
}
}
101 changes: 101 additions & 0 deletions crates/proof-of-sql/src/sql/postprocessing/slice_expr_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use crate::{
base::{
database::{owned_table_utility::*, OwnedTable},
scalar::Curve25519Scalar,
},
sql::postprocessing::{apply_postprocessing_steps, test_utility::*},
};

#[test]
fn we_can_slice_an_owned_table_using_only_a_positive_limit_value() {
let limit = 3_usize;
let data_a = [123_i64, 342, -234, 777, 123, 34];
let data_d = ["alfa", "beta", "abc", "f", "kl", "f"];
let table: OwnedTable<Curve25519Scalar> =
owned_table([bigint("a", data_a.to_vec()), varchar("d", data_d.to_vec())]);
let expected_table = owned_table([
bigint("a", data_a[0..limit].to_vec()),
varchar("d", data_d[0..limit].to_vec()),
]);
let postprocessing = [slice(Some(limit as u64), None)];
let actual_table = apply_postprocessing_steps(table, &postprocessing).unwrap();
assert_eq!(actual_table, expected_table);
}

#[test]
fn we_can_slice_an_owned_table_using_only_a_zero_limit_value() {
let limit = 0;
let data_a = [123_i64, 342, -234, 777, 123, 34];
let data_d = ["alfa", "beta", "abc", "f", "kl", "f"];
let table: OwnedTable<Curve25519Scalar> =
owned_table([bigint("a", data_a.to_vec()), varchar("d", data_d.to_vec())]);
let expected_table = owned_table([
bigint("a", Vec::<i64>::new()),
varchar("d", Vec::<String>::new()),
]);
let postprocessing = [slice(Some(limit as u64), None)];
let actual_table = apply_postprocessing_steps(table, &postprocessing).unwrap();
assert_eq!(actual_table, expected_table);
}

#[test]
fn we_can_slice_an_owned_table_using_only_a_positive_offset_value() {
let offset = 3;
let data_a = [123_i64, 342, -234, 777, 123, 34];
let data_d = ["alfa", "beta", "abc", "f", "kl", "f"];
let table: OwnedTable<Curve25519Scalar> =
owned_table([bigint("a", data_a.to_vec()), varchar("d", data_d.to_vec())]);
let expected_table = owned_table([
bigint("a", data_a[(offset as usize)..].to_vec()),
varchar("d", data_d[(offset as usize)..].to_vec()),
]);
let postprocessing = [slice(None, Some(offset))];
let actual_table = apply_postprocessing_steps(table, &postprocessing).unwrap();
assert_eq!(actual_table, expected_table);
}

#[test]
fn we_can_slice_an_owned_table_using_only_a_negative_offset_value() {
let offset = -2;
let data_a = [123_i64, 342, -234, 777, 123, 34];
let data_d = ["alfa", "beta", "abc", "f", "kl", "f"];
let table: OwnedTable<Curve25519Scalar> =
owned_table([bigint("a", data_a.to_vec()), varchar("d", data_d.to_vec())]);
let expected_table = owned_table([
bigint(
"a",
data_a[(data_a.len() as i64 + offset) as usize..].to_vec(),
),
varchar(
"d",
data_d[(data_a.len() as i64 + offset) as usize..].to_vec(),
),
]);
let postprocessing = [slice(None, Some(offset))];
let actual_table = apply_postprocessing_steps(table, &postprocessing).unwrap();
assert_eq!(actual_table, expected_table);
}

#[test]
fn we_can_slice_an_owned_table_using_both_limit_and_offset_values() {
let offset = -2;
let limit = 1_usize;
let data_a = [123_i64, 342, -234, 777, 123, 34];
let data_d = ["alfa", "beta", "abc", "f", "kl", "f"];
let table: OwnedTable<Curve25519Scalar> =
owned_table([bigint("a", data_a.to_vec()), varchar("d", data_d.to_vec())]);
let beg_expected_index = (data_a.len() as i64 + offset) as usize;
let expected_table = owned_table([
bigint(
"a",
data_a[beg_expected_index..(beg_expected_index + limit)].to_vec(),
),
varchar(
"d",
data_d[beg_expected_index..(beg_expected_index + limit)].to_vec(),
),
]);
let postprocessing = [slice(Some(limit as u64), Some(offset))];
let actual_table = apply_postprocessing_steps(table, &postprocessing).unwrap();
assert_eq!(actual_table, expected_table);
}
6 changes: 6 additions & 0 deletions crates/proof-of-sql/src/sql/postprocessing/test_utility.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use super::*;
use crate::base::scalar::Scalar;

pub fn slice<S: Scalar>(limit: Option<u64>, offset: Option<i64>) -> OwnedTablePostprocessing<S> {
OwnedTablePostprocessing::<S>::new_slice(SliceExpr::new(limit, offset))
}

0 comments on commit 0d1437f

Please sign in to comment.