Skip to content

Commit

Permalink
feat: support more join case (KipData#204)
Browse files Browse the repository at this point in the history
* feat: support more join case

- reconstruct the alias mechanism to ensure correctness during Join
- support Natural Join
- support multiple from(to Cross Join)
- when Using and Natural Join, the same columns are automatically removed (Fixme: JoinType needs to be supported to decide whether to use the columns of the left table or the right table)
- `RangeDetacher::detach` removes meaningless `Result`

* feat: support more join case

- fix: `UnaryEvaluator` & `BinaryEvaluator` handle the situation when the data is Null
- fix: use an independent Binder when joining and the parent binder can extend context(from subquery join)
- feat: add `Mod` for `BinaryEvaluator`

* fix: lifetime on select.rs

* style: codefmt
  • Loading branch information
KKould authored Apr 10, 2024
1 parent 74ea950 commit eac3ae8
Show file tree
Hide file tree
Showing 32 changed files with 1,605 additions and 214 deletions.
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,11 @@ implement_from_tuple!(
- User-Defined Function: `features = ["marcos"]`
```rust
function!(TestFunction::test(LogicalType::Integer, LogicalType::Integer) -> LogicalType::Integer => |v1: ValueRef, v2: ValueRef| {
let value = DataValue::binary_op(&v1, &v2, &BinaryOperator::Plus)?;
DataValue::unary_op(&value, &UnaryOperator::Minus)
let plus_binary_evaluator = EvaluatorFactory::binary_create(LogicalType::Integer, BinaryOperator::Plus)?;
let value = plus_binary_evaluator.binary_eval(&v1, &v2);

let plus_unary_evaluator = EvaluatorFactory::unary_create(LogicalType::Integer, UnaryOperator::Minus)?;
Ok(plus_unary_evaluator.unary_eval(&value))
});

let fnck_sql = DataBaseBuilder::path("./data")
Expand Down Expand Up @@ -178,7 +181,7 @@ let fnck_sql = DataBaseBuilder::path("./data")
- [x] Alias
- [x] Aggregation: count()/sum()/avg()/min()/max()
- [x] SubQuery[select/from/where]
- [x] Join: Inner/Left/Right/Full/Cross
- [x] Join: Inner/Left/Right/Full/Cross (Natural\Using)
- [x] Group By
- [x] Having
- [x] Order By
Expand Down
2 changes: 1 addition & 1 deletion src/binder/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{

use super::{Binder, QueryBindStep};

impl<'a, T: Transaction> Binder<'a, T> {
impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
pub fn bind_aggregate(
&mut self,
children: LogicalPlan,
Expand Down
2 changes: 1 addition & 1 deletion src/binder/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::planner::operator::Operator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;

impl<'a, T: Transaction> Binder<'a, T> {
impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
pub(crate) fn bind_alter_table(
&mut self,
name: &ObjectName,
Expand Down
6 changes: 4 additions & 2 deletions src/binder/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ use crate::storage::Transaction;
use sqlparser::ast::ObjectName;
use std::sync::Arc;

impl<'a, T: Transaction> Binder<'a, T> {
impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
pub(crate) fn bind_analyze(&mut self, name: &ObjectName) -> Result<LogicalPlan, DatabaseError> {
let table_name = Arc::new(lower_case_name(name)?);

let table_catalog = self.context.table_and_bind(table_name.clone(), None)?;
let table_catalog = self
.context
.table_and_bind(table_name.clone(), None, None)?;
let index_metas = table_catalog.indexes.clone();

let scan_op = ScanOperator::build(table_name.clone(), table_catalog);
Expand Down
2 changes: 1 addition & 1 deletion src/binder/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl FromStr for ExtSource {
}
}

impl<'a, T: Transaction> Binder<'a, T> {
impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
pub(super) fn bind_copy(
&mut self,
source: CopySource,
Expand Down
6 changes: 4 additions & 2 deletions src/binder/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::types::index::IndexType;
use sqlparser::ast::{ObjectName, OrderByExpr};
use std::sync::Arc;

impl<'a, T: Transaction> Binder<'a, T> {
impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
pub(crate) fn bind_create_index(
&mut self,
table_name: &ObjectName,
Expand All @@ -29,7 +29,9 @@ impl<'a, T: Transaction> Binder<'a, T> {
IndexType::Composite
};

let table = self.context.table_and_bind(table_name.clone(), None)?;
let table = self
.context
.table_and_bind(table_name.clone(), None, None)?;
let plan = ScanOperator::build(table_name.clone(), table);
let mut columns = Vec::with_capacity(exprs.len());

Expand Down
2 changes: 1 addition & 1 deletion src/binder/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use crate::types::LogicalType;

impl<'a, T: Transaction> Binder<'a, T> {
impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
// TODO: TableConstraint
pub(crate) fn bind_create_table(
&mut self,
Expand Down
20 changes: 14 additions & 6 deletions src/binder/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,37 @@ use crate::planner::operator::scan::ScanOperator;
use crate::planner::operator::Operator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;
use sqlparser::ast::{Expr, TableFactor, TableWithJoins};
use sqlparser::ast::{Expr, TableAlias, TableFactor, TableWithJoins};
use std::sync::Arc;

impl<'a, T: Transaction> Binder<'a, T> {
impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
pub(crate) fn bind_delete(
&mut self,
from: &TableWithJoins,
selection: &Option<Expr>,
) -> Result<LogicalPlan, DatabaseError> {
if let TableFactor::Table { name, alias, .. } = &from.relation {
let table_name = Arc::new(lower_case_name(name)?);
let mut table_alias = None;
let mut alias_idents = None;

let table_catalog = self.context.table_and_bind(table_name.clone(), None)?;
if let Some(TableAlias { name, columns }) = alias {
table_alias = Some(Arc::new(name.value.to_lowercase()));
alias_idents = Some(columns);
}
let table_catalog =
self.context
.table_and_bind(table_name.clone(), table_alias.clone(), None)?;
let primary_key_column = table_catalog
.columns()
.find(|column| column.desc.is_primary)
.cloned()
.unwrap();
let mut plan = ScanOperator::build(table_name.clone(), table_catalog);

if let Some(alias) = alias {
self.context
.add_table_alias(alias.to_string(), table_name.clone());
if let Some(alias_idents) = alias_idents {
plan =
self.bind_alias(plan, alias_idents, table_alias.unwrap(), table_name.clone())?;
}

if let Some(predicate) = selection {
Expand Down
2 changes: 1 addition & 1 deletion src/binder/describe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::storage::Transaction;
use sqlparser::ast::ObjectName;
use std::sync::Arc;

impl<'a, T: Transaction> Binder<'a, T> {
impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
pub(crate) fn bind_describe(
&mut self,
name: &ObjectName,
Expand Down
2 changes: 1 addition & 1 deletion src/binder/distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::planner::operator::aggregate::AggregateOperator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;

impl<'a, T: Transaction> Binder<'a, T> {
impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
pub fn bind_distinct(
&mut self,
children: LogicalPlan,
Expand Down
2 changes: 1 addition & 1 deletion src/binder/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::storage::Transaction;
use sqlparser::ast::ObjectName;
use std::sync::Arc;

impl<'a, T: Transaction> Binder<'a, T> {
impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
pub(crate) fn bind_drop_table(
&mut self,
name: &ObjectName,
Expand Down
2 changes: 1 addition & 1 deletion src/binder/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::planner::operator::Operator;
use crate::planner::LogicalPlan;
use crate::storage::Transaction;

impl<'a, T: Transaction> Binder<'a, T> {
impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
pub(crate) fn bind_explain(&mut self, plan: LogicalPlan) -> Result<LogicalPlan, DatabaseError> {
Ok(LogicalPlan::new(Operator::Explain, vec![plan]))
}
Expand Down
47 changes: 30 additions & 17 deletions src/binder/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ use crate::types::value::{DataValue, Utf8Type};
use crate::types::LogicalType;

macro_rules! try_alias {
($context:expr, $column_name:expr) => {
if let Some(expr) = $context.expr_aliases.get(&$column_name) {
($context:expr, $full_name:expr) => {
if let Some(expr) = $context.expr_aliases.get(&$full_name) {
return Ok(ScalarExpression::Alias {
expr: Box::new(expr.clone()),
alias: AliasType::Name($column_name),
alias: AliasType::Name($full_name.1),
});
}
};
Expand All @@ -37,7 +37,7 @@ macro_rules! try_default {
};
}

impl<'a, T: Transaction> Binder<'a, T> {
impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
pub(crate) fn bind_expr(&mut self, expr: &Expr) -> Result<ScalarExpression, DatabaseError> {
match expr {
Expr::Identifier(ident) => {
Expand Down Expand Up @@ -283,7 +283,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
idents: &[Ident],
bind_table_name: Option<String>,
) -> Result<ScalarExpression, DatabaseError> {
let (table_name, column_name) = match idents {
let full_name = match idents {
[column] => (None, lower_ident(column)),
[table, column] => (Some(lower_ident(table)), lower_ident(column)),
_ => {
Expand All @@ -296,25 +296,40 @@ impl<'a, T: Transaction> Binder<'a, T> {
))
}
};
try_alias!(self.context, column_name);
try_alias!(self.context, full_name);
if self.context.allow_default {
try_default!(&table_name, column_name);
try_default!(&full_name.0, full_name.1);
}
if let Some(table) = table_name.or(bind_table_name) {
if let Some(table) = full_name.0.or(bind_table_name) {
let table_catalog = self.context.bind_table(&table, self.parent)?;

let column_catalog = table_catalog
.get_column_by_name(&column_name)
.ok_or_else(|| DatabaseError::NotFound("column", column_name))?;
.get_column_by_name(&full_name.1)
.ok_or_else(|| DatabaseError::NotFound("column", full_name.1))?;
Ok(ScalarExpression::ColumnRef(column_catalog.clone()))
} else {
let op = |got_column: &mut Option<&'a ColumnRef>, context: &BinderContext<'a, T>| {
for table_catalog in context.bind_table.values() {
let op = |got_column: &mut Option<ScalarExpression>, context: &BinderContext<'a, T>| {
for ((_, alias, _), table_catalog) in context.bind_table.iter() {
if got_column.is_some() {
break;
}
if let Some(column_catalog) = table_catalog.get_column_by_name(&column_name) {
*got_column = Some(column_catalog);
if let Some(alias) = alias {
*got_column = self.context.expr_aliases.iter().find_map(
|((alias_table, alias_column), expr)| {
matches!(
alias_table
.as_ref()
.map(|table_name| table_name == alias.as_ref()
&& alias_column == &full_name.1),
Some(true)
)
.then(|| expr.clone())
},
);
} else if let Some(column_catalog) =
table_catalog.get_column_by_name(&full_name.1)
{
*got_column = Some(ScalarExpression::ColumnRef(column_catalog.clone()));
}
}
};
Expand All @@ -325,9 +340,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
if let Some(parent) = self.parent {
op(&mut got_column, &parent.context);
}
let column_catalog =
got_column.ok_or_else(|| DatabaseError::NotFound("column", column_name))?;
Ok(ScalarExpression::ColumnRef(column_catalog.clone()))
Ok(got_column.ok_or_else(|| DatabaseError::NotFound("column", full_name.1))?)
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/binder/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use sqlparser::ast::{Expr, Ident, ObjectName};
use std::slice;
use std::sync::Arc;

impl<'a, T: Transaction> Binder<'a, T> {
impl<'a, 'b, T: Transaction> Binder<'a, 'b, T> {
pub(crate) fn bind_insert(
&mut self,
name: &ObjectName,
Expand All @@ -24,7 +24,9 @@ impl<'a, T: Transaction> Binder<'a, T> {
self.context.allow_default = true;
let table_name = Arc::new(lower_case_name(name)?);

let table = self.context.table_and_bind(table_name.clone(), None)?;
let table = self
.context
.table_and_bind(table_name.clone(), None, None)?;
let mut _schema_ref = None;
let values_len = expr_rows[0].len();

Expand Down
Loading

0 comments on commit eac3ae8

Please sign in to comment.