Skip to content

Commit

Permalink
refactor: TupleBuiler & value_compute (#116)
Browse files Browse the repository at this point in the history
* refactor(tuple_builder)

* refactor(value_compute): use Marco for logic encapsulation
  • Loading branch information
KKould authored Jan 1, 2024
1 parent 9cc8c89 commit 92618d2
Show file tree
Hide file tree
Showing 11 changed files with 386 additions and 784 deletions.
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[package]
name = "kip-sql"
version = "0.0.1-alpha.6"
version = "0.0.1-alpha.7"
edition = "2021"
authors = ["Kould <[email protected]>", "Xwg <[email protected]>"]
description = "build the SQL layer of KipDB database"
Expand All @@ -24,7 +24,6 @@ tracing = "0.1.37"
chrono = "0.4.26"
tokio = { version = "1.28.2", features = ["full"] }
serde = { version = "1", features = ["derive", "rc"] }
serde_json = "1"
bincode = "1.3.3"
integer-encoding = "3.0.4"
strum_macros = "0.24"
Expand Down
15 changes: 6 additions & 9 deletions src/binder/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,27 +79,24 @@ impl<'a, T: Transaction> Binder<'a, T> {
},
format: FileFormat::from_options(options),
};
let types = cols.iter().map(|c| c.desc.column_datatype).collect();

let copy = if to {
if to {
// COPY <source_table> TO <dest_file>
LogicalPlan {
Ok(LogicalPlan {
operator: Operator::CopyToFile(CopyToFileOperator { source: ext_source }),
childrens: vec![],
}
})
} else {
// COPY <dest_table> FROM <source_file>
LogicalPlan {
Ok(LogicalPlan {
operator: Operator::CopyFromFile(CopyFromFileOperator {
source: ext_source,
types,
columns: cols,
table: table_name.to_string(),
}),
childrens: vec![],
}
};
Ok(copy)
})
}
} else {
Err(BindError::InvalidTable(format!(
"not found table {}",
Expand Down
5 changes: 1 addition & 4 deletions src/execution/executor/ddl/alter_table/add_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ impl AddColumn {
}
}

let tuple_builder = TupleBuilder::new_result();
let tuple = tuple_builder.push_result("ALTER TABLE SUCCESS", "1")?;

yield tuple;
yield TupleBuilder::build_result("ALTER TABLE SUCCESS".to_string(), "1".to_string())?;
}
}
5 changes: 1 addition & 4 deletions src/execution/executor/ddl/alter_table/drop_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ impl DropColumn {
}
transaction.drop_column(table_name, column_name, *if_exists)?;

let tuple_builder = TupleBuilder::new_result();
let tuple = tuple_builder.push_result("ALTER TABLE SUCCESS", "1")?;

yield tuple;
yield TupleBuilder::build_result("ALTER TABLE SUCCESS".to_string(), "1".to_string())?;
}
}
8 changes: 4 additions & 4 deletions src/execution/executor/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ impl CreateTable {
if_not_exists,
} = self.op;
let _ = transaction.create_table(table_name.clone(), columns, if_not_exists)?;
let tuple_builder = TupleBuilder::new_result();
let tuple = tuple_builder
.push_result("CREATE TABLE SUCCESS", format!("{}", table_name).as_str())?;

yield tuple;
yield TupleBuilder::build_result(
"CREATE TABLE SUCCESS".to_string(),
format!("{}", table_name),
)?;
}
}
49 changes: 22 additions & 27 deletions src/execution/executor/dml/copy_from_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,10 @@ impl CopyFromFile {
.from_reader(&mut buf_reader),
};

let column_count = self.op.types.len();
let mut size_count = 0;
let column_count = self.op.columns.len();
let tuple_builder = TupleBuilder::new(self.op.columns.clone());

for record in reader.records() {
let mut tuple_builder =
TupleBuilder::new(self.op.types.clone(), self.op.columns.clone());
// read records and push raw str rows into data chunk builder
let record = record?;

Expand All @@ -89,23 +87,22 @@ impl CopyFromFile {
});
}

size_count += 1;

// push a raw str row and send it if necessary
if let Some(chunk) = tuple_builder.push_str_row(record.iter())? {
tx.blocking_send(chunk).map_err(|_| ExecutorError::Abort)?;
}
self.size += 1;
tx.blocking_send(tuple_builder.build_with_row(record.iter())?)
.map_err(|_| ExecutorError::ChannelClose)?;
}
self.size = size_count;
Ok(())
}
}

fn return_result(size: usize, tx: Sender<Tuple>) -> Result<(), ExecutorError> {
let tuple_builder = TupleBuilder::new_result();
let tuple =
tuple_builder.push_result("COPY FROM SOURCE", format!("import {} rows", size).as_str())?;
tx.blocking_send(tuple).map_err(|_| ExecutorError::Abort)?;
let tuple = TupleBuilder::build_result(
"COPY FROM SOURCE".to_string(),
format!("import {} rows", size),
)?;

tx.blocking_send(tuple)
.map_err(|_| ExecutorError::ChannelClose)?;
Ok(())
}

Expand Down Expand Up @@ -172,12 +169,7 @@ mod tests {
},
},

types: vec![
LogicalType::Integer,
LogicalType::Float,
LogicalType::Varchar(Some(10)),
],
columns: columns.clone(),
columns,
};
let executor = CopyFromFile {
op: op.clone(),
Expand All @@ -191,13 +183,16 @@ mod tests {
.await;
let storage = db.storage;
let transaction = RefCell::new(storage.transaction().await?);
let actual = executor.execute(&transaction).next().await.unwrap()?;

let tuple_builder = TupleBuilder::new_result();
let expected = tuple_builder
.push_result("COPY FROM SOURCE", format!("import {} rows", 2).as_str())
.unwrap();
assert_eq!(actual, expected);
let tuple = executor.execute(&transaction).next().await.unwrap()?;
assert_eq!(
tuple,
TupleBuilder::build_result(
"COPY FROM SOURCE".to_string(),
format!("import {} rows", 2)
)
.unwrap()
);

Ok(())
}
Expand Down
6 changes: 2 additions & 4 deletions src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,12 @@ pub enum ExecutorError {
),
#[error("tuple length mismatch: expected {expected} but got {actual}")]
LengthMismatch { expected: usize, actual: usize },
#[error("abort")]
Abort,
#[error("unknown error")]
Unknown,
#[error("join error")]
JoinError(
#[from]
#[source]
tokio::task::JoinError,
),
#[error("channel close")]
ChannelClose,
}
Loading

0 comments on commit 92618d2

Please sign in to comment.