Skip to content

Commit

Permalink
feat: Add support for results and time ranges (#733)
Browse files Browse the repository at this point in the history
This adds the plumbing to use History and Snapshot result
configurations. It adds the necessary decoration code to the session.
  • Loading branch information
bjchambers authored Sep 6, 2023
1 parent d2dd948 commit aaee541
Show file tree
Hide file tree
Showing 24 changed files with 283 additions and 96 deletions.
139 changes: 73 additions & 66 deletions crates/sparrow-compiler/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,72 +165,14 @@ impl FrontendOutput {
false
};

// Decorate the expression as needed for the query_type.
let result_node = match options.per_entity_behavior {
_ if !executable => {
// Don't decorate incomplete or otherwise non-executable expressions.
// We don't produce executable plans for incomplete expressions.
query.value()
}
PerEntityBehavior::All => {
dfg.enter_env();
dfg.bind("result", query);
let time_node = create_changed_since_time_node(&mut dfg)?;
dfg.bind("__changed_since_time__", time_node);

let decorated = add_decoration(
data_context,
&mut diagnostics,
&mut dfg,
CHANGED_SINCE_DECORATION,
)?;
dfg.exit_env();
decorated.value()
}
PerEntityBehavior::Final => {
dfg.enter_env();
dfg.bind("result", query);

// Treat FINAL queries as changed_since_time of 0
let time_node = create_changed_since_time_node(&mut dfg)?;
dfg.bind("__changed_since_time__", time_node);

let decorated = add_decoration(
data_context,
&mut diagnostics,
&mut dfg,
FINAL_QUERY_DECORATION,
)?;
dfg.exit_env();
decorated.value()
}
PerEntityBehavior::FinalAtTime => {
dfg.enter_env();
dfg.bind("result", query);

// Treat FINAL queries as changed_since_time of 0
let time_node = create_changed_since_time_node(&mut dfg)?;
dfg.bind("__changed_since_time__", time_node);

let time_node = create_final_at_time_time_node(&mut dfg)?;
dfg.bind("__final_at_time__", time_node);

// 1. If the final query time is provided then use it as the query final time in
// the special decorator 2. Use the same per entity behavior
// final for all of them
let decorated = add_decoration(
data_context,
&mut diagnostics,
&mut dfg,
FINAL_QUERY_AT_TIME_DECORATION,
)?;
dfg.exit_env();
decorated.value()
}
PerEntityBehavior::Unspecified => {
anyhow::bail!("Unspecified per entity behavior")
}
};
let result_node = decorate(
data_context,
&mut dfg,
&mut diagnostics,
executable,
query,
options.per_entity_behavior,
)?;

// Create the basic analysis information.
let num_errors = diagnostics.num_errors();
Expand Down Expand Up @@ -320,6 +262,71 @@ impl FrontendOutput {
}
}

/// Decorate the query expression as needed for the given per-entity behavior.
///
/// Non-executable (e.g. incomplete) expressions are returned undecorated,
/// since no executable plan is produced for them. Otherwise the query is
/// bound into a fresh environment, the time nodes required by the behavior
/// are bound, and the corresponding decoration is applied.
pub fn decorate(
    data_context: &mut DataContext,
    dfg: &mut Dfg,
    diagnostics: &mut DiagnosticCollector<'_>,
    executable: bool,
    query: AstDfgRef,
    per_entity_behavior: PerEntityBehavior,
) -> anyhow::Result<egg::Id> {
    // Don't decorate incomplete or otherwise non-executable expressions.
    // We don't produce executable plans for incomplete expressions.
    if !executable {
        return Ok(query.value());
    }

    // Pick the decoration to apply for this behavior up front.
    let decoration = match per_entity_behavior {
        PerEntityBehavior::All => CHANGED_SINCE_DECORATION,
        PerEntityBehavior::Final => FINAL_QUERY_DECORATION,
        PerEntityBehavior::FinalAtTime => FINAL_QUERY_AT_TIME_DECORATION,
        PerEntityBehavior::Unspecified => {
            anyhow::bail!("Unspecified per entity behavior")
        }
    };

    dfg.enter_env();
    dfg.bind("result", query);

    // Every behavior binds `__changed_since_time__`.
    // FINAL queries are treated as a changed_since_time of 0.
    let time_node = create_changed_since_time_node(dfg)?;
    dfg.bind("__changed_since_time__", time_node);

    if matches!(per_entity_behavior, PerEntityBehavior::FinalAtTime) {
        // If the final query time is provided, the special decorator uses it
        // as the query's final time, with the same per-entity FINAL behavior.
        let time_node = create_final_at_time_time_node(dfg)?;
        dfg.bind("__final_at_time__", time_node);
    }

    let decorated = add_decoration(data_context, diagnostics, dfg, decoration)?;
    dfg.exit_env();
    Ok(decorated.value())
}

/// Adds the given decoration to the dfg.
fn add_decoration(
data_context: &mut DataContext,
Expand Down
8 changes: 8 additions & 0 deletions crates/sparrow-runtime/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,18 @@ impl ExecutionOptions {
self.changed_since_time = changed_since;
}

/// Set the changed-since time from a whole number of seconds
/// (nanoseconds are zeroed).
pub fn set_changed_since_s(&mut self, seconds: i64) {
    let changed_since = Timestamp { nanos: 0, seconds };
    self.changed_since_time = changed_since;
}

/// Set the final-at time to the given timestamp.
pub fn set_final_at_time(&mut self, final_at_time: Timestamp) {
    self.final_at_time.replace(final_at_time);
}

/// Set the final-at time from a whole number of seconds
/// (nanoseconds are zeroed).
pub fn set_final_at_time_s(&mut self, seconds: i64) {
    let final_at_time = Timestamp { nanos: 0, seconds };
    self.final_at_time = Some(final_at_time);
}

async fn compute_store(
&self,
object_stores: &ObjectStoreRegistry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl ExpressionExecutor {
LateBoundValue::from_i32(late_bound).context("late bound value")?;
let literal = late_bindings[late_bound]
.as_ref()
.context("missing late bound value")?
.with_context(|| format!("missing late bound value {late_bound:?}"))?
.clone();

anyhow::ensure!(
Expand Down
2 changes: 1 addition & 1 deletion crates/sparrow-session/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ mod table;
pub use error::Error;
pub use execution::Execution;
pub use expr::{Expr, Literal};
pub use session::{ExecutionOptions, Session};
pub use session::{ExecutionOptions, Results, Session};
pub use table::Table;
87 changes: 70 additions & 17 deletions crates/sparrow-session/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ pub struct Session {
udfs: HashMap<Uuid, Arc<dyn Udf>>,
}

/// The kind of results a query produces.
///
/// `History` maps to `PerEntityBehavior::All`; `Snapshot` maps to
/// `PerEntityBehavior::Final` (or `FinalAtTime` when a final time is set).
#[derive(Default)]
pub enum Results {
    /// Historic results: output points over time for each entity (default).
    #[default]
    History,
    /// Snapshot results: output the final value per entity.
    Snapshot,
}

#[derive(Default)]
pub struct ExecutionOptions {
/// The maximum number of rows to return.
Expand All @@ -42,6 +49,16 @@ pub struct ExecutionOptions {
pub max_batch_size: Option<usize>,
/// Whether to run execute as a materialization or not.
pub materialize: bool,
/// History or Snapshot results.
pub results: Results,
/// The changed since time. This is the minimum timestamp of changes to events.
/// For historic queries, this limits the output points.
/// For snapshot queries, this limits the set of entities that are considered changed.
pub changed_since_time_s: Option<i64>,
/// The final at time. This is the maximum timestamp output.
/// For historic queries, this limits the output points.
/// For snapshot queries, this determines the time at which the snapshot is produced.
pub final_at_time_s: Option<i64>,
}

/// Adds a table to the session.
Expand Down Expand Up @@ -346,31 +363,49 @@ impl Session {
}
}

/// Execute the query.
///
/// It is unfortunate this requires `&mut self` instead of `&self`. It relates to the
/// fact that the decorations may require mutating the DFG, which in turn requires
/// mutability. In practice, the decorations shouldn't mutate the DFG and/or that
/// shouldn't require mutating the session.
pub fn execute(
&self,
expr: &Expr,
&mut self,
query: &Expr,
options: ExecutionOptions,
) -> error_stack::Result<Execution, Error> {
// TODO: Decorations?
let group_id = expr
let group_id = query
.0
.grouping()
.expect("query to be grouped (non-literal)");
let primary_group_info = self
.data_context
.group_info(group_id)
.expect("missing group info");
let primary_grouping = primary_group_info.name().to_owned();
let primary_grouping_key_type = primary_group_info.key_type();

// Hacky. Ideally, we'd determine the schema from the created execution plan.
// Currently, this isn't easily available. Instead, we create this from the
// columns we know we're producing.
let schema = result_schema(expr, primary_grouping_key_type)?;
let per_entity_behavior = match options.results {
Results::History => PerEntityBehavior::All,
Results::Snapshot if options.final_at_time_s.is_some() => {
PerEntityBehavior::FinalAtTime
}
Results::Snapshot => PerEntityBehavior::Final,
};

// Apply decorations as necessary for the per-entity behavior.
let feature_set = FeatureSet::default();
let mut diagnostics = DiagnosticCollector::new(&feature_set);
let expr = sparrow_compiler::decorate(
&mut self.data_context,
&mut self.dfg,
&mut diagnostics,
true,
query.0.clone(),
per_entity_behavior,
)
.into_report()
.change_context(Error::Compile)?;
error_stack::ensure!(diagnostics.num_errors() == 0, Error::Internal);

// First, extract the necessary subset of the DFG as an expression.
// Extract the necessary subset of the DFG as an expression.
// This will allow us to operate without mutating things.
let expr = self.dfg.extract_simplest(expr.0.value());
let expr = self.dfg.extract_simplest(expr);
let expr = expr
.simplify(&CompilerOptions {
..CompilerOptions::default()
Expand All @@ -381,13 +416,24 @@ impl Session {
.into_report()
.change_context(Error::Compile)?;

let primary_group_info = self
.data_context
.group_info(group_id)
.expect("missing group info");
let primary_grouping = primary_group_info.name().to_owned();
let primary_grouping_key_type = primary_group_info.key_type();

// Hacky. Ideally, we'd determine the schema from the created execution plan.
// Currently, this isn't easily available. Instead, we create this from the
// columns we know we're producing.
let schema = result_schema(query, primary_grouping_key_type)?;

// TODO: Incremental?
// TODO: Slicing?
let plan = sparrow_compiler::plan::extract_plan_proto(
&self.data_context,
expr,
// TODO: Configure per-entity behavior.
PerEntityBehavior::Final,
per_entity_behavior,
primary_grouping,
primary_grouping_key_type,
)
Expand Down Expand Up @@ -476,6 +522,13 @@ impl ExecutionOptions {
});
}

if let Some(changed_since) = self.changed_since_time_s {
options.set_changed_since_s(changed_since);
}
if let Some(final_at_time) = self.final_at_time_s {
options.set_final_at_time_s(final_at_time);
}

options
}
}
Expand Down
2 changes: 1 addition & 1 deletion python/pysrc/kaskada/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Kaskada query builder and local execution engine."""
from __future__ import annotations

from . import destinations, plot, sources, windows
from . import destinations, plot, results, sources, windows
from ._execution import Execution, ResultIterator
from ._session import init_session
from ._timestream import Arg, LiteralValue, Timestream, record
Expand Down
32 changes: 23 additions & 9 deletions python/pysrc/kaskada/_execution.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import AsyncIterator, Callable, Iterator, Optional, TypeVar
from typing import AsyncIterator, Callable, Iterator, Literal, Optional, TypeVar

import pyarrow as pa

Expand All @@ -11,19 +11,33 @@

@dataclass
class _ExecutionOptions:
"""Execution options passed to the FFI layer.
Attributes:
row_limit: The maximum number of rows to return. If not specified, all rows are returned.
max_batch_size: The maximum batch size to use when returning results.
If not specified, the default batch size will be used.
materialize: If true, the query will be a continuous materialization.
"""
"""Execution options passed to the FFI layer."""

#: The maximum number of rows to return.
#: If not specified, all rows are returned.
row_limit: Optional[int] = None

#: The maximum batch size to use when returning results.
#: If not specified, the default batch size will be used.
max_batch_size: Optional[int] = None

#: If true, the query will be a continuous materialization.
materialize: bool = False

#: The type of results to return.
results: Literal["history", "snapshot"] = "history"

#: The earliest time of changes to include in the results.
#: For history, this limits the points output.
#: For snapshots, this limits the entities that are output.
changed_since: Optional[int] = None

#: The last time to process.
#: If not set, defaults to the current time.
#: For history, this limits the points output.
#: For snapshots, this determines when the snapshot is produced.
final_at: Optional[int] = None


class Execution:
"""Represents an execution of a TimeStream."""
Expand Down
Loading

0 comments on commit aaee541

Please sign in to comment.