Skip to content

Commit

Permalink
todo: SQL 2016 (#129)
Browse files Browse the repository at this point in the history
* test: SQL 2016

* fix: SQL 2016

- Column defaults to nullable
- Fix the problem that Column is not lowercase when creating Table
- Added support for `TypedString`
- Support `START TRANSACTION & COMMIT WORK` on Server
- Fixed floating point number display problem and prioritized converting floating point numbers to double for calculation.

* code fmt
  • Loading branch information
KKould authored Feb 6, 2024
1 parent 815dfd5 commit c6d0536
Show file tree
Hide file tree
Showing 150 changed files with 4,567 additions and 103 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ implement_from_tuple!(
- Date
- DateTime

## Roadmap
- SQL 2016

## License

FnckSQL uses the [Apache 2.0 license][1] to strike a balance between
Expand Down
4 changes: 2 additions & 2 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl SimpleQueryHandler for SessionBackend {
C: ClientInfo + Unpin + Send + Sync,
{
match query.to_uppercase().as_str() {
"BEGIN;" | "BEGIN" => {
"BEGIN;" | "BEGIN" | "START TRANSACTION;" | "START TRANSACTION" => {
let mut guard = self.tx.lock().await;

if guard.is_some() {
Expand All @@ -115,7 +115,7 @@ impl SimpleQueryHandler for SessionBackend {

Ok(vec![Response::Execution(Tag::new("OK").into())])
}
"COMMIT;" | "COMMIT" => {
"COMMIT;" | "COMMIT" | "COMMIT WORK;" | "COMMIT WORK" => {
let mut guard = self.tx.lock().await;

if let Some(transaction) = guard.take() {
Expand Down
11 changes: 2 additions & 9 deletions src/binder/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,13 @@ impl<'a, T: Transaction> Binder<'a, T> {
let name = split_name(&name)?;
let table_name = Arc::new(name.to_string());

let table_catalog = self
.context
.table(table_name.clone())
.cloned()
.ok_or_else(|| DatabaseError::InvalidTable(format!("bind table {}", name)))?;
let table_catalog = self.context.table_and_bind(table_name.clone(), None)?;
let columns = table_catalog
.columns_with_id()
.filter_map(|(_, column)| column.desc.is_index().then_some(column.clone()))
.collect_vec();

let scan_op = ScanOperator::build(table_name.clone(), &table_catalog);
self.context
.add_bind_table(table_name.clone(), table_catalog, None)?;

let scan_op = ScanOperator::build(table_name.clone(), table_catalog);
let plan = LogicalPlan {
operator: Operator::Analyze(AnalyzeOperator {
table_name,
Expand Down
8 changes: 4 additions & 4 deletions src/binder/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
is_primary,
..
} => {
for column_name in column_names {
for column_name in column_names.iter().map(|ident| ident.value.to_lowercase()) {
if let Some(column) = columns
.iter_mut()
.find(|column| column.name() == column_name.to_string())
Expand Down Expand Up @@ -95,20 +95,20 @@ impl<'a, T: Transaction> Binder<'a, T> {
}

pub fn bind_column(&mut self, column_def: &ColumnDef) -> Result<ColumnCatalog, DatabaseError> {
let column_name = column_def.name.to_string();
let column_name = column_def.name.value.to_lowercase();
let mut column_desc = ColumnDesc::new(
LogicalType::try_from(column_def.data_type.clone())?,
false,
false,
None,
);
let mut nullable = false;
let mut nullable = true;

// TODO: 这里可以对更多字段可设置内容进行补充
for option_def in &column_def.options {
match &option_def.option {
ColumnOption::Null => nullable = true,
ColumnOption::NotNull => (),
ColumnOption::NotNull => nullable = false,
ColumnOption::Unique { is_primary } => {
if *is_primary {
column_desc.is_primary = true;
Expand Down
9 changes: 1 addition & 8 deletions src/binder/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,14 @@ impl<'a, T: Transaction> Binder<'a, T> {
let name = split_name(&name)?;
let table_name = Arc::new(name.to_string());

let table_catalog = self
.context
.table(table_name.clone())
.cloned()
.ok_or_else(|| DatabaseError::InvalidTable(format!("bind table {}", name)))?;
let table_catalog = self.context.table_and_bind(table_name.clone(), None)?;
let primary_key_column = table_catalog
.columns_with_id()
.find(|(_, column)| column.desc.is_primary)
.map(|(_, column)| Arc::clone(column))
.unwrap();
let mut plan = ScanOperator::build(table_name.clone(), &table_catalog);

self.context
.add_bind_table(table_name.clone(), table_catalog, None)?;

if let Some(alias) = alias {
self.context
.add_table_alias(alias.to_string(), table_name.clone())?;
Expand Down
76 changes: 49 additions & 27 deletions src/binder/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
Expr::CompoundIdentifier(idents) => self.bind_column_ref_from_identifiers(idents, None),
Expr::BinaryOp { left, right, op } => self.bind_binary_op_internal(left, right, op),
Expr::Value(v) => Ok(ScalarExpression::Constant(Arc::new(v.into()))),
Expr::Function(func) => self.bind_agg_call(func),
Expr::Function(func) => self.bind_function(func),
Expr::Nested(expr) => self.bind_expr(expr),
Expr::UnaryOp { expr, op } => self.bind_unary_op_internal(expr, op),
Expr::Like {
Expand All @@ -40,6 +40,12 @@ impl<'a, T: Transaction> Binder<'a, T> {
negated,
} => self.bind_is_in(expr, list, *negated),
Expr::Cast { expr, data_type } => self.bind_cast(expr, data_type),
Expr::TypedString { data_type, value } => {
let logical_type = LogicalType::try_from(data_type.clone())?;
let value = DataValue::Utf8(Some(value.to_string())).cast(&logical_type)?;

Ok(ScalarExpression::Constant(Arc::new(value)))
}
_ => {
todo!()
}
Expand Down Expand Up @@ -152,6 +158,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
| BinaryOperator::And
| BinaryOperator::Or
| BinaryOperator::Xor => LogicalType::Boolean,
BinaryOperator::StringConcat => LogicalType::Varchar(None),
_ => todo!(),
};

Expand Down Expand Up @@ -182,7 +189,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
})
}

fn bind_agg_call(&mut self, func: &Function) -> Result<ScalarExpression, DatabaseError> {
fn bind_function(&mut self, func: &Function) -> Result<ScalarExpression, DatabaseError> {
let mut args = Vec::with_capacity(func.args.len());

for arg in func.args.iter() {
Expand All @@ -196,7 +203,6 @@ impl<'a, T: Transaction> Binder<'a, T> {
_ => todo!(),
}
}
let ty = args[0].return_type();

Ok(match func.name.to_string().to_lowercase().as_str() {
"count" => ScalarExpression::AggCall {
Expand All @@ -205,30 +211,46 @@ impl<'a, T: Transaction> Binder<'a, T> {
args,
ty: LogicalType::Integer,
},
"sum" => ScalarExpression::AggCall {
distinct: func.distinct,
kind: AggKind::Sum,
args,
ty,
},
"min" => ScalarExpression::AggCall {
distinct: func.distinct,
kind: AggKind::Min,
args,
ty,
},
"max" => ScalarExpression::AggCall {
distinct: func.distinct,
kind: AggKind::Max,
args,
ty,
},
"avg" => ScalarExpression::AggCall {
distinct: func.distinct,
kind: AggKind::Avg,
args,
ty,
},
"sum" => {
let ty = args[0].return_type();

ScalarExpression::AggCall {
distinct: func.distinct,
kind: AggKind::Sum,
args,
ty,
}
}
"min" => {
let ty = args[0].return_type();

ScalarExpression::AggCall {
distinct: func.distinct,
kind: AggKind::Min,
args,
ty,
}
}
"max" => {
let ty = args[0].return_type();

ScalarExpression::AggCall {
distinct: func.distinct,
kind: AggKind::Max,
args,
ty,
}
}
"avg" => {
let ty = args[0].return_type();

ScalarExpression::AggCall {
distinct: func.distinct,
kind: AggKind::Avg,
args,
ty,
}
}
_ => todo!(),
})
}
Expand Down
45 changes: 27 additions & 18 deletions src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub enum InputRefType {
#[derive(Clone)]
pub struct BinderContext<'a, T: Transaction> {
transaction: &'a T,
pub(crate) bind_table: BTreeMap<TableName, (TableCatalog, Option<JoinType>)>,
pub(crate) bind_table: BTreeMap<TableName, (&'a TableCatalog, Option<JoinType>)>,
aliases: BTreeMap<String, ScalarExpression>,
table_aliases: BTreeMap<String, TableName>,
group_by_exprs: Vec<ScalarExpression>,
Expand All @@ -60,6 +60,32 @@ impl<'a, T: Transaction> BinderContext<'a, T> {
}
}

pub fn table_and_bind(
&mut self,
table_name: TableName,
join_type: Option<JoinType>,
) -> Result<&TableCatalog, DatabaseError> {
let table = if let Some(real_name) = self.table_aliases.get(table_name.as_ref()) {
self.transaction.table(real_name.clone())
} else {
self.transaction.table(table_name.clone())
}
.ok_or(DatabaseError::TableNotFound)?;

let is_bound = self
.bind_table
.insert(table_name.clone(), (table, join_type))
.is_some();
if is_bound {
return Err(DatabaseError::InvalidTable(format!(
"{} duplicated",
table_name
)));
}

Ok(table)
}

// Tips: The order of this index is based on Aggregate being bound first.
pub fn input_ref_index(&self, ty: InputRefType) -> usize {
match ty {
Expand Down Expand Up @@ -100,23 +126,6 @@ impl<'a, T: Transaction> BinderContext<'a, T> {
Ok(())
}

pub fn add_bind_table(
&mut self,
table: TableName,
table_catalog: TableCatalog,
join_type: Option<JoinType>,
) -> Result<(), DatabaseError> {
let is_bound = self
.bind_table
.insert(table.clone(), (table_catalog.clone(), join_type))
.is_some();
if is_bound {
return Err(DatabaseError::InvalidTable(format!("{} duplicated", table)));
}

Ok(())
}

pub fn has_agg_call(&self, expr: &ScalarExpression) -> bool {
self.group_by_exprs.contains(expr)
}
Expand Down
45 changes: 28 additions & 17 deletions src/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,16 +200,9 @@ impl<'a, T: Transaction> Binder<'a, T> {
) -> Result<(Arc<String>, LogicalPlan), DatabaseError> {
let table_name = Arc::new(table.to_string());

let table_catalog = self
.context
.table(table_name.clone())
.cloned()
.ok_or_else(|| DatabaseError::InvalidTable(format!("bind table {}", table)))?;
let table_catalog = self.context.table_and_bind(table_name.clone(), join_type)?;
let scan_op = ScanOperator::build(table_name.clone(), &table_catalog);

self.context
.add_bind_table(table_name.clone(), table_catalog, join_type)?;

if let Some(alias) = alias {
self.context
.add_table_alias(alias.to_string(), table_name.clone())?;
Expand Down Expand Up @@ -247,8 +240,30 @@ impl<'a, T: Transaction> Binder<'a, T> {
SelectItem::Wildcard(_) => {
select_items.extend_from_slice(self.bind_all_column_refs()?.as_slice());
}

_ => todo!("bind select list"),
SelectItem::QualifiedWildcard(table_name, _) => {
let idents = &table_name.0;
let table_name = match idents.as_slice() {
[table] => Arc::new(table.value.to_lowercase()),
[_, table] => Arc::new(table.value.to_lowercase()),
_ => {
return Err(DatabaseError::InvalidTable(
idents
.iter()
.map(|ident| ident.value.clone())
.join(".")
.to_string(),
))
}
};

let table = self
.context
.table(table_name.clone())
.ok_or(DatabaseError::TableNotFound)?;
for (_, col) in table.columns_with_id() {
select_items.push(ScalarExpression::ColumnRef(col.clone()));
}
}
};
}

Expand All @@ -261,7 +276,7 @@ impl<'a, T: Transaction> Binder<'a, T> {
let table = self
.context
.table(table_name.clone())
.ok_or_else(|| DatabaseError::InvalidTable(table_name.to_string()))?;
.ok_or(DatabaseError::TableNotFound)?;
for (_, col) in table.columns_with_id() {
exprs.push(ScalarExpression::ColumnRef(col.clone()));
}
Expand Down Expand Up @@ -296,16 +311,12 @@ impl<'a, T: Transaction> Binder<'a, T> {
.context
.table(left_table.clone())
.cloned()
.ok_or_else(|| {
DatabaseError::InvalidTable(format!("Left: {} not found", left_table))
})?;
.ok_or(DatabaseError::TableNotFound)?;
let right_table = self
.context
.table(right_table.clone())
.cloned()
.ok_or_else(|| {
DatabaseError::InvalidTable(format!("Right: {} not found", right_table))
})?;
.ok_or(DatabaseError::TableNotFound)?;

let on = match joint_condition {
Some(constraint) => self.bind_join_constraint(&left_table, &right_table, constraint)?,
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl ColumnCatalog {
name: column_name,
table_name: None,
},
nullable: false,
nullable: true,
desc: ColumnDesc::new(LogicalType::Varchar(None), false, false, None),
ref_expr: None,
}
Expand Down
9 changes: 8 additions & 1 deletion src/execution/volcano/dql/dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::execution::volcano::{BoxedExecutor, ReadExecutor};
use crate::storage::Transaction;
use crate::types::tuple::Tuple;
use futures_async_stream::try_stream;
use std::sync::Arc;

pub struct Dummy {}

Expand All @@ -14,5 +15,11 @@ impl<T: Transaction> ReadExecutor<T> for Dummy {

impl Dummy {
#[try_stream(boxed, ok = Tuple, error = DatabaseError)]
pub async fn _execute(self) {}
pub async fn _execute(self) {
yield Tuple {
id: None,
columns: Arc::new(vec![]),
values: vec![],
}
}
}
Loading

0 comments on commit c6d0536

Please sign in to comment.