From ce58e26795b549eba640c4306b1d8d5f3880566a Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Fri, 8 Nov 2024 12:14:22 -0800 Subject: [PATCH] refactor: encode type info implicitly in the physical plan (#1938) --- crates/physical-plan/src/compile.rs | 118 ++++++++++++--------- crates/physical-plan/src/plan.rs | 159 +++++++++++----------------- 2 files changed, 126 insertions(+), 151 deletions(-) diff --git a/crates/physical-plan/src/compile.rs b/crates/physical-plan/src/compile.rs index b5b39028c75..4420c5d3595 100644 --- a/crates/physical-plan/src/compile.rs +++ b/crates/physical-plan/src/compile.rs @@ -1,40 +1,39 @@ //! Lowering from the logical plan to the physical plan. -use crate::plan; -use crate::plan::{CrossJoin, Filter, PhysicalCtx, PhysicalExpr, PhysicalPlan}; +use crate::plan::{PhysicalCtx, PhysicalExpr, PhysicalPlan}; use spacetimedb_expr::expr::{Expr, Let, LetCtx, Project, RelExpr, Select}; use spacetimedb_expr::statement::Statement; -use spacetimedb_expr::ty::{TyCtx, TyId}; +use spacetimedb_expr::ty::{TyCtx, Type}; use spacetimedb_expr::StatementCtx; use spacetimedb_sql_parser::ast::BinOp; -fn compile_expr(_ctx: &TyCtx, vars: &LetCtx, expr: Expr) -> PhysicalExpr { +fn compile_expr(ctx: &TyCtx, vars: &LetCtx, expr: Expr) -> PhysicalExpr { match expr { Expr::Bin(op, lhs, rhs) => { - let lhs = compile_expr(_ctx, vars, *lhs); - let rhs = compile_expr(_ctx, vars, *rhs); + let lhs = compile_expr(ctx, vars, *lhs); + let rhs = compile_expr(ctx, vars, *rhs); PhysicalExpr::BinOp(op, Box::new(lhs), Box::new(rhs)) } Expr::Var(sym, _ty) => { let var = vars.get_var(sym).cloned().unwrap(); - compile_expr(_ctx, vars, var) + compile_expr(ctx, vars, var) } - Expr::Row(row, ty) => { + Expr::Row(row, _) => { PhysicalExpr::Tuple( row.into_vec() .into_iter() // The `sym` is inline in `expr` - .map(|(_sym, expr)| compile_expr(_ctx, vars, expr)) + .map(|(_sym, expr)| compile_expr(ctx, vars, expr)) .collect(), - ty, ) } - Expr::Lit(value, ty) => PhysicalExpr::Value(value, ty), - Expr::Field(expr, pos, ty) => { - let expr = compile_expr(_ctx, vars, *expr); - PhysicalExpr::Field(Box::new(expr), pos, ty) + Expr::Lit(value, _) => PhysicalExpr::Value(value), + Expr::Field(expr, pos, _) => { + let expr = compile_expr(ctx, vars, *expr); + PhysicalExpr::Field(Box::new(expr), pos) } - Expr::Input(ty) => PhysicalExpr::Input(ty), + Expr::Input(ty) if matches!(*ctx.try_resolve(ty).unwrap(), Type::Var(..)) => PhysicalExpr::Ptr, + Expr::Input(_) => PhysicalExpr::Tup, } } @@ -54,35 +53,24 @@ fn compile_let(ctx: &TyCtx, Let { vars, exprs }: Let) -> Vec { fn compile_filter(ctx: &TyCtx, select: Select) -> PhysicalPlan { let input = compile_rel_expr(ctx, select.input); if let Some(op) = join_exprs(compile_let(ctx, select.expr)) { - PhysicalPlan::Filter(Filter { - input: Box::new(input), - op, - }) + PhysicalPlan::Filter(Box::new(input), op) } else { input } } fn compile_project(ctx: &TyCtx, expr: Project) -> PhysicalPlan { - let proj = plan::Project { - input: Box::new(compile_rel_expr(ctx, expr.input)), - op: join_exprs(compile_let(ctx, expr.expr)).unwrap(), - }; + let input = Box::new(compile_rel_expr(ctx, expr.input)); + let op = join_exprs(compile_let(ctx, expr.expr)).unwrap(); - PhysicalPlan::Project(proj) + PhysicalPlan::Project(input, op) } -fn compile_cross_joins(ctx: &TyCtx, joins: Vec, ty: TyId) -> PhysicalPlan { +fn compile_cross_joins(ctx: &TyCtx, joins: Vec) -> PhysicalPlan { joins .into_iter() .map(|expr| compile_rel_expr(ctx, expr)) - .reduce(|lhs, rhs| { - PhysicalPlan::CrossJoin(CrossJoin { - lhs: Box::new(lhs), - rhs: Box::new(rhs), - ty, - }) - }) + .reduce(|lhs, rhs| PhysicalPlan::NLJoin(Box::new(lhs), Box::new(rhs))) .unwrap() } @@ -91,7 +79,7 @@ fn compile_rel_expr(ctx: &TyCtx, ast: RelExpr) -> PhysicalPlan { RelExpr::RelVar(table, _ty) => PhysicalPlan::TableScan(table), RelExpr::Select(select) => compile_filter(ctx, *select), RelExpr::Proj(proj) => compile_project(ctx, *proj), - RelExpr::Join(joins, ty) => compile_cross_joins(ctx, joins.into_vec(), ty), + RelExpr::Join(joins, _) => compile_cross_joins(ctx, joins.into_vec()), RelExpr::Union(_, _) | RelExpr::Minus(_, _) | RelExpr::Dedup(_) => { unreachable!("DISTINCT is not implemented") } @@ -165,13 +153,39 @@ mod tests { Ok(statement) } + impl PhysicalPlan { + pub fn as_project(&self) -> Option<(&PhysicalPlan, &PhysicalExpr)> { + if let PhysicalPlan::Project(input, expr) = self { + Some((input, expr)) + } else { + None + } + } + + pub fn as_filter(&self) -> Option<(&PhysicalPlan, &PhysicalExpr)> { + if let PhysicalPlan::Filter(input, expr) = self { + Some((input, expr)) + } else { + None + } + } + + pub fn as_nljoin(&self) -> Option<(&PhysicalPlan, &PhysicalPlan)> { + if let PhysicalPlan::NLJoin(lhs, rhs) = self { + Some((lhs, rhs)) + } else { + None + } + } + } + #[test] fn test_project() -> ResultTest<()> { let (ast, ctx) = compile_sql_sub_test("SELECT * FROM t")?; assert!(matches!(compile(&ctx, ast).plan, PhysicalPlan::TableScan(_))); let ast = compile_sql_stmt_test("SELECT u32 FROM t")?; - assert!(matches!(compile(&ctx, ast).plan, PhysicalPlan::Project(_))); + assert!(matches!(compile(&ctx, ast).plan, PhysicalPlan::Project(..))); Ok(()) } @@ -179,10 +193,10 @@ mod tests { #[test] fn test_select() -> ResultTest<()> { let (ast, ctx) = compile_sql_sub_test("SELECT * FROM t WHERE u32 = 1")?; - assert!(matches!(compile(&ctx, ast).plan, PhysicalPlan::Filter(_))); + assert!(matches!(compile(&ctx, ast).plan, PhysicalPlan::Filter(..))); let (ast, ctx) = compile_sql_sub_test("SELECT * FROM t WHERE u32 = 1 AND f32 = f32")?; - assert!(matches!(compile(&ctx, ast).plan, PhysicalPlan::Filter(_))); + assert!(matches!(compile(&ctx, ast).plan, PhysicalPlan::Filter(..))); Ok(()) } @@ -191,35 +205,35 @@ mod tests { // Check we can do a cross join let (ast, ctx) = compile_sql_sub_test("SELECT t.* FROM t JOIN u")?; let ast = compile(&ctx, ast).plan; - let plan::Project { input, op } = ast.as_project().unwrap(); - let CrossJoin { lhs, rhs, ty: _ } = input.as_cross().unwrap(); + let (input, op) = ast.as_project().unwrap(); + let (lhs, rhs) = input.as_nljoin().unwrap(); - assert!(matches!(op, PhysicalExpr::Field(_, _, _))); - assert!(matches!(&**lhs, PhysicalPlan::TableScan(_))); - assert!(matches!(&**rhs, PhysicalPlan::TableScan(_))); + assert!(matches!(op, PhysicalExpr::Field(..))); + assert!(matches!(lhs, PhysicalPlan::TableScan(_))); + assert!(matches!(rhs, PhysicalPlan::TableScan(_))); // Check we can do multiple joins let (ast, ctx) = compile_sql_sub_test("SELECT t.* FROM t JOIN u JOIN x")?; let ast = compile(&ctx, ast).plan; - let plan::Project { input, op: _ } = ast.as_project().unwrap(); - let CrossJoin { lhs, rhs, ty: _ } = input.as_cross().unwrap(); - assert!(matches!(&**rhs, PhysicalPlan::TableScan(_))); + let (input, _) = ast.as_project().unwrap(); + let (lhs, rhs) = input.as_nljoin().unwrap(); + assert!(matches!(rhs, PhysicalPlan::TableScan(_))); - let CrossJoin { lhs, rhs, ty: _ } = lhs.as_cross().unwrap(); - assert!(matches!(&**lhs, PhysicalPlan::TableScan(_))); - assert!(matches!(&**rhs, PhysicalPlan::TableScan(_))); + let (lhs, rhs) = lhs.as_nljoin().unwrap(); + assert!(matches!(lhs, PhysicalPlan::TableScan(_))); + assert!(matches!(rhs, PhysicalPlan::TableScan(_))); // Check we can do a join with a filter let (ast, ctx) = compile_sql_sub_test("SELECT t.* FROM t JOIN u ON t.u32 = u.u32")?; let ast = compile(&ctx, ast).plan; - let plan::Project { input, op: _ } = ast.as_project().unwrap(); - let Filter { input, op } = input.as_filter().unwrap(); + let (input, _) = ast.as_project().unwrap(); + let (input, op) = input.as_filter().unwrap(); assert!(matches!(op, PhysicalExpr::BinOp(_, _, _))); - let CrossJoin { lhs, rhs, ty: _ } = input.as_cross().unwrap(); - assert!(matches!(&**lhs, PhysicalPlan::TableScan(_))); - assert!(matches!(&**rhs, PhysicalPlan::TableScan(_))); + let (lhs, rhs) = input.as_nljoin().unwrap(); + assert!(matches!(lhs, PhysicalPlan::TableScan(_))); + assert!(matches!(rhs, PhysicalPlan::TableScan(_))); Ok(()) } diff --git a/crates/physical-plan/src/plan.rs b/crates/physical-plan/src/plan.rs index 3703f67dcbe..3545cb80dbe 100644 --- a/crates/physical-plan/src/plan.rs +++ b/crates/physical-plan/src/plan.rs @@ -1,61 +1,42 @@ -use spacetimedb_expr::ty::TyId; +use std::{ops::Bound, sync::Arc}; + use spacetimedb_expr::StatementSource; use spacetimedb_lib::AlgebraicValue; use spacetimedb_primitives::{ColId, IndexId}; use spacetimedb_schema::schema::TableSchema; use spacetimedb_sql_parser::ast::BinOp; -use std::{ops::Bound, sync::Arc}; -/// A physical plan is a concrete query evaluation strategy. +/// A physical plan is a concrete evaluation strategy. /// As such, we can reason about its energy consumption. -#[derive(Debug)] +/// +/// Types are encoded in the structure of the plan, +/// rather than made explicit as for the logical plan. pub enum PhysicalPlan { /// Scan a table row by row, returning row ids TableScan(Arc), - /// Fetch and return row ids from an index - IndexScan(IndexScan), - /// Join an input relation with a base table using an index - IndexJoin(IndexJoin), + /// Fetch row ids from an index + IxScan(IxScan), + /// Join a relation to a table using an index + IxJoin(IxJoin), /// An index join + projection - IndexSemiJoin(IndexSemiJoin), - /// Return the cross product of two input relations - CrossJoin(CrossJoin), - /// Filter an input relation row by row - Filter(Filter), - /// Transform an input relation row by row - Project(Project), -} - -#[cfg(test)] -impl PhysicalPlan { - pub fn as_project(&self) -> Option<&Project> { - if let PhysicalPlan::Project(p) = self { - Some(p) - } else { - None - } - } - - pub fn as_filter(&self) -> Option<&Filter> { - if let PhysicalPlan::Filter(p) = self { - Some(p) - } else { - None - } - } - - pub fn as_cross(&self) -> Option<&CrossJoin> { - if let PhysicalPlan::CrossJoin(p) = self { - Some(p) - } else { - None - } - } + IxSemiJoin(IxSemiJoin), + /// A Nested Loop Join. + /// Equivalent to a cross product. + /// + /// 1) If the lhs relation has `n` tuples + /// 2) If the rhs relation has `m` tuples + /// + /// Then a nested loop join returns `n * m` tuples, + /// which is also its asymptotic complexity. + NLJoin(Box, Box), + /// A tuple-at-a-time filter + Filter(Box, PhysicalExpr), + /// A tuple-at-a-time projection + Project(Box, PhysicalExpr), } /// Fetch and return row ids from a btree index -#[derive(Debug)] -pub struct IndexScan { +pub struct IxScan { /// The table on which this index is defined pub table_schema: Arc, /// The index id @@ -74,14 +55,19 @@ pub struct IndexScan { /// BTrees support equality and range scans #[derive(Debug)] pub enum IndexOp { - Eq(AlgebraicValue, TyId), - Range(Bound, Bound, TyId), + Eq(AlgebraicValue), + Range(Bound, Bound), } -/// Join an input relation with a base table using an index. -/// Returns a 2-tuple of its lhs and rhs input rows. -#[derive(Debug)] -pub struct IndexJoin { +/// An index join. +/// Joins a relation to a base table using an index. +/// +/// 1) If the input relation has `n` tuples +/// 2) If the base table has `m` rows +/// 3) If the complexity of an index lookup is f(m) +/// +/// Then the complexity of the index join is `n * f(m)` +pub struct IxJoin { /// The lhs input used to probe the index pub input: Box, /// The rhs indexed table @@ -95,15 +81,12 @@ pub struct IndexJoin { /// It is evaluated over each row from the lhs. /// The resulting value is used to probe the index. pub index_key_expr: PhysicalExpr, - /// The return type of this index join. - /// Always a 2-tuple of its input types. - pub ty: TyId, } -/// An index join + projection. -/// Returns tuples from the lhs (or rhs) exclusively. -#[derive(Debug)] -pub struct IndexSemiJoin { +/// An index semijoin. +/// I.e. an index join + projection. +/// Same asymptotic complexity as [IxJoin]. +pub struct IxSemiJoin { /// The lhs input used to probe the index pub input: Box, /// The rhs indexed table @@ -128,51 +111,29 @@ pub enum SemiJoinProj { Rhs, } -/// Returns the cross product of two input relations. -/// Returns a 2-tuple of its lhs and rhs input rows. -#[derive(Debug)] -pub struct CrossJoin { - /// The lhs input relation - pub lhs: Box, - /// The rhs input relation - pub rhs: Box, - /// The type of this cross product. - /// Always a 2-tuple of its input types. - pub ty: TyId, -} - -/// A streaming or non-leaf filter operation -#[derive(Debug)] -pub struct Filter { - /// A generic filter always has an input - pub input: Box, - /// The boolean expression for selecting tuples - pub op: PhysicalExpr, -} - -/// A streaming project or map operation -#[derive(Debug)] -pub struct Project { - /// A projection always has an input - pub input: Box, - /// The tuple transformation expression. - /// It will always produce another tuple. - pub op: PhysicalExpr, -} - -/// A physical scalar expression -#[derive(Debug)] +/// A physical scalar expression. +/// +/// Types are encoded in the structure of the plan, +/// rather than made explicit as for the logical plan. pub enum PhysicalExpr { /// A binary expression BinOp(BinOp, Box, Box), - /// A tuple expression - Tuple(Vec, TyId), - /// A constant algebraic value - Value(AlgebraicValue, TyId), - /// A field projection expression - Field(Box, usize, TyId), - /// The input tuple to a relop - Input(TyId), + /// A constant algebraic value. + /// Type already encoded in value. + Value(AlgebraicValue), + /// A tuple constructor + Tuple(Vec), + /// A field projection expression. + Field(Box, usize), + /// A pointer to a row in a table. + /// A base element for a field projection. + Ptr, + /// A reference to a product value. + /// A base element for a field projection. + Ref, + /// A temporary tuple value. + /// A base element for a field projection. + Tup, } /// A physical context for the result of a query compilation.