Skip to content

Commit

Permalink
ref: replace first/last with generic aggregators (#755)
Browse files Browse the repository at this point in the history
This allows us to eliminate the "pushdown" code for applying first/last
to records, which was exceedingly complicated.
  • Loading branch information
bjchambers authored Sep 15, 2023
1 parent d2118ee commit 4279fe8
Show file tree
Hide file tree
Showing 40 changed files with 892 additions and 4,793 deletions.
2 changes: 0 additions & 2 deletions crates/sparrow-compiler/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ mod implementation;
mod json;
mod logical;
mod math;
mod pushdown;
mod registry;
mod string;
mod time;
Expand All @@ -18,7 +17,6 @@ mod window;

pub use function::*;
use implementation::*;
pub(crate) use pushdown::*;
pub use registry::*;
pub use time_domain_check::*;

Expand Down
80 changes: 11 additions & 69 deletions crates/sparrow-compiler/src/functions/aggregation.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use anyhow::Context;

use crate::functions::time_domain_check::TimeDomainCheck;
use crate::functions::{Implementation, Pushdown, Registry};
use crate::functions::{Implementation, Registry};

/// The `is_new` pattern used for basic aggregations.
const AGGREGATION_IS_NEW: &str = "(logical_or ?window_is_new ?input_is_new)";
Expand Down Expand Up @@ -124,39 +122,11 @@ pub(super) fn register(registry: &mut Registry) {
.with_dfg_signature(
"last<T: any>(input: T, window: bool = null, duration: i64 = null) -> T",
)
.with_implementation(Implementation::Pushdown(Box::new(
Pushdown::try_new(
0,
&format!(
"(last ({}) ({}) ({}))",
"transform (if ?is_new ?input_value) (merge_join ?op ?window_op)",
"?window_value",
"?duration_value"
),
// The per-field pattern produces the last value of the field.
// The outer if and last is handling the case where the latest *record*
// contained a null value for the field, by only using the last value of
// the field if the record is new and valid and the input field is valid in
// that record.
&format!(
"(if (last ({}) ({}) ({})) ?recurse_on_input_field)",
"transform (if (logical_and ?is_new (is_valid ?input_record)) (is_valid \
?input_field)) (merge_join ?op ?window_op)",
"?window_value",
"?duration_value"
),
// The result pattern treats the resulting record as `null` if there haven't
// been any new non-null records observed. Eg., requires the count to be > 0.
&format!(
"(if (gt (count_if ({}) ({}) ({})) 0u32) ?result_record)",
"transform (logical_and ?is_new (is_valid ?input_record)) (merge_join ?op \
?window_op)",
"?window_value",
"?duration_value"
),
)
.context("last")
.unwrap(),
.with_implementation(Implementation::new_pattern(&format!(
"(last ({}) ({}) ({}))",
"transform (if ?input_is_new ?input_value) (merge_join ?input_op ?window_op)",
"?window_value",
"?duration_value"
)))
.with_is_new(Implementation::new_pattern(AGGREGATION_IS_NEW))
.with_time_domain_check(TimeDomainCheck::Aggregation);
Expand All @@ -166,39 +136,11 @@ pub(super) fn register(registry: &mut Registry) {
.with_dfg_signature(
"first<T: any>(input: T, window: bool = null, duration: i64 = null) -> T",
)
.with_implementation(Implementation::Pushdown(Box::new(
Pushdown::try_new(
0,
&format!(
"(first({}) ({}) ({}))",
"transform (if ?is_new ?input_value) (merge_join ?op ?window_op)",
"?window_value",
"?duration_value"
),
// The per-field pattern produces the last value of the field.
// The outer if and last is handling the case where the latest *record*
// contained a null value for the field, by only using the last value of
// the field if the record is new and valid and the input field is valid in
// that record.
&format!(
"(if (first ({}) ({}) ({})) ?recurse_on_input_field)",
"transform (if (logical_and ?is_new (is_valid ?input_record)) (is_valid \
?input_field)) (merge_join ?op ?window_op)",
"?window_value",
"?duration_value"
),
// The result pattern treats the resulting record as `null` if there haven't
// been any new non-null records observed. Eg., requires the count to be > 0.
&format!(
"(if (gt (count_if ({}) ({}) ({})) 0u32) ?result_record)",
"transform (logical_and ?is_new (is_valid ?input_record)) (merge_join ?op \
?window_op)",
"?window_value",
"?duration_value"
),
)
.context("first")
.unwrap(),
.with_implementation(Implementation::new_pattern(&format!(
"(first ({}) ({}) ({}))",
"transform (if ?input_is_new ?input_value) (merge_join ?input_op ?window_op)",
"?window_value",
"?duration_value"
)))
.with_is_new(Implementation::new_pattern(AGGREGATION_IS_NEW))
.with_time_domain_check(TimeDomainCheck::Aggregation);
Expand Down
40 changes: 3 additions & 37 deletions crates/sparrow-compiler/src/functions/implementation.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use std::str::FromStr;

use anyhow::{anyhow, Context};
use egg::{Id, Var};
use egg::Id;
use itertools::izip;
use once_cell::sync::OnceCell;
use smallvec::smallvec;
use sparrow_api::kaskada::v1alpha::operation_plan::tick_operation::TickBehavior;
use sparrow_instructions::InstOp;
use sparrow_syntax::{Expr, FeatureSetPart, FenlType, Located, ResolvedExpr, WindowBehavior};
use sparrow_syntax::{Expr, FeatureSetPart, Located, ResolvedExpr, WindowBehavior};

use crate::ast_to_dfg::ast_to_dfg;
use crate::dfg::{Dfg, DfgPattern, Operation, StepKind};
use crate::frontend::resolve_arguments::resolve_recursive;
use crate::functions::{Function, Pushdown};
use crate::functions::Function;
use crate::{is_any_new, AstDfgRef, DataContext, DiagnosticCollector};

/// Enum describing how a function is implemented.
Expand All @@ -26,9 +26,6 @@ pub(super) enum Implementation {
Window(WindowBehavior),
/// The function should be expanded using the given pattern.
Pattern(DfgPattern),
/// The function should be expanded on primitive fields using the given
/// pushdown.
Pushdown(Box<Pushdown>),
/// The function should be rewritten as the given fenl expression.
///
/// This differs from `Rewrite` in that this expression uses fenl syntax and
Expand Down Expand Up @@ -140,37 +137,6 @@ impl Implementation {

Ok(result.value())
}
Implementation::Pushdown(pushdown) => {
// To avoid accidents, we don't include the "driving" argument in the
// substitution. Specifically, the "input" to the pushdown will
// be changed at each recursion.
let mut subst =
function.create_subst_from_args(dfg, args, Some(pushdown.pushdown_on()));

let pushdown_on = &args[pushdown.pushdown_on()];
// Add an `is_new` that indicates whether the argument being pushed down on
// was new. We can't access the `is_new` of individual components.
subst.insert(
Var::from_str("?is_new").context("Failed to parse ?is_new")?,
pushdown_on.is_new(),
);
subst.insert(
Var::from_str("?op").context("Failed to parse ?op")?,
dfg.operation(pushdown_on.value()),
);

match pushdown_on.value_type() {
FenlType::Concrete(data_type) => {
pushdown.pushdown(dfg, &subst, pushdown_on.value(), data_type)
}
FenlType::Error => Ok(dfg.error_node().value()),
non_concrete => Err(anyhow!(
"Unable to pushdown '{}' on non-concrete type {}",
function.name(),
non_concrete
)),
}
}
Implementation::AnyInputIsNew => Ok(is_any_new(dfg, args)?),
}
}
Expand Down
183 changes: 0 additions & 183 deletions crates/sparrow-compiler/src/functions/pushdown.rs

This file was deleted.

Loading

0 comments on commit 4279fe8

Please sign in to comment.