From 3acec8b85e075379c8a13638a01384e2333e4225 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Thu, 19 Dec 2024 00:15:42 -0800 Subject: [PATCH] query engine integration --- Cargo.lock | 24 +- Cargo.toml | 2 + crates/bench/Cargo.toml | 2 + crates/bench/benches/subscription.rs | 36 +- crates/core/Cargo.toml | 1 + .../db/datastore/locking_tx_datastore/tx.rs | 13 + crates/core/src/sql/parser.rs | 4 +- crates/execution/Cargo.toml | 4 +- crates/execution/src/iter.rs | 1479 ++++++++++------- crates/execution/src/lib.rs | 115 ++ crates/expr/src/check.rs | 28 +- crates/expr/src/expr.rs | 55 +- crates/expr/src/lib.rs | 12 +- crates/expr/src/statement.rs | 8 +- crates/physical-plan/Cargo.toml | 5 + crates/physical-plan/src/compile.rs | 75 +- crates/physical-plan/src/lib.rs | 1 + crates/physical-plan/src/plan.rs | 899 ++++------ crates/physical-plan/src/rules.rs | 640 +++++++ crates/query/Cargo.toml | 17 + crates/query/src/lib.rs | 29 + crates/table/src/btree_index.rs | 18 +- crates/table/src/table.rs | 29 + 23 files changed, 2219 insertions(+), 1277 deletions(-) create mode 100644 crates/physical-plan/src/rules.rs create mode 100644 crates/query/Cargo.toml create mode 100644 crates/query/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 655f2fe8e04..5c0fb17a4a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4658,9 +4658,11 @@ dependencies = [ "spacetimedb-client-api", "spacetimedb-core", "spacetimedb-data-structures", + "spacetimedb-execution", "spacetimedb-lib", "spacetimedb-paths", "spacetimedb-primitives", + "spacetimedb-query", "spacetimedb-sats", "spacetimedb-schema", "spacetimedb-standalone", @@ -4909,6 +4911,7 @@ dependencies = [ "spacetimedb-commitlog", "spacetimedb-data-structures", "spacetimedb-durability", + "spacetimedb-execution", "spacetimedb-expr", "spacetimedb-jsonwebtoken", "spacetimedb-jwks", @@ -4973,9 +4976,11 @@ dependencies = [ name = "spacetimedb-execution" version = "1.0.0-rc2" dependencies = [ - "spacetimedb-expr", + "anyhow", "spacetimedb-lib", + "spacetimedb-physical-plan", "spacetimedb-primitives", + "spacetimedb-sql-parser", "spacetimedb-table", ] @@ -5089,12 +5094,15 @@ dependencies = [ name = "spacetimedb-physical-plan" version = "1.0.0-rc2" dependencies = [ + "anyhow", "derive_more", + "pretty_assertions", "spacetimedb-expr", "spacetimedb-lib", "spacetimedb-primitives", "spacetimedb-schema", "spacetimedb-sql-parser", + "spacetimedb-table", ] [[package]] @@ -5108,6 +5116,20 @@ dependencies = [ "proptest", ] +[[package]] +name = "spacetimedb-query" +version = "1.0.0-rc2" +dependencies = [ + "anyhow", + "spacetimedb-client-api-messages", + "spacetimedb-execution", + "spacetimedb-expr", + "spacetimedb-physical-plan", + "spacetimedb-primitives", + "spacetimedb-sql-parser", + "spacetimedb-table", +] + [[package]] name = "spacetimedb-quickstart-module" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 7f700269cf2..5536350a535 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ members = [ "crates/paths", "crates/physical-plan", "crates/primitives", + "crates/query", "crates/sats", "crates/schema", "crates/sdk", @@ -106,6 +107,7 @@ spacetimedb-metrics = { path = "crates/metrics", version = "1.0.0-rc2" } spacetimedb-paths = { path = "crates/paths", version = "1.0.0-rc2" } spacetimedb-physical-plan = { path = "crates/physical-plan", version = "1.0.0-rc2" } spacetimedb-primitives = { path = "crates/primitives", version = "1.0.0-rc2" } +spacetimedb-query = { path = "crates/query", version = "1.0.0-rc2" } spacetimedb-sats = { path = "crates/sats", version = "1.0.0-rc2" } spacetimedb-schema = { path = "crates/schema", version = "1.0.0-rc2" } spacetimedb-standalone = { path = "crates/standalone", version = "1.0.0-rc2" } diff --git a/crates/bench/Cargo.toml b/crates/bench/Cargo.toml index 53bc3f3298e..b606bef652f 100644 --- a/crates/bench/Cargo.toml +++ b/crates/bench/Cargo.toml @@ -31,9 +31,11 @@ bench = false spacetimedb-client-api = { path = "../client-api" } spacetimedb-core = { path = "../core", features = ["test"] } spacetimedb-data-structures.workspace = true +spacetimedb-execution = { path = "../execution" } spacetimedb-lib = { path = "../lib" } spacetimedb-paths.workspace = true spacetimedb-primitives = { path = "../primitives" } +spacetimedb-query = { path = "../query" } spacetimedb-sats = { path = "../sats" } spacetimedb-schema = { workspace = true, features = ["test"] } spacetimedb-standalone = { path = "../standalone" } diff --git a/crates/bench/benches/subscription.rs b/crates/bench/benches/subscription.rs index a24a6864478..e0f21ec1057 100644 --- a/crates/bench/benches/subscription.rs +++ b/crates/bench/benches/subscription.rs @@ -4,12 +4,14 @@ use spacetimedb::execution_context::Workload; use spacetimedb::host::module_host::DatabaseTableUpdate; use spacetimedb::identity::AuthCtx; use spacetimedb::messages::websocket::BsatnFormat; +use spacetimedb::sql::ast::SchemaViewer; use spacetimedb::subscription::query::compile_read_only_query; use spacetimedb::subscription::subscription::ExecutionSet; use spacetimedb::{db::relational_db::RelationalDB, messages::websocket::Compression}; use spacetimedb_bench::database::BenchDatabase as _; use spacetimedb_bench::spacetime_raw::SpacetimeRaw; use spacetimedb_primitives::{col_list, TableId}; +use spacetimedb_query::SubscribePlan; use spacetimedb_sats::{product, AlgebraicType, AlgebraicValue, ProductValue}; fn create_table_location(db: &RelationalDB) -> Result { @@ -99,6 +101,18 @@ fn eval(c: &mut Criterion) { let ins_rhs = insert_op(rhs, "location", new_rhs_row); let update = [&ins_lhs, &ins_rhs]; + // A benchmark runner for the new query engine + let bench_query = |c: &mut Criterion, name, sql| { + c.bench_function(name, |b| { + let tx = raw.db.begin_tx(Workload::Subscribe); + let auth = AuthCtx::for_testing(); + let schema_viewer = &SchemaViewer::new(&raw.db, &tx, &auth); + let plan = SubscribePlan::compile(sql, schema_viewer).unwrap(); + + b.iter(|| drop(black_box(plan.execute_bsatn(&tx)))) + }); + }; + let bench_eval = |c: &mut Criterion, name, sql| { c.bench_function(name, |b| { let tx = raw.db.begin_tx(Workload::Update); @@ -116,22 +130,26 @@ fn eval(c: &mut Criterion) { }); }; - // To profile this benchmark for 30s - // samply record -r 10000000 cargo bench --bench=subscription --profile=profiling -- full-scan --exact --profile-time=30 - // Iterate 1M rows. - bench_eval(c, "full-scan", "select * from footprint"); - - // To profile this benchmark for 30s - // samply record -r 10000000 cargo bench --bench=subscription --profile=profiling -- full-join --exact --profile-time=30 // Join 1M rows on the left with 12K rows on the right. - // Note, this should use an index join so as not to read the entire lhs table. + // Note, this should use an index join so as not to read the entire footprint table. let name = format!( r#" select footprint.* - from footprint join location on footprint.entity_id = location.entity_id + from location join footproint on location.entity_id = footprint.entity_id where location.chunk_index = {chunk_index} "# ); + + bench_query(c, "footprint-scan", "select * from footprint"); + bench_query(c, "footprint-semijoin", &name); + + // To profile this benchmark for 30s + // samply record -r 10000000 cargo bench --bench=subscription --profile=profiling -- full-scan --exact --profile-time=30 + // Iterate 1M rows. + bench_eval(c, "full-scan", "select * from footprint"); + + // To profile this benchmark for 30s + // samply record -r 10000000 cargo bench --bench=subscription --profile=profiling -- full-join --exact --profile-time=30 bench_eval(c, "full-join", &name); // To profile this benchmark for 30s diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 0baa3eae7a8..da011b18319 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -29,6 +29,7 @@ spacetimedb-table.workspace = true spacetimedb-vm.workspace = true spacetimedb-snapshot.workspace = true spacetimedb-expr.workspace = true +spacetimedb-execution.workspace = true anyhow = { workspace = true, features = ["backtrace"] } arrayvec.workspace = true diff --git a/crates/core/src/db/datastore/locking_tx_datastore/tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/tx.rs index b9be047a55c..bbc2cc1b68a 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/tx.rs @@ -6,9 +6,12 @@ use super::{ SharedReadGuard, }; use crate::execution_context::ExecutionContext; +use spacetimedb_execution::Datastore; use spacetimedb_primitives::{ColList, TableId}; use spacetimedb_sats::AlgebraicValue; use spacetimedb_schema::schema::TableSchema; +use spacetimedb_table::blob_store::BlobStore; +use spacetimedb_table::table::Table; use std::num::NonZeroU64; use std::sync::Arc; use std::{ @@ -23,6 +26,16 @@ pub struct TxId { pub(crate) ctx: ExecutionContext, } +impl Datastore for TxId { + fn blob_store(&self) -> &dyn BlobStore { + &self.committed_state_shared_lock.blob_store + } + + fn table(&self, table_id: TableId) -> Option<&Table> { + self.committed_state_shared_lock.get_table(table_id) + } +} + impl StateView for TxId { fn get_schema(&self, table_id: TableId) -> Option<&Arc> { self.committed_state_shared_lock.get_schema(table_id) diff --git a/crates/core/src/sql/parser.rs b/crates/core/src/sql/parser.rs index b02485aef1b..b6fa8ffdbb6 100644 --- a/crates/core/src/sql/parser.rs +++ b/crates/core/src/sql/parser.rs @@ -3,13 +3,13 @@ use crate::db::relational_db::RelationalDB; use crate::sql::ast::SchemaViewer; use spacetimedb_expr::check::parse_and_type_sub; use spacetimedb_expr::errors::TypingError; -use spacetimedb_expr::expr::Project; +use spacetimedb_expr::expr::ProjectName; use spacetimedb_lib::db::raw_def::v9::RawRowLevelSecurityDefV9; use spacetimedb_lib::identity::AuthCtx; use spacetimedb_schema::schema::RowLevelSecuritySchema; pub struct RowLevelExpr { - pub sql: Project, + pub sql: ProjectName, pub def: RowLevelSecuritySchema, } diff --git a/crates/execution/Cargo.toml b/crates/execution/Cargo.toml index 09ec6d5f1dc..10c4aa6b2d4 100644 --- a/crates/execution/Cargo.toml +++ b/crates/execution/Cargo.toml @@ -7,7 +7,9 @@ license-file = "LICENSE" description = "The SpacetimeDB query engine" [dependencies] -spacetimedb-expr.workspace = true +anyhow.workspace = true spacetimedb-lib.workspace = true +spacetimedb-physical-plan.workspace = true spacetimedb-primitives.workspace = true +spacetimedb-sql-parser.workspace = true spacetimedb-table.workspace = true diff --git a/crates/execution/src/iter.rs b/crates/execution/src/iter.rs index 8db3e2c9c7c..829e4e3ebf1 100644 --- a/crates/execution/src/iter.rs +++ b/crates/execution/src/iter.rs @@ -1,490 +1,524 @@ -use std::ops::{Bound, RangeBounds}; +use std::collections::{HashMap, HashSet}; +use anyhow::{anyhow, bail, Context, Result}; use spacetimedb_lib::{AlgebraicValue, ProductValue}; -use spacetimedb_primitives::{IndexId, TableId}; +use spacetimedb_physical_plan::plan::{ + HashJoin, IxJoin, IxScan, PhysicalExpr, PhysicalPlan, ProjectField, ProjectPlan, Sarg, Semi, TupleField, +}; use spacetimedb_table::{ blob_store::BlobStore, btree_index::{BTreeIndex, BTreeIndexRangeIter}, - static_assert_size, table::{IndexScanIter, RowRef, Table, TableScanIter}, }; -/// A row from a base table in the form of a pointer or product value -#[derive(Clone)] -pub enum Row<'a> { - Ptr(RowRef<'a>), - Ref(&'a ProductValue), -} +use crate::{Datastore, FallibleDatastore, Tuple}; -impl Row<'_> { - /// Expect a pointer value, panic otherwise - pub fn expect_ptr(&self) -> &RowRef { - match self { - Self::Ptr(ptr) => ptr, - _ => unreachable!(), - } - } +/// The different iterators for evaluating query plans +pub enum PlanIter<'a> { + Table(TableScanIter<'a>), + Index(IndexScanIter<'a>), + RowId(RowRefIter<'a>), + Tuple(ProjectIter<'a>), +} - /// Expect a product value, panic otherwise - pub fn expect_ref(&self) -> &ProductValue { - match self { - Self::Ref(r) => r, - _ => unreachable!(), - } +impl<'a> PlanIter<'a> { + pub(crate) fn build(plan: &'a ProjectPlan, tx: &'a FallibleDatastore<'a, T>) -> Result { + ProjectIter::build(plan, tx).map(|iter| match iter { + ProjectIter::None(Iter::Row(RowRefIter::TableScan(iter))) => Self::Table(iter), + ProjectIter::None(Iter::Row(RowRefIter::IndexScan(iter))) => Self::Index(iter), + ProjectIter::None(Iter::Row(iter)) => Self::RowId(iter), + _ => Self::Tuple(iter), + }) } } -static_assert_size!(Row, 32); - -/// A tuple returned by a query iterator -#[derive(Clone)] -pub enum Tuple<'a> { - /// A row from a base table - Row(Row<'a>), - /// A temporary constructed by a query operator - Join(Vec>), +/// Implements a tuple projection for a query plan +pub enum ProjectIter<'a> { + None(Iter<'a>), + Some(Iter<'a>, usize), } -static_assert_size!(Tuple, 40); +impl<'a> Iterator for ProjectIter<'a> { + type Item = RowRef<'a>; -impl Tuple<'_> { - /// Expect a row from a base table, panic otherwise - pub fn expect_row(&self) -> &Row { + fn next(&mut self) -> Option { match self { - Self::Row(row) => row, - _ => unreachable!(), + Self::None(iter) => iter.find_map(|tuple| { + if let Tuple::Row(ptr) = tuple { + return Some(ptr); + } + None + }), + Self::Some(iter, i) => iter.find_map(|tuple| tuple.select(*i)), } } +} - /// Expect a temporary tuple, panic otherwise - pub fn expect_join(&self) -> &[Row] { - match self { - Self::Join(elems) => elems.as_slice(), - _ => unreachable!(), +impl<'a> ProjectIter<'a> { + fn build(plan: &'a ProjectPlan, tx: &'a FallibleDatastore<'a, T>) -> Result { + match plan { + ProjectPlan::None(plan) | ProjectPlan::Name(plan, _, None) => Iter::build(plan, tx).map(Self::None), + ProjectPlan::Name(plan, _, Some(i)) => Iter::build(plan, tx).map(|iter| Self::Some(iter, *i)), } } } -/// An execution plan for a tuple-at-a-time iterator. -/// As the name suggests it is meant to be cached. -/// Building the iterator should incur minimal overhead. -pub struct CachedIterPlan { - /// The relational ops - iter_ops: Box<[IterOp]>, - /// The expression ops - expr_ops: Box<[OpCode]>, - /// The constants referenced by the plan - constants: Box<[AlgebraicValue]>, -} - -static_assert_size!(CachedIterPlan, 48); - -impl CachedIterPlan { - /// Returns an interator over the query ops - fn ops(&self) -> impl Iterator + '_ { - self.iter_ops.iter().copied() - } - - /// Lookup a constant in the plan - fn constant(&self, i: u16) -> &AlgebraicValue { - &self.constants[i as usize] - } -} - -/// An opcode for a tuple-at-a-time execution plan -#[derive(Clone, Copy)] -pub enum IterOp { - /// A table scan opcode takes 1 arg: A [TableId] - TableScan(TableId), - /// A delta scan opcode takes 1 arg: A [TableId] - DeltaScan(TableId), - /// An index scan opcode takes 2 args: - /// 1. An [IndexId] - /// 2. A ptr to an [AlgebraicValue] - IxScanEq(IndexId, u16), - /// An index range scan opcode takes 3 args: - /// 1. An [IndexId] - /// 2. A ptr to the lower bound - /// 3. A ptr to the upper bound - IxScanRange(IndexId, Bound, Bound), - /// Pops its 2 args from the stack - NLJoin, - /// An index join opcode takes 2 args: - /// 1. An [IndexId] - /// 2. An instruction ptr - /// 3. A length - IxJoin(IndexId, usize, u16), - /// An index join opcode takes 2 args: - /// 1. An [IndexId] - /// 2. An instruction ptr - /// 3. A length - UniqueIxJoin(IndexId, usize, u16), - /// A filter opcode takes 2 args: - /// 1. An instruction ptr - /// 2. A length - Filter(usize, u32), -} - -static_assert_size!(IterOp, 16); - -pub trait Datastore { - fn delta_scan_iter(&self, table_id: TableId) -> DeltaScanIter; - fn table_scan_iter(&self, table_id: TableId) -> TableScanIter; - fn index_scan_iter(&self, index_id: IndexId, range: &impl RangeBounds) -> IndexScanIter; - fn get_table_for_index(&self, index_id: &IndexId) -> &Table; - fn get_index(&self, index_id: &IndexId) -> &BTreeIndex; - fn get_blob_store(&self) -> &dyn BlobStore; -} - -/// An iterator for a delta table -pub struct DeltaScanIter<'a> { - iter: std::slice::Iter<'a, ProductValue>, -} - -impl<'a> Iterator for DeltaScanIter<'a> { - type Item = &'a ProductValue; +/// A generic tuple-at-a-time iterator for a query plan +pub enum Iter<'a> { + Row(RowRefIter<'a>), + Join(LeftDeepJoinIter<'a>), + Filter(Filter<'a, Iter<'a>>), +} + +impl<'a> Iterator for Iter<'a> { + type Item = Tuple<'a>; fn next(&mut self) -> Option { - self.iter.next() + match self { + Self::Row(iter) => iter.next().map(Tuple::Row), + Self::Join(iter) => iter.next(), + Self::Filter(iter) => iter.next(), + } } } -impl CachedIterPlan { - pub fn iter<'a>(&'a self, tx: &'a impl Datastore) -> Iter<'a> { - let mut stack = vec![]; - for op in self.ops() { - match op { - IterOp::TableScan(table_id) => { - // Push table scan - stack.push(Iter::TableScan(tx.table_scan_iter(table_id))); - } - IterOp::DeltaScan(table_id) => { - // Push delta scan - stack.push(Iter::DeltaScan(tx.delta_scan_iter(table_id))); - } - IterOp::IxScanEq(index_id, ptr) => { - // Push index scan - stack.push(Iter::IndexScan(tx.index_scan_iter(index_id, &self.constant(ptr)))); - } - IterOp::IxScanRange(index_id, lower, upper) => { - // Push range scan - let lower = lower.map(|ptr| self.constant(ptr)); - let upper = upper.map(|ptr| self.constant(ptr)); - stack.push(Iter::IndexScan(tx.index_scan_iter(index_id, &(lower, upper)))); - } - IterOp::NLJoin => { - // Pop args and push nested loop join - let rhs = stack.pop().unwrap(); - let lhs = stack.pop().unwrap(); - stack.push(Iter::NLJoin(NestedLoopJoin::new(lhs, rhs))); - } - IterOp::IxJoin(index_id, i, n) => { - // Pop arg and push index join - let input = stack.pop().unwrap(); - let index = tx.get_index(&index_id); - let table = tx.get_table_for_index(&index_id); - let blob_store = tx.get_blob_store(); - let ops = &self.expr_ops[i..i + n as usize]; - let program = ExprProgram::new(ops, &self.constants); - let projection = ProgramEvaluator::from(program); - stack.push(Iter::IxJoin(LeftDeepJoin::Eq(IndexJoin::new( - input, index, table, blob_store, projection, - )))); - } - IterOp::UniqueIxJoin(index_id, i, n) => { - // Pop arg and push index join - let input = stack.pop().unwrap(); - let index = tx.get_index(&index_id); - let table = tx.get_table_for_index(&index_id); - let blob_store = tx.get_blob_store(); - let ops = &self.expr_ops[i..i + n as usize]; - let program = ExprProgram::new(ops, &self.constants); - let projection = ProgramEvaluator::from(program); - stack.push(Iter::UniqueIxJoin(LeftDeepJoin::Eq(UniqueIndexJoin::new( - input, index, table, blob_store, projection, - )))); - } - IterOp::Filter(i, n) => { - // Pop arg and push filter - let input = Box::new(stack.pop().unwrap()); - let ops = &self.expr_ops[i..i + n as usize]; - let program = ExprProgram::new(ops, &self.constants); - let program = ProgramEvaluator::from(program); - stack.push(Iter::Filter(Filter { input, program })); - } +impl<'a> Iter<'a> { + fn build(plan: &'a PhysicalPlan, tx: &'a FallibleDatastore<'a, T>) -> Result { + match plan { + PhysicalPlan::TableScan(..) | PhysicalPlan::IxScan(..) => RowRefIter::build(plan, tx).map(Self::Row), + PhysicalPlan::Filter(input, expr) => { + // Build a filter iterator + Iter::build(input, tx) + .map(Box::new) + .map(|input| Filter { input, expr }) + .map(Iter::Filter) + } + PhysicalPlan::NLJoin(lhs, rhs) => { + // Build a nested loop join iterator + NLJoin::build_from(lhs, rhs, tx) + .map(LeftDeepJoinIter::NLJoin) + .map(Iter::Join) + } + PhysicalPlan::IxJoin(join @ IxJoin { unique: false, .. }, Semi::Lhs) => { + // Build a left index semijoin iterator + IxJoinLhs::build_from(join, tx) + .map(SemiJoin::Lhs) + .map(LeftDeepJoinIter::IxJoin) + .map(Iter::Join) + } + PhysicalPlan::IxJoin(join @ IxJoin { unique: false, .. }, Semi::Rhs) => { + // Build a right index semijoin iterator + IxJoinRhs::build_from(join, tx) + .map(SemiJoin::Rhs) + .map(LeftDeepJoinIter::IxJoin) + .map(Iter::Join) + } + PhysicalPlan::IxJoin(join @ IxJoin { unique: false, .. }, Semi::All) => { + // Build an index join iterator + IxJoinIter::build_from(join, tx) + .map(SemiJoin::All) + .map(LeftDeepJoinIter::IxJoin) + .map(Iter::Join) + } + PhysicalPlan::IxJoin(join @ IxJoin { unique: true, .. }, Semi::Lhs) => { + // Build a unique left index semijoin iterator + UniqueIxJoinLhs::build_from(join, tx) + .map(SemiJoin::Lhs) + .map(LeftDeepJoinIter::UniqueIxJoin) + .map(Iter::Join) + } + PhysicalPlan::IxJoin(join @ IxJoin { unique: true, .. }, Semi::Rhs) => { + // Build a unique right index semijoin iterator + UniqueIxJoinRhs::build_from(join, tx) + .map(SemiJoin::Rhs) + .map(LeftDeepJoinIter::UniqueIxJoin) + .map(Iter::Join) + } + PhysicalPlan::IxJoin(join @ IxJoin { unique: true, .. }, Semi::All) => { + // Build a unique index join iterator + UniqueIxJoin::build_from(join, tx) + .map(SemiJoin::All) + .map(LeftDeepJoinIter::UniqueIxJoin) + .map(Iter::Join) + } + PhysicalPlan::HashJoin(join @ HashJoin { unique: false, .. }, Semi::Lhs) => { + // Build a left hash semijoin iterator + HashJoinLhs::build_from(join, tx) + .map(SemiJoin::Lhs) + .map(LeftDeepJoinIter::HashJoin) + .map(Iter::Join) + } + PhysicalPlan::HashJoin(join @ HashJoin { unique: false, .. }, Semi::Rhs) => { + // Build a right hash semijoin iterator + HashJoinRhs::build_from(join, tx) + .map(SemiJoin::Rhs) + .map(LeftDeepJoinIter::HashJoin) + .map(Iter::Join) + } + PhysicalPlan::HashJoin(join @ HashJoin { unique: false, .. }, Semi::All) => { + // Build a hash join iterator + HashJoinIter::build_from(join, tx) + .map(SemiJoin::All) + .map(LeftDeepJoinIter::HashJoin) + .map(Iter::Join) + } + PhysicalPlan::HashJoin(join @ HashJoin { unique: true, .. }, Semi::Lhs) => { + // Build a unique left hash semijoin iterator + UniqueHashJoinLhs::build_from(join, tx) + .map(SemiJoin::Lhs) + .map(LeftDeepJoinIter::UniqueHashJoin) + .map(Iter::Join) + } + PhysicalPlan::HashJoin(join @ HashJoin { unique: true, .. }, Semi::Rhs) => { + // Build a unique right hash semijoin iterator + UniqueHashJoinRhs::build_from(join, tx) + .map(SemiJoin::Rhs) + .map(LeftDeepJoinIter::UniqueHashJoin) + .map(Iter::Join) + } + PhysicalPlan::HashJoin(join @ HashJoin { unique: true, .. }, Semi::All) => { + // Build a unique hash join iterator + UniqueHashJoin::build_from(join, tx) + .map(SemiJoin::All) + .map(LeftDeepJoinIter::UniqueHashJoin) + .map(Iter::Join) } } - stack.pop().unwrap() } } -/// A tuple-at-a-time query iterator. -/// Notice there is no explicit projection operation. -/// This is because for applicable plans, -/// the optimizer can remove intermediate projections, -/// implementing a form of late materialization. -pub enum Iter<'a> { - /// A [RowRef] table iterator +/// An iterator that always returns [RowRef]s +pub enum RowRefIter<'a> { TableScan(TableScanIter<'a>), - /// A [ProductValue] ref iterator - DeltaScan(DeltaScanIter<'a>), - /// A [RowRef] index iterator IndexScan(IndexScanIter<'a>), - /// A nested loop join iterator - NLJoin(NestedLoopJoin<'a>), - /// A non-unique (constraint) index join iterator - IxJoin(LeftDeepJoin>), - /// A unique (constraint) index join iterator - UniqueIxJoin(LeftDeepJoin>), - /// A tuple-at-a-time filter iterator - Filter(Filter<'a>), + RowFilter(Filter<'a, RowRefIter<'a>>), } -impl<'a> Iterator for Iter<'a> { +impl<'a> Iterator for RowRefIter<'a> { + type Item = RowRef<'a>; + + fn next(&mut self) -> Option { + match self { + Self::TableScan(iter) => iter.next(), + Self::IndexScan(iter) => iter.next(), + Self::RowFilter(iter) => iter.next(), + } + } +} + +impl<'a> RowRefIter<'a> { + /// Instantiate an iterator from a [PhysicalPlan]. + /// The compiler ensures this isn't called on a join. + fn build(plan: &'a PhysicalPlan, tx: &'a FallibleDatastore<'a, T>) -> Result { + let concat = |prefix: &[(_, AlgebraicValue)], v| { + ProductValue::from_iter(prefix.iter().map(|(_, v)| v).chain([v]).cloned()) + }; + match plan { + PhysicalPlan::TableScan(schema, _) => tx.table_scan(schema.table_id).map(Self::TableScan), + PhysicalPlan::IxScan( + scan @ IxScan { + arg: Sarg::Eq(_, v), .. + }, + _, + ) if scan.prefix.is_empty() => tx + .index_scan(scan.schema.table_id, scan.index_id, v) + .map(Self::IndexScan), + PhysicalPlan::IxScan( + scan @ IxScan { + arg: Sarg::Eq(_, v), .. + }, + _, + ) => tx + .index_scan( + scan.schema.table_id, + scan.index_id, + &AlgebraicValue::product(concat(&scan.prefix, v)), + ) + .map(Self::IndexScan), + PhysicalPlan::IxScan( + scan @ IxScan { + arg: Sarg::Range(_, lower, upper), + .. + }, + _, + ) if scan.prefix.is_empty() => tx + .index_scan(scan.schema.table_id, scan.index_id, &(lower.as_ref(), upper.as_ref())) + .map(Self::IndexScan), + PhysicalPlan::IxScan( + scan @ IxScan { + arg: Sarg::Range(_, lower, upper), + .. + }, + _, + ) => tx + .index_scan( + scan.schema.table_id, + scan.index_id, + &( + lower + .as_ref() + .map(|v| concat(&scan.prefix, v)) + .map(AlgebraicValue::Product), + upper + .as_ref() + .map(|v| concat(&scan.prefix, v)) + .map(AlgebraicValue::Product), + ), + ) + .map(Self::IndexScan), + PhysicalPlan::Filter(input, expr) => Self::build(input, tx) + .map(Box::new) + .map(|input| Filter { input, expr }) + .map(Self::RowFilter), + _ => bail!("Plan does not return row ids"), + } + } +} + +/// An iterator for a left deep join tree. +/// +/// ```text +/// x +/// / \ +/// x c +/// / \ +/// a b +/// ``` +pub enum LeftDeepJoinIter<'a> { + /// A nested loop join + NLJoin(NLJoin<'a>), + /// An index join + IxJoin(SemiJoin, IxJoinLhs<'a>, IxJoinRhs<'a>>), + /// An index join for a unique constraint + UniqueIxJoin(SemiJoin, UniqueIxJoinLhs<'a>, UniqueIxJoinRhs<'a>>), + /// A hash join + HashJoin(SemiJoin, HashJoinLhs<'a>, HashJoinRhs<'a>>), + /// A hash join for a unique constraint + UniqueHashJoin(SemiJoin, UniqueHashJoinLhs<'a>, UniqueHashJoinRhs<'a>>), +} + +impl<'a> Iterator for LeftDeepJoinIter<'a> { type Item = Tuple<'a>; fn next(&mut self) -> Option { match self { - Self::TableScan(iter) => { - // Returns row ids - iter.next().map(Row::Ptr).map(Tuple::Row) - } - Self::DeltaScan(iter) => { - // Returns product refs - iter.next().map(Row::Ref).map(Tuple::Row) - } - Self::IndexScan(iter) => { - // Returns row ids - iter.next().map(Row::Ptr).map(Tuple::Row) - } - Self::IxJoin(iter) => { - // Returns row ids for semijoins, (n+1)-tuples otherwise - iter.next() - } - Self::UniqueIxJoin(iter) => { - // Returns row ids for semijoins, (n+1)-tuples otherwise - iter.next() - } - Self::Filter(iter) => { - // Filter is a passthru - iter.next() - } - Self::NLJoin(iter) => { - iter.next().map(|t| { - match t { - // A leaf join - // x - // / \ - // a b - (Tuple::Row(u), Tuple::Row(v)) => { - // Returns a 2-tuple - Tuple::Join(vec![u, v]) - } - // A right deep join - // x - // / \ - // a x - // / \ - // b c - (Tuple::Row(r), Tuple::Join(mut rows)) => { - // Returns an (n+1)-tuple - let mut pointers = vec![r]; - pointers.append(&mut rows); - Tuple::Join(pointers) - } - // A left deep join - // x - // / \ - // x c - // / \ - // a b - (Tuple::Join(mut rows), Tuple::Row(r)) => { - // Returns an (n+1)-tuple - rows.push(r); - Tuple::Join(rows) - } - // A bushy join - // x - // / \ - // / \ - // x x - // / \ / \ - // a b c d - (Tuple::Join(mut lhs), Tuple::Join(mut rhs)) => { - // Returns an (n+m)-tuple - lhs.append(&mut rhs); - Tuple::Join(lhs) - } - } - }) - } + Self::NLJoin(iter) => iter.next().map(|(tuple, rhs)| tuple.append(rhs)), + Self::IxJoin(iter) => iter.next(), + Self::UniqueIxJoin(iter) => iter.next(), + Self::HashJoin(iter) => iter.next(), + Self::UniqueHashJoin(iter) => iter.next(), } } } -/// An iterator for a left deep join tree -pub enum LeftDeepJoin { - /// A standard join - Eq(Iter), - /// A semijoin that returns the lhs - SemiLhs(Iter), - /// A semijion that returns the rhs - SemiRhs(Iter), +/// A semijoin iterator. +/// Returns [RowRef]s for right semijoins. +/// Returns [Tuple]s otherwise. +pub enum SemiJoin { + All(All), + Lhs(Lhs), + Rhs(Rhs), } -impl<'a, Iter> Iterator for LeftDeepJoin +impl<'a, All, Lhs, Rhs> Iterator for SemiJoin where - Iter: Iterator, RowRef<'a>)>, + All: Iterator, RowRef<'a>)>, + Lhs: Iterator>, + Rhs: Iterator>, { type Item = Tuple<'a>; fn next(&mut self) -> Option { match self { - Self::SemiLhs(iter) => { - // Return the lhs tuple - iter.next().map(|(t, _)| t) - } - Self::SemiRhs(iter) => { - // Return the rhs row - iter.next().map(|(_, ptr)| ptr).map(Row::Ptr).map(Tuple::Row) - } - Self::Eq(iter) => { - iter.next().map(|(tuple, ptr)| { - match (tuple, ptr) { - // A leaf join - // x - // / \ - // a b - (Tuple::Row(u), ptr) => { - // Returns a 2-tuple - Tuple::Join(vec![u, Row::Ptr(ptr)]) - } - // A left deep join - // x - // / \ - // x c - // / \ - // a b - (Tuple::Join(mut rows), ptr) => { - // Returns an (n+1)-tuple - rows.push(Row::Ptr(ptr)); - Tuple::Join(rows) - } - } - }) - } + Self::All(iter) => iter.next().map(|(tuple, ptr)| tuple.append(ptr)), + Self::Lhs(iter) => iter.next(), + Self::Rhs(iter) => iter.next().map(Tuple::Row), } } } -/// A unique (constraint) index join iterator -pub struct UniqueIndexJoin<'a> { +/// An index join that uses a unique constraint index +pub struct UniqueIxJoin<'a> { /// The lhs of the join - input: Box>, + lhs: Box>, /// The rhs index - index: &'a BTreeIndex, + rhs_index: &'a BTreeIndex, /// A handle to the datastore - table: &'a Table, + rhs_table: &'a Table, /// A handle to the blobstore blob_store: &'a dyn BlobStore, - /// The lhs index key projection - projection: ProgramEvaluator<'a>, -} - -impl<'a> UniqueIndexJoin<'a> { - fn new( - input: Iter<'a>, - index: &'a BTreeIndex, - table: &'a Table, - blob_store: &'a dyn BlobStore, - projection: ProgramEvaluator<'a>, - ) -> Self { - Self { - input: Box::new(input), - index, - table, - blob_store, - projection, - } + /// The lhs probe field + lhs_field: &'a TupleField, +} + +impl<'a> UniqueIxJoin<'a> { + fn build_from(join: &'a IxJoin, tx: &'a FallibleDatastore<'a, T>) -> Result { + let lhs = Iter::build(&join.lhs, tx)?; + let rhs_table = tx.table(join.rhs.table_id)?; + let rhs_index = rhs_table + .get_index(join.rhs_index) + .ok_or_else(|| anyhow!("IndexId `{}` does not exist", join.rhs_index))?; + Ok(Self { + lhs: Box::new(lhs), + rhs_index, + rhs_table, + blob_store: tx.blob_store(), + lhs_field: &join.lhs_field, + }) } } -impl<'a> Iterator for UniqueIndexJoin<'a> { +impl<'a> Iterator for UniqueIxJoin<'a> { type Item = (Tuple<'a>, RowRef<'a>); fn next(&mut self) -> Option { - self.input.find_map(|tuple| { - self.index - .seek(&self.projection.eval(&tuple)) + self.lhs.find_map(|tuple| { + self.rhs_index + .seek(&tuple.project(self.lhs_field)) .next() - .and_then(|ptr| self.table.get_row_ref(self.blob_store, ptr)) + .and_then(|ptr| self.rhs_table.get_row_ref(self.blob_store, ptr)) .map(|ptr| (tuple, ptr)) }) } } -/// A non-unique (constraint) index join iterator -pub struct IndexJoin<'a> { +/// A left semijoin that uses a unique constraint index +pub struct UniqueIxJoinLhs<'a> { + /// The lhs of the join + lhs: Box>, + /// The rhs index + rhs: &'a BTreeIndex, + /// The lhs probe field + lhs_field: &'a TupleField, +} + +impl<'a> UniqueIxJoinLhs<'a> { + fn build_from(join: &'a IxJoin, tx: &'a FallibleDatastore<'a, T>) -> Result { + let lhs = Iter::build(&join.lhs, tx)?; + let rhs_table = tx.table(join.rhs.table_id)?; + let rhs_index = rhs_table + .get_index(join.rhs_index) + .ok_or_else(|| anyhow!("IndexId `{}` does not exist", join.rhs_index))?; + Ok(Self { + lhs: Box::new(lhs), + rhs: rhs_index, + lhs_field: &join.lhs_field, + }) + } +} + +impl<'a> Iterator for UniqueIxJoinLhs<'a> { + type Item = Tuple<'a>; + + fn next(&mut self) -> Option { + self.lhs.find(|t| self.rhs.contains_any(&t.project(self.lhs_field))) + } +} + +/// A right semijoin that uses a unique constraint index +pub struct UniqueIxJoinRhs<'a> { /// The lhs of the join - input: Box>, - /// The current tuple from the lhs - tuple: Option>, + lhs: Box>, /// The rhs index - index: &'a BTreeIndex, - /// The current cursor for the rhs index - index_cursor: Option>, + rhs_index: &'a BTreeIndex, /// A handle to the datastore - table: &'a Table, + rhs_table: &'a Table, /// A handle to the blobstore blob_store: &'a dyn BlobStore, - /// The lhs index key projection - projection: ProgramEvaluator<'a>, -} - -impl<'a> IndexJoin<'a> { - fn new( - input: Iter<'a>, - index: &'a BTreeIndex, - table: &'a Table, - blob_store: &'a dyn BlobStore, - projection: ProgramEvaluator<'a>, - ) -> Self { - Self { - input: Box::new(input), - tuple: None, - index, - index_cursor: None, - table, - blob_store, - projection, - } + /// The lhs probe field + lhs_field: &'a TupleField, +} + +impl<'a> UniqueIxJoinRhs<'a> { + fn build_from(join: &'a IxJoin, tx: &'a FallibleDatastore<'a, T>) -> Result { + let lhs = Iter::build(&join.lhs, tx)?; + let rhs_table = tx.table(join.rhs.table_id)?; + let rhs_index = rhs_table + .get_index(join.rhs_index) + .ok_or_else(|| anyhow!("IndexId `{}` does not exist", join.rhs_index))?; + Ok(Self { + lhs: Box::new(lhs), + rhs_index, + rhs_table, + blob_store: tx.blob_store(), + lhs_field: &join.lhs_field, + }) + } +} + +impl<'a> Iterator for UniqueIxJoinRhs<'a> { + type Item = RowRef<'a>; + + fn next(&mut self) -> Option { + self.lhs.find_map(|tuple| { + self.rhs_index + .seek(&tuple.project(self.lhs_field)) + .next() + .and_then(|ptr| self.rhs_table.get_row_ref(self.blob_store, ptr)) + }) + } +} + +/// An index join that does not use a unique constraint index +pub struct IxJoinIter<'a> { + /// The lhs of the join + lhs: Box>, + /// The current lhs tuple + lhs_tuple: Option>, + /// The rhs index + rhs_index: &'a BTreeIndex, + /// The current rhs index cursor + rhs_index_cursor: Option>, + /// A handle to the datastore + rhs_table: &'a Table, + /// A handle to the blobstore + blob_store: &'a dyn BlobStore, + /// The lhs probe field + lhs_field: &'a TupleField, +} + +impl<'a> IxJoinIter<'a> { + fn build_from(join: &'a IxJoin, tx: &'a FallibleDatastore<'a, T>) -> Result { + let lhs = Iter::build(&join.lhs, tx)?; + let rhs_table = tx.table(join.rhs.table_id)?; + let rhs_index = rhs_table + .get_index(join.rhs_index) + .ok_or_else(|| anyhow!("IndexId `{}` does not exist", join.rhs_index))?; + Ok(Self { + lhs: Box::new(lhs), + lhs_tuple: None, + rhs_index, + rhs_index_cursor: None, + rhs_table, + blob_store: tx.blob_store(), + lhs_field: &join.lhs_field, + }) } } -impl<'a> Iterator for IndexJoin<'a> { +impl<'a> Iterator for IxJoinIter<'a> { type Item = (Tuple<'a>, RowRef<'a>); fn next(&mut self) -> Option { - self.tuple + self.lhs_tuple .as_ref() .and_then(|tuple| { - self.index_cursor.as_mut().and_then(|cursor| { + self.rhs_index_cursor.as_mut().and_then(|cursor| { cursor.next().and_then(|ptr| { - self.table + self.rhs_table .get_row_ref(self.blob_store, ptr) .map(|ptr| (tuple.clone(), ptr)) }) }) }) .or_else(|| { - self.input.find_map(|tuple| { - Some(self.index.seek(&self.projection.eval(&tuple))).and_then(|mut cursor| { - cursor.next().and_then(|ptr| { - self.table.get_row_ref(self.blob_store, ptr).map(|ptr| { - self.tuple = Some(tuple.clone()); - self.index_cursor = Some(cursor); - (tuple, ptr) - }) + self.lhs.find_map(|tuple| { + let mut cursor = self.rhs_index.seek(&tuple.project(self.lhs_field)); + cursor.next().and_then(|ptr| { + self.rhs_table.get_row_ref(self.blob_store, ptr).map(|ptr| { + self.lhs_tuple = Some(tuple.clone()); + self.rhs_index_cursor = Some(cursor); + (tuple, ptr) }) }) }) @@ -492,256 +526,495 @@ impl<'a> Iterator for IndexJoin<'a> { } } -/// A nested loop join returns the cross product of its inputs -pub struct NestedLoopJoin<'a> { - /// The lhs input +/// A left semijoin that does not use a unique constraint index +pub struct IxJoinLhs<'a> { + /// The lhs of the join lhs: Box>, - /// The rhs input - rhs: Box>, - /// The materialized rhs - build: Vec>, + /// The rhs index + rhs_index: &'a BTreeIndex, /// The current lhs tuple - lhs_row: Option>, - /// The current rhs tuple - rhs_ptr: usize, -} - -impl<'a> NestedLoopJoin<'a> { - fn new(lhs: Iter<'a>, rhs: Iter<'a>) -> Self { - Self { + lhs_tuple: Option>, + /// The matching rhs row count + rhs_count: usize, + /// The lhs probe field + lhs_field: &'a TupleField, +} + +impl<'a> IxJoinLhs<'a> { + fn build_from(join: &'a IxJoin, tx: &'a FallibleDatastore<'a, T>) -> Result { + let lhs = Iter::build(&join.lhs, tx)?; + let rhs_table = tx.table(join.rhs.table_id)?; + let rhs_index = rhs_table + .get_index(join.rhs_index) + .ok_or_else(|| anyhow!("IndexId `{}` does not exist", join.rhs_index))?; + Ok(Self { lhs: Box::new(lhs), - rhs: Box::new(rhs), - build: vec![], - lhs_row: None, - rhs_ptr: 0, - } + lhs_tuple: None, + rhs_count: 0, + rhs_index, + lhs_field: &join.lhs_field, + }) } } -impl<'a> Iterator for NestedLoopJoin<'a> { - type Item = (Tuple<'a>, Tuple<'a>); +impl<'a> Iterator for IxJoinLhs<'a> { + type Item = Tuple<'a>; fn next(&mut self) -> Option { - for t in self.rhs.as_mut() { - self.build.push(t); - } - match self.build.get(self.rhs_ptr) { - Some(v) => { - self.rhs_ptr += 1; - self.lhs_row.as_ref().map(|u| (u.clone(), v.clone())) - } - None => { - self.rhs_ptr = 1; - self.lhs_row = self.lhs.next(); - self.lhs_row - .as_ref() - .zip(self.build.first()) - .map(|(u, v)| (u.clone(), v.clone())) + match self.rhs_count { + 0 => self + .lhs + .find_map(|tuple| self.rhs_index.count(&tuple.project(self.lhs_field)).map(|n| (tuple, n))) + .map(|(tuple, n)| { + self.rhs_count = n - 1; + self.lhs_tuple = Some(tuple.clone()); + tuple + }), + _ => { + self.rhs_count -= 1; + self.lhs_tuple.clone() } } } } -/// A tuple-at-a-time filter iterator -pub struct Filter<'a> { - input: Box>, - program: ProgramEvaluator<'a>, +/// A right semijoin that does not use a unique constraint index +pub struct IxJoinRhs<'a> { + /// The lhs of the join + lhs: Box>, + /// The rhs index + rhs_index: &'a BTreeIndex, + /// The current rhs index cursor + rhs_index_cursor: Option>, + /// A handle to the datastore + rhs_table: &'a Table, + /// A handle to the blobstore + blob_store: &'a dyn BlobStore, + /// The lhs probe field + lhs_field: &'a TupleField, +} + +impl<'a> IxJoinRhs<'a> { + fn build_from(join: &'a IxJoin, tx: &'a FallibleDatastore<'a, T>) -> Result { + let lhs = Iter::build(&join.lhs, tx)?; + let rhs_table = tx.table(join.rhs.table_id)?; + let rhs_index = rhs_table + .get_index(join.rhs_index) + .ok_or_else(|| anyhow!("IndexId `{}` does not exist", join.rhs_index))?; + Ok(Self { + lhs: Box::new(lhs), + rhs_index, + rhs_index_cursor: None, + rhs_table, + blob_store: tx.blob_store(), + lhs_field: &join.lhs_field, + }) + } } -impl<'a> Iterator for Filter<'a> { +impl<'a> Iterator for IxJoinRhs<'a> { + type Item = RowRef<'a>; + + fn next(&mut self) -> Option { + self.rhs_index_cursor + .as_mut() + .and_then(|cursor| { + cursor + .next() + .and_then(|ptr| self.rhs_table.get_row_ref(self.blob_store, ptr)) + }) + .or_else(|| { + self.lhs.find_map(|tuple| { + let mut cursor = self.rhs_index.seek(&tuple.project(self.lhs_field)); + cursor.next().and_then(|ptr| { + self.rhs_table.get_row_ref(self.blob_store, ptr).map(|ptr| { + self.rhs_index_cursor = Some(cursor); + ptr + }) + }) + }) + }) + } +} + +/// A hash join that on each probe, +/// returns at most one row from the hash table. +pub struct UniqueHashJoin<'a> { + /// The lhs relation + lhs: Box>, + /// The rhs hash table + rhs: HashMap>, + /// The lhs probe field + lhs_field: &'a TupleField, +} + +impl<'a> UniqueHashJoin<'a> { + /// Builds a hash table over the rhs + fn build_from(join: &'a HashJoin, tx: &'a FallibleDatastore<'a, T>) -> Result { + let lhs = Iter::build(&join.lhs, tx)?; + let rhs = RowRefIter::build(&join.rhs, tx)?; + let rhs = rhs + .map(|ptr| { + ptr.read_col(join.rhs_field.field_pos) + .with_context(|| "Failed to decode row id") + .map(|v| (v, ptr)) + }) + .collect::>()?; + Ok(Self { + lhs: Box::new(lhs), + rhs, + lhs_field: &join.lhs_field, + }) + } +} + +impl<'a> Iterator for UniqueHashJoin<'a> { + type Item = (Tuple<'a>, RowRef<'a>); + + fn next(&mut self) -> Option { + self.lhs.find_map(|tuple| { + self.rhs + .get(&tuple.project(self.lhs_field)) + .cloned() + .map(|ptr| (tuple, ptr)) + }) + } +} + +/// A left hash semijoin that on each probe, +/// returns at most one row from the hash table. +pub struct UniqueHashJoinLhs<'a> { + /// The lhs relation + lhs: Box>, + /// The rhs hash table + rhs: HashSet, + /// The lhs probe field + lhs_field: &'a TupleField, +} + +impl<'a> UniqueHashJoinLhs<'a> { + /// Builds a hash set over the rhs + fn build_from(join: &'a HashJoin, tx: &'a FallibleDatastore<'a, T>) -> Result { + let lhs = Iter::build(&join.lhs, tx)?; + let rhs = RowRefIter::build(&join.rhs, tx)?; + let rhs = rhs + .map(|ptr| { + ptr.read_col(join.rhs_field.field_pos) + .with_context(|| "Failed to decode row id") + }) + .collect::>()?; + Ok(Self { + lhs: Box::new(lhs), + rhs, + lhs_field: &join.lhs_field, + }) + } +} + +impl<'a> Iterator for UniqueHashJoinLhs<'a> { type Item = Tuple<'a>; fn next(&mut self) -> Option { - self.input - .find(|tuple| self.program.eval(tuple).as_bool().is_some_and(|ok| *ok)) + self.lhs.find(|t| self.rhs.contains(&t.project(self.lhs_field))) } } -/// An opcode for a stack-based expression evaluator -#[derive(Clone, Copy)] -pub enum OpCode { - /// == - Eq, - /// <> - Ne, - /// < - Lt, - /// > - Gt, - /// <= - Lte, - /// <= - Gte, - /// AND - And, - /// OR - Or, - /// 5 - Const(u16), - /// || - Concat(u16), - /// r.0 : [Row::Ptr] - PtrProj(u16), - /// r.0 : [Row::Ref] - RefProj(u16), - /// r.0.1 : [Row::Ptr] - TupPtrProj(u16), - /// r.0.1 : [Row::Ref] - TupRefProj(u16), +/// A right hash join that on each probe, +/// returns at most one row from the hash table. +pub struct UniqueHashJoinRhs<'a> { + /// The lhs relation + lhs: Box>, + /// The rhs hash table + rhs: HashMap>, + /// The lhs probe field + lhs_field: &'a TupleField, +} + +impl<'a> UniqueHashJoinRhs<'a> { + /// Builds a hash table over the rhs + fn build_from(join: &'a HashJoin, tx: &'a FallibleDatastore<'a, T>) -> Result { + let lhs = Iter::build(&join.lhs, tx)?; + let rhs = RowRefIter::build(&join.rhs, tx)?; + let rhs = rhs + .map(|ptr| { + ptr.read_col(join.rhs_field.field_pos) + .with_context(|| "Failed to decode row id") + .map(|v| (v, ptr)) + }) + .collect::>()?; + Ok(Self { + lhs: Box::new(lhs), + rhs, + lhs_field: &join.lhs_field, + }) + } } -static_assert_size!(OpCode, 4); +impl<'a> Iterator for UniqueHashJoinRhs<'a> { + type Item = RowRef<'a>; -/// A program for evaluating a scalar expression -pub struct ExprProgram<'a> { - /// The instructions or opcodes - ops: &'a [OpCode], - /// The constants in the original expression - constants: &'a [AlgebraicValue], + fn next(&mut self) -> Option { + self.lhs.find_map(|t| self.rhs.get(&t.project(self.lhs_field)).cloned()) + } } -impl<'a> ExprProgram<'a> { - fn new(ops: &'a [OpCode], constants: &'a [AlgebraicValue]) -> Self { - Self { ops, constants } +/// A hash join that on each probe, +/// may return many rows from the hash table. +pub struct HashJoinIter<'a> { + /// The lhs relation + lhs: Box>, + /// The rhs hash table + rhs: HashMap>>, + /// The current lhs tuple + lhs_tuple: Option>, + /// The current rhs row pointer + rhs_ptr: usize, + /// The lhs probe field + lhs_field: &'a TupleField, +} + +impl<'a> HashJoinIter<'a> { + /// Builds a hash table over the rhs + fn build_from(join: &'a HashJoin, tx: &'a FallibleDatastore<'a, T>) -> Result { + let lhs = Iter::build(&join.lhs, tx)?; + let rhs_iter = RowRefIter::build(&join.rhs, tx)?; + let mut rhs = HashMap::new(); + for ptr in rhs_iter { + rhs.entry( + ptr.read_col(join.rhs_field.field_pos) + .with_context(|| "Failed to decode row id")?, + ) + .and_modify(|ptrs: &mut Vec<_>| ptrs.push(ptr)) + .or_insert_with(|| vec![ptr]); + } + Ok(Self { + lhs: Box::new(lhs), + rhs, + lhs_tuple: None, + rhs_ptr: 0, + lhs_field: &join.lhs_field, + }) } +} - /// Returns an interator over the opcodes - fn ops(&self) -> impl Iterator + '_ { - self.ops.iter().copied() +impl<'a> Iterator for HashJoinIter<'a> { + type Item = (Tuple<'a>, RowRef<'a>); + + fn next(&mut self) -> Option { + self.lhs_tuple + .as_ref() + .and_then(|tuple| { + self.rhs.get(&tuple.project(self.lhs_field)).and_then(|ptrs| { + let i = self.rhs_ptr; + self.rhs_ptr += 1; + ptrs.get(i).map(|ptr| (tuple.clone(), *ptr)) + }) + }) + .or_else(|| { + self.lhs.find_map(|tuple| { + self.rhs.get(&tuple.project(self.lhs_field)).and_then(|ptrs| { + self.rhs_ptr = 1; + self.lhs_tuple = Some(tuple.clone()); + ptrs.first().map(|ptr| (tuple, *ptr)) + }) + }) + }) } +} - /// Lookup a constant in the plan - fn constant(&self, i: u16) -> AlgebraicValue { - self.constants[i as usize].clone() +/// A left hash semijoin that on each probe, +/// may return many rows from the hash table. +pub struct HashJoinLhs<'a> { + /// The lhs relation + lhs: Box>, + /// The rhs hash table + rhs: HashMap, + /// The current lhs tuple + lhs_tuple: Option>, + /// The matching number of rhs rows + rhs_count: usize, + /// The lhs probe field + lhs_field: &'a TupleField, +} + +impl<'a> HashJoinLhs<'a> { + /// Instantiates the iterator by building a hash table over the rhs + fn build_from(join: &'a HashJoin, tx: &'a FallibleDatastore<'a, T>) -> Result { + let lhs = Iter::build(&join.lhs, tx)?; + let rhs_iter = RowRefIter::build(&join.rhs, tx)?; + let mut rhs = HashMap::new(); + for ptr in rhs_iter { + rhs.entry( + ptr.read_col(join.rhs_field.field_pos) + .with_context(|| "Failed to decode row id")?, + ) + .and_modify(|n| *n += 1) + .or_insert_with(|| 1); + } + Ok(Self { + lhs: Box::new(lhs), + rhs, + lhs_tuple: None, + rhs_count: 0, + lhs_field: &join.lhs_field, + }) } } -/// An evaluator for an [ExprProgram] -pub struct ProgramEvaluator<'a> { - program: ExprProgram<'a>, - stack: Vec, +impl<'a> Iterator for HashJoinLhs<'a> { + type Item = Tuple<'a>; + + fn next(&mut self) -> Option { + match self.rhs_count { + 0 => self.lhs.find_map(|tuple| { + self.rhs.get(&tuple.project(self.lhs_field)).map(|n| { + self.rhs_count = *n - 1; + self.lhs_tuple = Some(tuple.clone()); + tuple + }) + }), + _ => { + self.rhs_count -= 1; + self.lhs_tuple.clone() + } + } + } } -impl<'a> From> for ProgramEvaluator<'a> { - fn from(program: ExprProgram<'a>) -> Self { - Self { program, stack: vec![] } +/// A right hash semijoin that on each probe, +/// may return many rows from the hash table. +pub struct HashJoinRhs<'a> { + /// The lhs relation + lhs: Box>, + /// The rhs hash table + rhs: HashMap>>, + /// The current lhs tuple + lhs_value: Option, + /// The current rhs row pointer + rhs_ptr: usize, + /// The lhs probe field + lhs_field: &'a TupleField, +} + +impl<'a> HashJoinRhs<'a> { + /// Builds a hash table over the rhs + fn build_from(join: &'a HashJoin, tx: &'a FallibleDatastore<'a, T>) -> Result { + let lhs = Iter::build(&join.lhs, tx)?; + let rhs_iter = RowRefIter::build(&join.rhs, tx)?; + let mut rhs = HashMap::new(); + for ptr in rhs_iter { + rhs.entry( + ptr.read_col(join.rhs_field.field_pos) + .with_context(|| "Failed to decode row id")?, + ) + .and_modify(|ptrs: &mut Vec<_>| ptrs.push(ptr)) + .or_insert_with(|| vec![ptr]); + } + Ok(Self { + lhs: Box::new(lhs), + rhs, + lhs_value: None, + rhs_ptr: 0, + lhs_field: &join.lhs_field, + }) } } -impl ProgramEvaluator<'_> { - pub fn eval(&mut self, tuple: &Tuple) -> AlgebraicValue { - for op in self.program.ops() { - match op { - OpCode::Const(i) => { - self.stack.push(self.program.constant(i)); - } - OpCode::Eq => { - let r = self.stack.pop().unwrap(); - let l = self.stack.pop().unwrap(); - self.stack.push(AlgebraicValue::Bool(l == r)); - } - OpCode::Ne => { - let r = self.stack.pop().unwrap(); - let l = self.stack.pop().unwrap(); - self.stack.push(AlgebraicValue::Bool(l != r)); - } - OpCode::Lt => { - let r = self.stack.pop().unwrap(); - let l = self.stack.pop().unwrap(); - self.stack.push(AlgebraicValue::Bool(l < r)); - } - OpCode::Gt => { - let r = self.stack.pop().unwrap(); - let l = self.stack.pop().unwrap(); - self.stack.push(AlgebraicValue::Bool(l > r)); - } - OpCode::Lte => { - let r = self.stack.pop().unwrap(); - let l = self.stack.pop().unwrap(); - self.stack.push(AlgebraicValue::Bool(l <= r)); - } - OpCode::Gte => { - let r = self.stack.pop().unwrap(); - let l = self.stack.pop().unwrap(); - self.stack.push(AlgebraicValue::Bool(l >= r)); - } - OpCode::And => { - let r = *self.stack.pop().unwrap().as_bool().unwrap(); - let l = *self.stack.pop().unwrap().as_bool().unwrap(); - self.stack.push(AlgebraicValue::Bool(l && r)); - } - OpCode::Or => { - let r = *self.stack.pop().unwrap().as_bool().unwrap(); - let l = *self.stack.pop().unwrap().as_bool().unwrap(); - self.stack.push(AlgebraicValue::Bool(l || r)); - } - OpCode::Concat(n) => { - let mut elems = Vec::with_capacity(n as usize); - // Pop args off stack - for _ in 0..n { - elems.push(self.stack.pop().unwrap()); - } - // Concat and push on stack - self.stack.push(AlgebraicValue::Product(ProductValue::from_iter( - elems.into_iter().rev(), - ))); - } - OpCode::PtrProj(i) => { - self.stack.push( - tuple - // Read field from row ref - .expect_row() - .expect_ptr() - .read_col(i as usize) - .unwrap(), - ); - } - OpCode::RefProj(i) => { - self.stack.push( - tuple - // Read field from product ref - .expect_row() - .expect_ref() - .elements[i as usize] - .clone(), - ); - } - OpCode::TupPtrProj(i) => { - let idx = *self - // Pop index off stack - .stack - .pop() - .unwrap() - .as_u16() - .unwrap(); - self.stack.push( - tuple - // Read field from row ref - .expect_join()[idx as usize] - .expect_ptr() - .read_col(i as usize) - .unwrap(), - ); - } - OpCode::TupRefProj(i) => { - let idx = *self - // Pop index off stack - .stack - .pop() - .unwrap() - .as_u16() - .unwrap(); - self.stack.push( - tuple - // Read field from product ref - .expect_join()[idx as usize] - .expect_ptr() - .read_col(i as usize) - .unwrap(), - ); - } +impl<'a> Iterator for HashJoinRhs<'a> { + type Item = RowRef<'a>; + + fn next(&mut self) -> Option { + self.lhs_value + .as_ref() + .and_then(|value| { + self.rhs.get(value).and_then(|ptrs| { + let i = self.rhs_ptr; + self.rhs_ptr += 1; + ptrs.get(i).copied() + }) + }) + .or_else(|| { + self.lhs.find_map(|tuple| { + let value = tuple.project(self.lhs_field); + self.rhs.get(&value).and_then(|ptrs| { + self.rhs_ptr = 1; + self.lhs_value = Some(value.clone()); + ptrs.first().copied() + }) + }) + }) + } +} + +/// A nested loop join iterator +pub struct NLJoin<'a> { + /// The lhs input + lhs: Box>, + /// The materialized rhs + rhs: Vec>, + /// The current lhs tuple + lhs_tuple: Option>, + /// The current rhs row pointer + rhs_ptr: usize, +} + +impl<'a> NLJoin<'a> { + /// Instantiates the iterator by materializing the rhs + fn build_from( + lhs: &'a PhysicalPlan, + rhs: &'a PhysicalPlan, + tx: &'a FallibleDatastore<'a, T>, + ) -> Result { + let lhs = Iter::build(lhs, tx)?; + let rhs = RowRefIter::build(rhs, tx)?; + Ok(Self { + lhs: Box::new(lhs), + rhs: rhs.collect(), + lhs_tuple: None, + rhs_ptr: 0, + }) + } +} + +impl<'a> Iterator for NLJoin<'a> { + type Item = (Tuple<'a>, RowRef<'a>); + + fn next(&mut self) -> Option { + match self.rhs.get(self.rhs_ptr) { + Some(v) => { + self.rhs_ptr += 1; + self.lhs_tuple.as_ref().map(|u| (u.clone(), *v)) + } + None => { + self.rhs_ptr = 1; + self.lhs_tuple = self.lhs.next(); + self.lhs_tuple + .as_ref() + .zip(self.rhs.first()) + .map(|(u, v)| (u.clone(), *v)) } } - self.stack.pop().unwrap() + } +} + +/// A tuple-at-a-time filter iterator +pub struct Filter<'a, I> { + input: Box, + expr: &'a PhysicalExpr, +} + +impl<'a> Iterator for Filter<'a, RowRefIter<'a>> { + type Item = RowRef<'a>; + + fn next(&mut self) -> Option { + self.input.find(|ptr| self.expr.eval_bool(ptr)) + } +} + +impl<'a> Iterator for Filter<'a, Iter<'a>> { + type Item = Tuple<'a>; + + fn next(&mut self) -> Option { + self.input.find(|tuple| self.expr.eval_bool(tuple)) } } diff --git a/crates/execution/src/lib.rs b/crates/execution/src/lib.rs index 9708a97a673..4bb9ac88a17 100644 --- a/crates/execution/src/lib.rs +++ b/crates/execution/src/lib.rs @@ -1 +1,116 @@ +use std::ops::{Deref, RangeBounds}; + +use anyhow::{anyhow, Result}; +use iter::PlanIter; +use spacetimedb_lib::AlgebraicValue; +use spacetimedb_physical_plan::plan::{ProjectField, ProjectPlan, TupleField}; +use spacetimedb_primitives::{IndexId, TableId}; +use spacetimedb_table::{ + blob_store::BlobStore, + static_assert_size, + table::{IndexScanIter, RowRef, Table, TableScanIter}, +}; + pub mod iter; + +/// The datastore interface required for building an executor +pub trait Datastore { + fn table(&self, table_id: TableId) -> Option<&Table>; + fn blob_store(&self) -> &dyn BlobStore; +} + +/// A wrapper around a [Datastore] that returns an error instead of `None` +pub(crate) struct FallibleDatastore<'a, T>(pub &'a T); + +impl Deref for FallibleDatastore<'_, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + self.0 + } +} + +impl<'a, T: Datastore> FallibleDatastore<'a, T> { + fn table(&self, table_id: TableId) -> Result<&Table> { + self.0 + .table(table_id) + .ok_or_else(|| anyhow!("TableId `{table_id}` does not exist")) + } + + fn table_scan(&self, table_id: TableId) -> Result { + self.0 + .table(table_id) + .map(|table| table.scan_rows(self.0.blob_store())) + .ok_or_else(|| anyhow!("TableId `{table_id}` does not exist")) + } + + fn index_scan( + &self, + table_id: TableId, + index_id: IndexId, + range: &impl RangeBounds, + ) -> Result { + self.0 + .table(table_id) + .ok_or_else(|| anyhow!("TableId `{table_id}` does not exist")) + .and_then(|table| { + table + .index_seek_by_id(self.0.blob_store(), index_id, range) + .ok_or_else(|| anyhow!("IndexId `{index_id}` does not exist")) + }) + } +} + +/// Each query operator returns a tuple of [RowRef]s +#[derive(Clone)] +pub enum Tuple<'a> { + /// A pointer to a row in a base table + Row(RowRef<'a>), + /// A temporary returned by a join operator + Join(Vec>), +} + +static_assert_size!(Tuple, 32); + +impl ProjectField for Tuple<'_> { + fn project(&self, field: &TupleField) -> AlgebraicValue { + match self { + Self::Row(ptr) => ptr.read_col(field.field_pos).unwrap(), + Self::Join(ptrs) => field + .label_pos + .and_then(|i| ptrs.get(i)) + .map(|ptr| ptr.read_col(field.field_pos).unwrap()) + .unwrap(), + } + } +} + +impl<'a> Tuple<'a> { + /// Select the tuple element at position `i` + fn select(self, i: usize) -> Option> { + match self { + Self::Row(_) => None, + Self::Join(mut ptrs) => Some(ptrs.swap_remove(i)), + } + } + + /// Append a [RowRef] to a tuple + fn append(self, ptr: RowRef<'a>) -> Self { + match self { + Self::Row(row) => Self::Join(vec![row, ptr]), + Self::Join(mut rows) => { + rows.push(ptr); + Self::Join(rows) + } + } + } +} + +/// Execute a query plan. +/// The actual execution is driven by `f`. +pub fn execute_plan(plan: &ProjectPlan, tx: &T, f: impl Fn(PlanIter) -> R) -> Result +where + T: Datastore, +{ + PlanIter::build(plan, &FallibleDatastore(tx)).map(f) +} diff --git a/crates/expr/src/check.rs b/crates/expr/src/check.rs index b0fbb04ddca..c14963ee81b 100644 --- a/crates/expr/src/check.rs +++ b/crates/expr/src/check.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::ops::{Deref, DerefMut}; use std::sync::Arc; -use crate::expr::{Expr, Project}; +use crate::expr::{Expr, ProjectList, ProjectName}; use crate::{expr::LeftDeepJoin, statement::Statement}; use spacetimedb_lib::AlgebraicType; use spacetimedb_schema::schema::TableSchema; @@ -46,9 +46,9 @@ pub trait TypeChecker { type Ast; type Set; - fn type_ast(ast: Self::Ast, tx: &impl SchemaView) -> TypingResult; + fn type_ast(ast: Self::Ast, tx: &impl SchemaView) -> TypingResult; - fn type_set(ast: Self::Set, vars: &mut Relvars, tx: &impl SchemaView) -> TypingResult; + fn type_set(ast: Self::Set, vars: &mut Relvars, tx: &impl SchemaView) -> TypingResult; fn type_from(from: SqlFrom, vars: &mut Relvars, tx: &impl SchemaView) -> TypingResult { match from { @@ -111,11 +111,11 @@ impl TypeChecker for SubChecker { type Ast = SqlSelect; type Set = SqlSelect; - fn type_ast(ast: Self::Ast, tx: &impl SchemaView) -> TypingResult { + fn type_ast(ast: Self::Ast, tx: &impl SchemaView) -> TypingResult { Self::type_set(ast, &mut Relvars::default(), tx) } - fn type_set(ast: Self::Set, vars: &mut Relvars, tx: &impl SchemaView) -> TypingResult { + fn type_set(ast: Self::Set, vars: &mut Relvars, tx: &impl SchemaView) -> TypingResult { match ast { SqlSelect { project, @@ -138,26 +138,30 @@ impl TypeChecker for SubChecker { } /// Parse and type check a subscription query -pub fn parse_and_type_sub(sql: &str, tx: &impl SchemaView) -> TypingResult { +pub fn parse_and_type_sub(sql: &str, tx: &impl SchemaView) -> TypingResult { expect_table_type(SubChecker::type_ast(parse_subscription(sql)?, tx)?) } +/// Type check a subscription query +pub fn type_subscription(ast: SqlSelect, tx: &impl SchemaView) -> TypingResult { + expect_table_type(SubChecker::type_ast(ast, tx)?) +} + /// Parse and type check a *subscription* query into a `StatementCtx` pub fn compile_sql_sub<'a>(sql: &'a str, tx: &impl SchemaView) -> TypingResult> { - let expr = parse_and_type_sub(sql, tx)?; Ok(StatementCtx { - statement: Statement::Select(expr), + statement: Statement::Select(ProjectList::Name(parse_and_type_sub(sql, tx)?)), sql, source: StatementSource::Subscription, }) } /// Returns an error if the input type is not a table type or relvar -fn expect_table_type(expr: Project) -> TypingResult { - if let Project::Fields(..) = expr { - return Err(Unsupported::ReturnType.into()); +fn expect_table_type(expr: ProjectList) -> TypingResult { + match expr { + ProjectList::Name(proj) => Ok(proj), + ProjectList::List(..) => Err(Unsupported::ReturnType.into()), } - Ok(expr) } pub mod test_utils { diff --git a/crates/expr/src/expr.rs b/crates/expr/src/expr.rs index 3283d6cd1c2..18ad73fcc89 100644 --- a/crates/expr/src/expr.rs +++ b/crates/expr/src/expr.rs @@ -5,21 +5,62 @@ use spacetimedb_primitives::TableId; use spacetimedb_schema::schema::TableSchema; use spacetimedb_sql_parser::ast::{BinOp, LogOp}; -/// A projection is the root of any relation expression +/// A projection is the root of any relational expression. +/// This type represents a projection that returns relvars. +/// +/// For example: +/// +/// ```sql +/// select * from t +/// ``` +/// +/// and +/// +/// ```sql +/// select t.* from t join s ... +/// ``` #[derive(Debug)] -pub enum Project { +pub enum ProjectName { None(RelExpr), - Relvar(RelExpr, Box), - Fields(RelExpr, Vec<(Box, FieldProject)>), + Some(RelExpr, Box), } -impl Project { +impl ProjectName { /// What is the [TableId] for this projection? pub fn table_id(&self) -> Option { match self { - Self::Fields(..) => None, - Self::Relvar(input, var) => input.table_id(Some(var.as_ref())), Self::None(input) => input.table_id(None), + Self::Some(input, var) => input.table_id(Some(var.as_ref())), + } + } +} + +/// A projection is the root of any relational expression. +/// This type represents a projection that returns fields. +/// +/// For example: +/// +/// ```sql +/// select a, b from t +/// ``` +/// +/// and +/// +/// ```sql +/// select t.a as x from t join s ... +/// ``` +#[derive(Debug)] +pub enum ProjectList { + Name(ProjectName), + List(RelExpr, Vec<(Box, FieldProject)>), +} + +impl ProjectList { + /// What is the [TableId] for this projection? + pub fn table_id(&self) -> Option { + match self { + Self::List(..) => None, + Self::Name(proj) => proj.table_id(), } } } diff --git a/crates/expr/src/lib.rs b/crates/expr/src/lib.rs index a763780e28a..fa13b349bf1 100644 --- a/crates/expr/src/lib.rs +++ b/crates/expr/src/lib.rs @@ -3,7 +3,7 @@ use std::collections::HashSet; use crate::statement::Statement; use check::{Relvars, TypingResult}; use errors::{DuplicateName, InvalidLiteral, InvalidOp, InvalidWildcard, UnexpectedType, Unresolved}; -use expr::{Expr, FieldProject, Project, RelExpr}; +use expr::{Expr, FieldProject, ProjectList, ProjectName, RelExpr}; use spacetimedb_lib::{from_hex_pad, Address, AlgebraicType, AlgebraicValue, Identity}; use spacetimedb_schema::schema::ColumnSchema; use spacetimedb_sql_parser::ast::{self, BinOp, ProjectElem, SqlExpr, SqlIdent, SqlLiteral}; @@ -22,11 +22,13 @@ pub(crate) fn type_select(input: RelExpr, expr: SqlExpr, vars: &Relvars) -> Typi } /// Type check and lower a [ast::Project] -pub(crate) fn type_proj(input: RelExpr, proj: ast::Project, vars: &Relvars) -> TypingResult { +pub(crate) fn type_proj(input: RelExpr, proj: ast::Project, vars: &Relvars) -> TypingResult { match proj { ast::Project::Star(None) if input.nfields() > 1 => Err(InvalidWildcard::Join.into()), - ast::Project::Star(None) => Ok(Project::None(input)), - ast::Project::Star(Some(SqlIdent(var))) if input.has_field(&var) => Ok(Project::Relvar(input, var)), + ast::Project::Star(None) => Ok(ProjectList::Name(ProjectName::None(input))), + ast::Project::Star(Some(SqlIdent(var))) if input.has_field(&var) => { + Ok(ProjectList::Name(ProjectName::Some(input, var))) + } ast::Project::Star(Some(SqlIdent(var))) => Err(Unresolved::var(&var).into()), ast::Project::Exprs(elems) => { let mut projections = vec![]; @@ -42,7 +44,7 @@ pub(crate) fn type_proj(input: RelExpr, proj: ast::Project, vars: &Relvars) -> T } } - Ok(Project::Fields(input, projections)) + Ok(ProjectList::List(input, projections)) } } } diff --git a/crates/expr/src/statement.rs b/crates/expr/src/statement.rs index 4fac99cbaa5..548e8682ea2 100644 --- a/crates/expr/src/statement.rs +++ b/crates/expr/src/statement.rs @@ -12,7 +12,7 @@ use spacetimedb_sql_parser::{ }; use thiserror::Error; -use crate::{check::Relvars, expr::Project}; +use crate::{check::Relvars, expr::ProjectList}; use super::{ check::{SchemaView, TypeChecker, TypingResult}, @@ -22,7 +22,7 @@ use super::{ }; pub enum Statement { - Select(Project), + Select(ProjectList), Insert(TableInsert), Update(TableUpdate), Delete(TableDelete), @@ -240,11 +240,11 @@ impl TypeChecker for SqlChecker { type Ast = SqlSelect; type Set = SqlSelect; - fn type_ast(ast: Self::Ast, tx: &impl SchemaView) -> TypingResult { + fn type_ast(ast: Self::Ast, tx: &impl SchemaView) -> TypingResult { Self::type_set(ast, &mut Relvars::default(), tx) } - fn type_set(ast: Self::Set, vars: &mut Relvars, tx: &impl SchemaView) -> TypingResult { + fn type_set(ast: Self::Set, vars: &mut Relvars, tx: &impl SchemaView) -> TypingResult { match ast { SqlSelect { project, diff --git a/crates/physical-plan/Cargo.toml b/crates/physical-plan/Cargo.toml index 247d06a11ca..fb9f62a4a98 100644 --- a/crates/physical-plan/Cargo.toml +++ b/crates/physical-plan/Cargo.toml @@ -7,9 +7,14 @@ license-file = "LICENSE" description = "The physical query plan for the SpacetimeDB query engine" [dependencies] +anyhow.workspace = true derive_more.workspace = true spacetimedb-lib.workspace = true spacetimedb-primitives.workspace = true spacetimedb-schema.workspace = true spacetimedb-expr.workspace = true spacetimedb-sql-parser.workspace = true +spacetimedb-table.workspace = true + +[dev-dependencies] +pretty_assertions.workspace = true diff --git a/crates/physical-plan/src/compile.rs b/crates/physical-plan/src/compile.rs index 160ada0ed8c..22bf6ff08bd 100644 --- a/crates/physical-plan/src/compile.rs +++ b/crates/physical-plan/src/compile.rs @@ -2,8 +2,11 @@ use std::collections::HashMap; -use crate::plan::{HashJoin, Label, PhysicalCtx, PhysicalExpr, PhysicalPlan, PhysicalProject, ProjectField, Semi}; -use spacetimedb_expr::expr::{Expr, FieldProject, LeftDeepJoin, Project, RelExpr}; +use crate::plan::{ + HashJoin, Label, PhysicalCtx, PhysicalExpr, PhysicalPlan, ProjectListPlan, ProjectPlan, Semi, TupleField, +}; + +use spacetimedb_expr::expr::{Expr, FieldProject, LeftDeepJoin, ProjectList, ProjectName, RelExpr}; use spacetimedb_expr::statement::Statement; use spacetimedb_expr::StatementCtx; @@ -24,13 +27,12 @@ fn compile_expr(expr: Expr, var: &mut impl VarLabel) -> PhysicalExpr { } } -fn compile_project(var: &mut impl VarLabel, expr: Project) -> PhysicalProject { +fn compile_project_list(var: &mut impl VarLabel, expr: ProjectList) -> ProjectListPlan { match expr { - Project::None(input) => PhysicalProject::None(compile_rel_expr(var, input)), - Project::Relvar(input, name) => PhysicalProject::Relvar(compile_rel_expr(var, input), var.label(&name)), - Project::Fields(input, exprs) => PhysicalProject::Fields( - compile_rel_expr(var, input), - exprs + ProjectList::Name(proj) => ProjectListPlan::Name(compile_project_name(var, proj)), + ProjectList::List(proj, fields) => ProjectListPlan::List( + compile_rel_expr(var, proj), + fields .into_iter() .map(|(alias, expr)| (alias, compile_field_project(var, expr))) .collect(), @@ -38,10 +40,18 @@ fn compile_project(var: &mut impl VarLabel, expr: Project) -> PhysicalProject { } } -fn compile_field_project(var: &mut impl VarLabel, expr: FieldProject) -> ProjectField { - ProjectField { - var: var.label(&expr.table), - pos: expr.field, +fn compile_project_name(var: &mut impl VarLabel, proj: ProjectName) -> ProjectPlan { + match proj { + ProjectName::None(input) => ProjectPlan::None(compile_rel_expr(var, input)), + ProjectName::Some(input, name) => ProjectPlan::Name(compile_rel_expr(var, input), var.label(&name), None), + } +} + +fn compile_field_project(var: &mut impl VarLabel, expr: FieldProject) -> TupleField { + TupleField { + label: var.label(&expr.table), + label_pos: None, + field_pos: expr.field, } } @@ -61,13 +71,15 @@ fn compile_rel_expr(var: &mut impl VarLabel, ast: RelExpr) -> PhysicalPlan { HashJoin { lhs: Box::new(compile_rel_expr(var, *join.lhs)), rhs: Box::new(PhysicalPlan::TableScan(join.rhs, var.label(&join.var))), - lhs_field: ProjectField { - var: var.label(u.as_ref()), - pos: a, + lhs_field: TupleField { + label: var.label(u.as_ref()), + label_pos: None, + field_pos: a, }, - rhs_field: ProjectField { - var: var.label(v.as_ref()), - pos: b, + rhs_field: TupleField { + label: var.label(v.as_ref()), + label_pos: None, + field_pos: b, }, unique: false, }, @@ -88,6 +100,31 @@ fn compile_rel_expr(var: &mut impl VarLabel, ast: RelExpr) -> PhysicalPlan { } } +/// Compile a logical subscribe expression +pub fn compile_sub(project: ProjectName) -> ProjectPlan { + struct Interner { + next: usize, + names: HashMap, + } + impl VarLabel for Interner { + fn label(&mut self, name: &str) -> Label { + if let Some(id) = self.names.get(name) { + return Label(*id); + } + self.next += 1; + self.names.insert(name.to_owned(), self.next); + self.next.into() + } + } + compile_project_name( + &mut Interner { + next: 0, + names: HashMap::new(), + }, + project, + ) +} + /// Compile a SQL statement into a physical plan. /// /// The input [Statement] is assumed to be valid so the lowering is not expected to fail. @@ -109,7 +146,7 @@ pub fn compile(ast: StatementCtx<'_>) -> PhysicalCtx<'_> { } } let plan = match ast.statement { - Statement::Select(expr) => compile_project( + Statement::Select(expr) => compile_project_list( &mut Interner { next: 0, names: HashMap::new(), diff --git a/crates/physical-plan/src/lib.rs b/crates/physical-plan/src/lib.rs index b79989e66e4..b1ea3c965a4 100644 --- a/crates/physical-plan/src/lib.rs +++ b/crates/physical-plan/src/lib.rs @@ -1,2 +1,3 @@ pub mod compile; pub mod plan; +pub mod rules; diff --git a/crates/physical-plan/src/plan.rs b/crates/physical-plan/src/plan.rs index 99e10b1e914..1e1af8f9613 100644 --- a/crates/physical-plan/src/plan.rs +++ b/crates/physical-plan/src/plan.rs @@ -1,40 +1,121 @@ -use std::{ops::Bound, sync::Arc}; +use std::{borrow::Cow, ops::Bound, sync::Arc}; use derive_more::From; use spacetimedb_expr::StatementSource; use spacetimedb_lib::AlgebraicValue; use spacetimedb_primitives::{ColId, ColSet, IndexId}; -use spacetimedb_schema::schema::{IndexSchema, TableSchema}; +use spacetimedb_schema::schema::TableSchema; use spacetimedb_sql_parser::ast::{BinOp, LogOp}; +use spacetimedb_table::table::RowRef; + +use crate::rules::{ + ComputePositions, ConjunctionToIxScan, EqToIxScan, HashToIxJoin, PushConjunction, PushEqFilter, RewriteRule, + UniqueHashJoinRule, UniqueIxJoinRule, +}; /// Table aliases are replaced with labels in the physical plan #[derive(Debug, Clone, Copy, PartialEq, Eq, From)] pub struct Label(pub usize); -/// Physical query plans always terminate with a projection -#[derive(Debug, PartialEq, Eq)] -pub enum PhysicalProject { +/// Physical plans always terminate with a projection. +/// This type of projection returns row ids. +/// +/// It can represent: +/// +/// ```sql +/// select * from t +/// ``` +/// +/// and +/// +/// ```sql +/// select t.* from t join ... +/// ``` +/// +/// but not +/// +/// ```sql +/// select a from t +/// ``` +#[derive(Debug)] +pub enum ProjectPlan { None(PhysicalPlan), - Relvar(PhysicalPlan, Label), - Fields(PhysicalPlan, Vec<(Box, ProjectField)>), + Name(PhysicalPlan, Label, Option), } -impl PhysicalProject { +impl ProjectPlan { pub fn optimize(self) -> Self { match self { Self::None(plan) => Self::None(plan.optimize(vec![])), - Self::Relvar(plan, var) => Self::None(plan.optimize(vec![var])), - Self::Fields(plan, fields) => { - Self::Fields(plan.optimize(fields.iter().map(|(_, proj)| proj.var).collect()), fields) + Self::Name(plan, label, _) => { + let plan = plan.optimize(vec![label]); + let n = plan.nfields(); + let pos = plan.label_pos(&label); + match n { + 1 => Self::None(plan), + _ => Self::Name(plan, label, pos), + } } } } } +/// Physical plans always terminate with a projection. +/// This type can project fields within a table. +/// +/// That is, it can represent: +/// +/// ```sql +/// select a from t +/// ``` +/// +/// as well as +/// +/// ```sql +/// select t.a, s.b from t join s ... +/// ``` +#[derive(Debug)] +pub enum ProjectListPlan { + Name(ProjectPlan), + List(PhysicalPlan, Vec<(Box, TupleField)>), +} + +impl ProjectListPlan { + pub fn optimize(self) -> Self { + match self { + Self::Name(plan) => Self::Name(plan.optimize()), + Self::List(plan, fields) => Self::List( + plan.optimize( + fields + .iter() + .map(|(_, TupleField { label, .. })| label) + .copied() + .collect(), + ), + fields, + ), + } + } +} + +/// Query operators return tuples of rows. +/// And this type refers to a field of a row within a tuple. +/// +/// Note that from the perspective of the optimizer, +/// tuple elements have names or labels, +/// so as to preserve query semantics across rewrites. +/// +/// However from the perspective of the query engine, +/// tuple elements are entirely positional. +/// Hence the need for both `label` and `label_pos`. +/// +/// The former is consistent across rewrites. +/// The latter is only computed once after optimization. #[derive(Debug, PartialEq, Eq)] -pub struct ProjectField { - pub var: Label, - pub pos: usize, +pub struct TupleField { + pub label: Label, + pub label_pos: Option, + pub field_pos: usize, } /// A physical plan represents a concrete evaluation strategy. @@ -70,6 +151,21 @@ impl PhysicalPlan { } } + /// Walks the plan tree and calls `f` on every op + pub fn visit_mut(&mut self, f: &mut impl FnMut(&mut Self)) { + f(self); + match self { + Self::IxJoin(IxJoin { lhs: input, .. }, _) | Self::Filter(input, _) => { + input.visit_mut(f); + } + Self::NLJoin(lhs, rhs) | Self::HashJoin(HashJoin { lhs, rhs, .. }, _) => { + lhs.visit_mut(f); + rhs.visit_mut(f); + } + _ => {} + } + } + /// Is there any subplan where `f` returns true? pub fn any(&self, f: &impl Fn(&Self) -> bool) -> bool { let mut ok = false; @@ -188,12 +284,13 @@ impl PhysicalPlan { plan } - /// Optimize a physical plan by applying rewrite rules. + /// Optimize a plan using the following rewrites: /// - /// First we canonicalize the plan. - /// Next we push filters to the leaves. - /// Then we try to turn those filters into index scans. - /// And finally we deterimine the index joins and semijoins. + /// 1. Canonicalize the plan + /// 2. Push filters to the leaves + /// 3. Turn filters into index scans if possible + /// 4. Determine index and semijoins + /// 5. Compute positions for tuple labels pub fn optimize(self, reqs: Vec