feat: support intra-operator parallelism #856

Status: Open · wants to merge 33 commits into base: main

Commits (33)
10d7faa
stash
wangrunji0408 Apr 20, 2024
f608270
basic support for converting to distributed plan
wangrunji0408 Apr 20, 2024
344a5f8
rename `distributed` to `parallel`
wangrunji0408 Apr 21, 2024
d51e7c4
hash partition executor
wangrunji0408 Nov 10, 2024
5a25a2e
fix
wangrunji0408 Nov 11, 2024
c8f0b68
Merge remote-tracking branch 'origin/main' into wrj/mpp
wangrunji0408 Nov 11, 2024
8fe3343
fix metrics and improve debug info
wangrunji0408 Nov 11, 2024
cdea038
add a pragma to control parallel plan
wangrunji0408 Nov 17, 2024
fe2ee6c
two-phase aggregation
wangrunji0408 Nov 21, 2024
67bf63b
update rust toolchain and dependencies
wangrunji0408 Nov 23, 2024
be64142
upgrade dependencies
wangrunji0408 Nov 23, 2024
bff93fe
fix warnings
wangrunji0408 Nov 23, 2024
68f0ec3
support keyword completion
wangrunji0408 Nov 23, 2024
9710023
support cursor in completed line
wangrunji0408 Nov 23, 2024
c77c0b0
fix clippy
wangrunji0408 Nov 23, 2024
0f6f712
Merge branch 'wrj/update-toolchain' into wrj/partition
wangrunji0408 Nov 23, 2024
c90782b
Merge branch 'wrj/completion' into wrj/partition
wangrunji0408 Nov 23, 2024
685b148
fix to_parallel for left outer join and DDL statements
wangrunji0408 Nov 23, 2024
322d8f1
fix hash exchange
wangrunji0408 Nov 23, 2024
0961aa1
replace pragma `enable_parallel_execution` by set variable `parallelism`
wangrunji0408 Nov 23, 2024
0588dff
fix 2-phase count agg
wangrunji0408 Nov 23, 2024
db3a019
enable partitioning in unit test. fix bugs
wangrunji0408 Nov 23, 2024
7f56f92
fix DDL to parallel
wangrunji0408 Nov 23, 2024
87a7fb9
add unit test for Expr size
wangrunji0408 Nov 23, 2024
fac57a1
Merge remote-tracking branch 'origin/main' into wrj/partition
wangrunji0408 Nov 24, 2024
85d131b
fix timing
wangrunji0408 Nov 24, 2024
4ff9450
add counted instrument
wangrunji0408 Nov 24, 2024
a47bc49
correctly show the time of exchange operator
wangrunji0408 Nov 24, 2024
cea7429
use ahash to optimize hash
wangrunji0408 Nov 24, 2024
35c56fd
decouple rows and time of exchange operator
wangrunji0408 Nov 24, 2024
4a2d2ad
do not eliminate duplicate exchange
wangrunji0408 Nov 24, 2024
321e330
fix clippy
wangrunji0408 Nov 24, 2024
1bc5611
fix unit test
wangrunji0408 Nov 24, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -57,6 +57,7 @@ pin-project = "1"
pretty-xmlish = "0.1"
prost = "0.13"
pyo3 = { version = "0.22", features = ["extension-module"], optional = true }
rand = "0.8"
ref-cast = "1.0"
regex = "1"
risinglight_proto = "0.2"
5 changes: 5 additions & 0 deletions src/array/internal_ext.rs
@@ -12,6 +12,11 @@ use crate::for_all_variants;
pub trait ArrayValidExt: Array {
fn get_valid_bitmap(&self) -> &BitVec;
fn get_valid_bitmap_mut(&mut self) -> &mut BitVec;

/// Returns the number of null values in this array.
fn null_count(&self) -> usize {
self.get_valid_bitmap().count_zeros()
}
}

pub trait ArrayImplValidExt {
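The `null_count` default method added here is what the reworked `sum` in `src/array/ops.rs` below uses to return `NULL` for an all-null column instead of a zero sum. A minimal stand-alone sketch of the same rule, assuming the `bitvec` crate as the validity bitmap and a hypothetical `Column` type rather than RisingLight's actual array types:

```rust
// Hypothetical stand-ins (not RisingLight's real array types), assuming the `bitvec`
// crate provides the validity bitmap: a cleared bit marks a null slot.
use bitvec::prelude::*;

struct Column {
    values: Vec<i64>,
    valid: BitVec, // one bit per value; 1 = non-null
}

impl Column {
    /// Number of nulls = number of cleared bits in the validity bitmap.
    fn null_count(&self) -> usize {
        self.valid.count_zeros()
    }

    /// SQL-style SUM: an all-NULL input yields NULL (here `None`), not 0.
    fn sum(&self) -> Option<i64> {
        if self.null_count() == self.values.len() {
            None
        } else {
            Some(
                (0..self.values.len())
                    .filter(|&i| self.valid[i]) // skip null slots
                    .map(|i| self.values[i])
                    .sum(),
            )
        }
    }
}

fn main() {
    let all_null = Column { values: vec![0, 0], valid: bitvec![0, 0] };
    assert_eq!(all_null.sum(), None); // SUM over only NULLs is NULL
    let mixed = Column { values: vec![1, 0, 2], valid: bitvec![1, 0, 1] };
    assert_eq!(mixed.sum(), Some(3)); // the null slot (value 0) is ignored
}
```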
69 changes: 63 additions & 6 deletions src/array/ops.rs
@@ -3,6 +3,7 @@
//! Array operations.

use std::borrow::Borrow;
use std::hash::{Hash, Hasher};

use num_traits::ToPrimitive;
use regex::Regex;
@@ -204,6 +205,26 @@ impl ArrayImpl {
Ok(A::new_bool(clear_null(unary_op(a.as_ref(), |b| !b))))
}

/// Hash the array into the given hasher.
pub fn hash_to(&self, hasher: &mut [impl Hasher]) {
assert_eq!(hasher.len(), self.len());
match self {
A::Null(a) => a.iter().zip(hasher).for_each(|(v, h)| v.hash(h)),
A::Bool(a) => a.iter().zip(hasher).for_each(|(v, h)| v.hash(h)),
A::Int16(a) => a.iter().zip(hasher).for_each(|(v, h)| v.hash(h)),
A::Int32(a) => a.iter().zip(hasher).for_each(|(v, h)| v.hash(h)),
A::Int64(a) => a.iter().zip(hasher).for_each(|(v, h)| v.hash(h)),
A::Float64(a) => a.iter().zip(hasher).for_each(|(v, h)| v.hash(h)),
A::Decimal(a) => a.iter().zip(hasher).for_each(|(v, h)| v.hash(h)),
A::String(a) => a.iter().zip(hasher).for_each(|(v, h)| v.hash(h)),
A::Date(a) => a.iter().zip(hasher).for_each(|(v, h)| v.hash(h)),
A::Timestamp(a) => a.iter().zip(hasher).for_each(|(v, h)| v.hash(h)),
A::TimestampTz(a) => a.iter().zip(hasher).for_each(|(v, h)| v.hash(h)),
A::Interval(a) => a.iter().zip(hasher).for_each(|(v, h)| v.hash(h)),
A::Blob(a) => a.iter().zip(hasher).for_each(|(v, h)| v.hash(h)),
}
}

pub fn like(&self, pattern: &str) -> Result {
/// Converts a SQL LIKE pattern to a regex pattern.
fn like_to_regex(pattern: &str) -> String {
@@ -600,12 +621,48 @@ impl ArrayImpl {
/// Returns the sum of values.
pub fn sum(&self) -> DataValue {
match self {
Self::Int16(a) => DataValue::Int16(a.raw_iter().sum()),
Self::Int32(a) => DataValue::Int32(a.raw_iter().sum()),
Self::Int64(a) => DataValue::Int64(a.raw_iter().sum()),
Self::Float64(a) => DataValue::Float64(a.raw_iter().sum()),
Self::Decimal(a) => DataValue::Decimal(a.raw_iter().sum()),
Self::Interval(a) => DataValue::Interval(a.raw_iter().sum()),
Self::Int16(a) => {
if a.null_count() == a.len() {
DataValue::Null
} else {
DataValue::Int16(a.raw_iter().sum())
}
}
Self::Int32(a) => {
if a.null_count() == a.len() {
DataValue::Null
} else {
DataValue::Int32(a.raw_iter().sum())
}
}
Self::Int64(a) => {
if a.null_count() == a.len() {
DataValue::Null
} else {
DataValue::Int64(a.raw_iter().sum())
}
}
Self::Float64(a) => {
if a.null_count() == a.len() {
DataValue::Null
} else {
DataValue::Float64(a.raw_iter().sum())
}
}
Self::Decimal(a) => {
if a.null_count() == a.len() {
DataValue::Null
} else {
DataValue::Decimal(a.raw_iter().sum())
}
}
Self::Interval(a) => {
if a.null_count() == a.len() {
DataValue::Null
} else {
DataValue::Interval(a.raw_iter().sum())
}
}
_ => panic!("can not sum array"),
}
}
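`ArrayImpl::hash_to` fills one hasher per row so that a hash exchange can route every row to a partition by its key hash. A hedged sketch of that routing step, using the `ahash` crate mentioned in the "use ahash to optimize hash" commit; `partition_rows` and its names are illustrative, not the PR's executor code:

```rust
// Illustrative only: route rows to `hash % parallelism`, mirroring how a slice of
// per-row hashers (the `&mut [impl Hasher]` taken by `hash_to`) can drive a hash exchange.
use std::hash::{BuildHasher, Hash, Hasher};

use ahash::RandomState;

fn partition_rows(keys: &[i64], parallelism: usize, state: &RandomState) -> Vec<usize> {
    // One hasher per row; each key column hashes its row value into the same row's hasher.
    let mut hashers: Vec<_> = (0..keys.len()).map(|_| state.build_hasher()).collect();
    for (v, h) in keys.iter().zip(hashers.iter_mut()) {
        v.hash(h);
    }
    // The finished hash picks the target partition for the row.
    hashers
        .iter()
        .map(|h| (h.finish() as usize) % parallelism)
        .collect()
}

fn main() {
    // In a real exchange, all producers must share the same hash state so that
    // rows with equal keys agree on their partition.
    let state = RandomState::new();
    let parts = partition_rows(&[1, 2, 3, 4, 5], 4, &state);
    assert!(parts.iter().all(|&p| p < 4));
    println!("partitions: {parts:?}");
}
```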
18 changes: 9 additions & 9 deletions src/binder/create_function.rs
@@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
use super::*;

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
pub struct CreateFunction {
pub struct FunctionDef {
pub schema_name: String,
pub name: String,
pub arg_types: Vec<crate::types::DataType>,
@@ -20,22 +20,22 @@ pub struct CreateFunction {
pub body: String,
}

impl fmt::Display for CreateFunction {
impl fmt::Display for Box<FunctionDef> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let explainer = Pretty::childless_record("CreateFunction", self.pretty_function());
let explainer = Pretty::childless_record("FunctionDef", self.pretty_function());
delegate_fmt(&explainer, f, String::with_capacity(1000))
}
}

impl FromStr for CreateFunction {
impl FromStr for Box<FunctionDef> {
type Err = ();

fn from_str(_s: &str) -> std::result::Result<Self, Self::Err> {
Err(())
}
}

impl CreateFunction {
impl FunctionDef {
pub fn pretty_function<'a>(&self) -> Vec<(&'a str, Pretty<'a>)> {
vec![
("name", Pretty::display(&self.name)),
@@ -102,16 +102,16 @@ impl Binder {
arg_names.push(arg.name.map_or("".to_string(), |n| n.to_string()));
}

let f = self.egraph.add(Node::CreateFunction(CreateFunction {
let func_def = self.egraph.add(Node::FunctionDef(Box::new(FunctionDef {
schema_name,
name,
arg_types,
arg_names,
return_type,
language,
body,
}));

Ok(f)
})));
let id = self.egraph.add(Node::CreateFunction(func_def));
Ok(id)
}
}
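`CreateFunction` no longer carries its payload inline: the payload becomes a separate boxed `FunctionDef` node that `CreateFunction` references by id, and `CreateTable`/`Field` get the same treatment below. One plausible motivation, hinted at by the "add unit test for Expr size" commit, is keeping the plan-node enum small, since a Rust enum is as large as its largest variant. A toy illustration with hypothetical types, not the real planner nodes:

```rust
// Hypothetical node enums: boxing the large payload keeps every node pointer-sized
// instead of inflating the whole enum to the size of its biggest variant.
struct BigDef {
    name: String,
    arg_types: Vec<String>,
    body: String,
}

enum InlineNode {
    Column(u32),
    CreateFunction(BigDef), // every InlineNode now pays for BigDef's size
}

enum BoxedNode {
    Column(u32),
    CreateFunction(Box<BigDef>), // only a pointer-sized payload
}

fn main() {
    println!("inline node: {} bytes", std::mem::size_of::<InlineNode>());
    println!("boxed node:  {} bytes", std::mem::size_of::<BoxedNode>());
    assert!(std::mem::size_of::<BoxedNode>() < std::mem::size_of::<InlineNode>());
}
```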
17 changes: 9 additions & 8 deletions src/binder/create_table.rs
@@ -12,22 +12,22 @@ use super::*;
use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnId, SchemaId};

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
pub struct CreateTable {
pub struct TableDef {
pub schema_id: SchemaId,
pub table_name: String,
pub columns: Vec<ColumnCatalog>,
pub ordered_pk_ids: Vec<ColumnId>,
}

impl fmt::Display for CreateTable {
impl fmt::Display for TableDef {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let explainer = Pretty::childless_record("CreateTable", self.pretty_table());
let explainer = Pretty::childless_record("TableDef", self.pretty_table());
delegate_fmt(&explainer, f, String::with_capacity(1000))
}
}

impl CreateTable {
pub fn pretty_table<'a>(&self) -> Vec<(&'a str, Pretty<'a>)> {
impl TableDef {
pub fn pretty_table(&self) -> Vec<(&str, Pretty)> {
let cols = Pretty::Array(self.columns.iter().map(|c| c.desc().pretty()).collect());
let ids = Pretty::Array(self.ordered_pk_ids.iter().map(Pretty::display).collect());
vec![
@@ -39,7 +39,7 @@
}
}

impl FromStr for Box<CreateTable> {
impl FromStr for Box<TableDef> {
type Err = ();

fn from_str(_s: &str) -> std::result::Result<Self, Self::Err> {
@@ -119,13 +119,14 @@ impl Binder {
columns[index as usize].set_nullable(false);
}

let create = self.egraph.add(Node::CreateTable(Box::new(CreateTable {
let table_def = self.egraph.add(Node::TableDef(Box::new(TableDef {
schema_id: schema.id(),
table_name: table_name.into(),
columns,
ordered_pk_ids,
})));
Ok(create)
let id = self.egraph.add(Node::CreateTable(table_def));
Ok(id)
}

/// get primary keys' id in declared order.
4 changes: 2 additions & 2 deletions src/binder/create_view.rs
@@ -49,13 +49,13 @@ impl Binder {
})
.collect();

let table = self.egraph.add(Node::CreateTable(Box::new(CreateTable {
let table_def = self.egraph.add(Node::TableDef(Box::new(TableDef {
schema_id: schema.id(),
table_name: table_name.into(),
columns,
ordered_pk_ids: vec![],
})));
let create_view = self.egraph.add(Node::CreateView([table, query]));
let create_view = self.egraph.add(Node::CreateView([table_def, query]));
Ok(create_view)
}
}
2 changes: 1 addition & 1 deletion src/binder/expr.rs
@@ -238,7 +238,7 @@ impl Binder {

fn bind_extract(&mut self, field: DateTimeField, expr: Expr) -> Result {
let expr = self.bind_expr(expr)?;
let field = self.egraph.add(Node::Field(field.into()));
let field = self.egraph.add(Node::Field(Box::new(field.into())));
Ok(self.egraph.add(Node::Extract([field, expr])))
}

51 changes: 40 additions & 11 deletions src/db.rs
@@ -27,8 +27,12 @@ pub struct Database {
/// The configuration of the database.
#[derive(Debug, Default)]
struct Config {
/// If true, no optimization will be applied to the query.
disable_optimizer: bool,
mock_stat: Option<Statistics>,
/// The number of partitions of each operator.
/// If set to 0, it will be automatically determined by the number of worker threads.
parallelism: usize,
}

impl Database {
@@ -93,6 +97,11 @@ impl Database {
crate::planner::Config {
enable_range_filter_scan: self.storage.support_range_filter_scan(),
table_is_sorted_by_primary_key: self.storage.table_is_sorted_by_primary_key(),
parallelism: if self.config.lock().unwrap().parallelism > 0 {
self.config.lock().unwrap().parallelism
} else {
tokio::runtime::Handle::current().metrics().num_workers()
},
},
);

@@ -158,26 +167,28 @@ impl Database {
/// Mock the row count of a table for planner test.
fn handle_set(&self, stmt: &Statement) -> Result<bool, Error> {
if let Statement::Pragma { name, .. } = stmt {
let mut config = self.config.lock().unwrap();
match name.to_string().as_str() {
"enable_optimizer" => {
self.config.lock().unwrap().disable_optimizer = false;
return Ok(true);
}
"disable_optimizer" => {
self.config.lock().unwrap().disable_optimizer = true;
return Ok(true);
}
name => {
return Err(crate::binder::BindError::NoPragma(name.into()).into());
}
"enable_optimizer" => config.disable_optimizer = false,
"disable_optimizer" => config.disable_optimizer = true,
name => return Err(crate::binder::BindError::NoPragma(name.into()).into()),
}
return Ok(true);
}
let Statement::SetVariable {
variable, value, ..
} = stmt
else {
return Ok(false);
};
if variable.0[0].value == "parallelism" {
let mut config = self.config.lock().unwrap();
config.parallelism = value[0]
.to_string()
.parse::<usize>()
.map_err(|_| Error::Internal("invalid parallelism".into()))?;
return Ok(true);
}
let Some(table_name) = variable.0[0].value.strip_prefix("mock_rowcount_") else {
return Ok(false);
};
Expand All @@ -202,6 +213,11 @@ impl Database {
fn pragma_options() -> &'static [&'static str] {
&["enable_optimizer", "disable_optimizer"]
}

/// Return all available set variables.
fn set_variables() -> &'static [&'static str] {
&["parallelism"]
}
}

/// The error type of database operations.
@@ -268,6 +284,19 @@ impl rustyline::completion::Completer for &Database {
return Ok((pos - last_word.len(), candidates));
}

// completion for set variable
if prefix.trim().eq_ignore_ascii_case("set") {
let candidates = Database::set_variables()
.iter()
.filter(|option| option.starts_with(last_word))
.map(|option| rustyline::completion::Pair {
display: option.to_string(),
replacement: option.to_string(),
})
.collect();
return Ok((pos - last_word.len(), candidates));
}

// TODO: complete table and column names

// completion for keywords
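The planner config now resolves its partition count as follows: an explicit `SET parallelism = N` pins it, and the default of 0 falls back to the Tokio runtime's worker-thread count via the `Handle::current().metrics().num_workers()` call in the diff. A small self-contained sketch of that rule; the function name and worker count here are illustrative only:

```rust
// Resolution rule sketched from the diff: 0 means "use the runtime's worker count".
fn resolve_parallelism(configured: usize) -> usize {
    if configured > 0 {
        configured
    } else {
        // Panics if called outside a Tokio runtime, like the call in the diff.
        tokio::runtime::Handle::current().metrics().num_workers()
    }
}

#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() {
    assert_eq!(resolve_parallelism(2), 2); // after `SET parallelism = 2`
    assert_eq!(resolve_parallelism(0), 4); // default: one partition per worker thread
}
```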