Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref: replace first/last with generic aggregators #755

Merged
merged 3 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions crates/sparrow-compiler/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ mod implementation;
mod json;
mod logical;
mod math;
mod pushdown;
mod registry;
mod string;
mod time;
Expand All @@ -18,7 +17,6 @@ mod window;

pub use function::*;
use implementation::*;
pub(crate) use pushdown::*;
pub use registry::*;
pub use time_domain_check::*;

Expand Down
80 changes: 11 additions & 69 deletions crates/sparrow-compiler/src/functions/aggregation.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use anyhow::Context;

use crate::functions::time_domain_check::TimeDomainCheck;
use crate::functions::{Implementation, Pushdown, Registry};
use crate::functions::{Implementation, Registry};

/// The `is_new` pattern used for basic aggregations.
const AGGREGATION_IS_NEW: &str = "(logical_or ?window_is_new ?input_is_new)";
Expand Down Expand Up @@ -124,39 +122,11 @@ pub(super) fn register(registry: &mut Registry) {
.with_dfg_signature(
"last<T: any>(input: T, window: bool = null, duration: i64 = null) -> T",
)
.with_implementation(Implementation::Pushdown(Box::new(
Pushdown::try_new(
0,
&format!(
"(last ({}) ({}) ({}))",
"transform (if ?is_new ?input_value) (merge_join ?op ?window_op)",
"?window_value",
"?duration_value"
),
// The per-field pattern produces the last value of the field.
// The outer if and last is handling the case where the latest *record*
// contained a null value for the field, by only using the last value of
// the field if the record is new and valid and the input field is valid in
// that record.
&format!(
"(if (last ({}) ({}) ({})) ?recurse_on_input_field)",
"transform (if (logical_and ?is_new (is_valid ?input_record)) (is_valid \
?input_field)) (merge_join ?op ?window_op)",
"?window_value",
"?duration_value"
),
// The result pattern treats the resulting record as `null` if there haven't
// been any new non-null records observed. E.g., requires the count to be > 0.
&format!(
"(if (gt (count_if ({}) ({}) ({})) 0u32) ?result_record)",
"transform (logical_and ?is_new (is_valid ?input_record)) (merge_join ?op \
?window_op)",
"?window_value",
"?duration_value"
),
)
.context("last")
.unwrap(),
.with_implementation(Implementation::new_pattern(&format!(
"(last ({}) ({}) ({}))",
"transform (if ?input_is_new ?input_value) (merge_join ?input_op ?window_op)",
"?window_value",
"?duration_value"
)))
.with_is_new(Implementation::new_pattern(AGGREGATION_IS_NEW))
.with_time_domain_check(TimeDomainCheck::Aggregation);
Expand All @@ -166,39 +136,11 @@ pub(super) fn register(registry: &mut Registry) {
.with_dfg_signature(
"first<T: any>(input: T, window: bool = null, duration: i64 = null) -> T",
)
.with_implementation(Implementation::Pushdown(Box::new(
Pushdown::try_new(
0,
&format!(
"(first({}) ({}) ({}))",
"transform (if ?is_new ?input_value) (merge_join ?op ?window_op)",
"?window_value",
"?duration_value"
),
// The per-field pattern produces the first value of the field.
// The outer if and first is handling the case where the first *record*
// contained a null value for the field, by only using the first value of
// the field if the record is new and valid and the input field is valid in
// that record.
&format!(
"(if (first ({}) ({}) ({})) ?recurse_on_input_field)",
"transform (if (logical_and ?is_new (is_valid ?input_record)) (is_valid \
?input_field)) (merge_join ?op ?window_op)",
"?window_value",
"?duration_value"
),
// The result pattern treats the resulting record as `null` if there haven't
// been any new non-null records observed. E.g., requires the count to be > 0.
&format!(
"(if (gt (count_if ({}) ({}) ({})) 0u32) ?result_record)",
"transform (logical_and ?is_new (is_valid ?input_record)) (merge_join ?op \
?window_op)",
"?window_value",
"?duration_value"
),
)
.context("first")
.unwrap(),
.with_implementation(Implementation::new_pattern(&format!(
"(first ({}) ({}) ({}))",
"transform (if ?input_is_new ?input_value) (merge_join ?input_op ?window_op)",
"?window_value",
"?duration_value"
)))
.with_is_new(Implementation::new_pattern(AGGREGATION_IS_NEW))
.with_time_domain_check(TimeDomainCheck::Aggregation);
Expand Down
40 changes: 3 additions & 37 deletions crates/sparrow-compiler/src/functions/implementation.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use std::str::FromStr;

use anyhow::{anyhow, Context};
use egg::{Id, Var};
use egg::Id;
use itertools::izip;
use once_cell::sync::OnceCell;
use smallvec::smallvec;
use sparrow_api::kaskada::v1alpha::operation_plan::tick_operation::TickBehavior;
use sparrow_instructions::InstOp;
use sparrow_syntax::{Expr, FeatureSetPart, FenlType, Located, ResolvedExpr, WindowBehavior};
use sparrow_syntax::{Expr, FeatureSetPart, Located, ResolvedExpr, WindowBehavior};

use crate::ast_to_dfg::ast_to_dfg;
use crate::dfg::{Dfg, DfgPattern, Operation, StepKind};
use crate::frontend::resolve_arguments::resolve_recursive;
use crate::functions::{Function, Pushdown};
use crate::functions::Function;
use crate::{is_any_new, AstDfgRef, DataContext, DiagnosticCollector};

/// Enum describing how a function is implemented.
Expand All @@ -26,9 +26,6 @@ pub(super) enum Implementation {
Window(WindowBehavior),
/// The function should be expanded using the given pattern.
Pattern(DfgPattern),
/// The function should be expanded on primitive fields using the given
/// pushdown.
Pushdown(Box<Pushdown>),
/// The function should be rewritten as the given fenl expression.
///
/// This differs from `Rewrite` in that this expression uses fenl syntax and
Expand Down Expand Up @@ -140,37 +137,6 @@ impl Implementation {

Ok(result.value())
}
Implementation::Pushdown(pushdown) => {
// To avoid accidents, we don't include the "driving" argument in the
// substitution. Specifically, the "input" to the pushdown will
// be changed at each recursion.
let mut subst =
function.create_subst_from_args(dfg, args, Some(pushdown.pushdown_on()));

let pushdown_on = &args[pushdown.pushdown_on()];
// Add an `is_new` that indicates whether the argument being pushed down on
// was new. We can't access the `is_new` of individual components.
subst.insert(
Var::from_str("?is_new").context("Failed to parse ?is_new")?,
pushdown_on.is_new(),
);
subst.insert(
Var::from_str("?op").context("Failed to parse ?op")?,
dfg.operation(pushdown_on.value()),
);

match pushdown_on.value_type() {
FenlType::Concrete(data_type) => {
pushdown.pushdown(dfg, &subst, pushdown_on.value(), data_type)
}
FenlType::Error => Ok(dfg.error_node().value()),
non_concrete => Err(anyhow!(
"Unable to pushdown '{}' on non-concrete type {}",
function.name(),
non_concrete
)),
}
}
Implementation::AnyInputIsNew => Ok(is_any_new(dfg, args)?),
}
}
Expand Down
183 changes: 0 additions & 183 deletions crates/sparrow-compiler/src/functions/pushdown.rs

This file was deleted.

Loading
Loading