Skip to content

Commit

Permalink
make it with_operation and with_commit_info
Browse files Browse the repository at this point in the history
  • Loading branch information
zachschuermann committed Oct 25, 2024
1 parent b4feb4f commit 1fc535e
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 11 deletions.
6 changes: 4 additions & 2 deletions kernel/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ impl Transaction {

/// Set the operation that this transaction is performing. This string will be persisted in the
/// commit and visible to anyone who describes the table history.
pub fn operation(&mut self, operation: String) {
pub fn with_operation(mut self, operation: String) -> Self {
self.operation = Some(operation);
self
}

/// WARNING: This is an unstable API and will likely change in the future.
Expand All @@ -107,8 +108,9 @@ impl Transaction {
/// that can either be `null` or contain an empty map.
///
/// Any other columns in the data chunk are ignored.
pub fn commit_info(&mut self, commit_info: Box<dyn EngineData>) {
pub fn with_commit_info(mut self, commit_info: Box<dyn EngineData>) -> Self {
self.commit_info = Some(commit_info.into());
self
}
}

Expand Down
21 changes: 12 additions & 9 deletions kernel/tests/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,7 @@ async fn test_commit_info() -> Result<(), Box<dyn std::error::Error>> {
)]));
let table = create_table(store.clone(), table_location, schema, &[]).await?;

// create a transaction
let mut txn = table.new_transaction(&engine)?;

// add commit info of the form {engineCommitInfo: Map { "engineInfo": "default engine" } }
// create commit info of the form {engineCommitInfo: Map { "engineInfo": "default engine" } }
let commit_info_schema = Arc::new(ArrowSchema::new(vec![Field::new(
"engineCommitInfo",
ArrowDataType::Map(
Expand Down Expand Up @@ -139,7 +136,11 @@ async fn test_commit_info() -> Result<(), Box<dyn std::error::Error>> {

let commit_info_batch =
RecordBatch::try_new(commit_info_schema.clone(), vec![Arc::new(array)])?;
txn.commit_info(Box::new(ArrowEngineData::new(commit_info_batch)));

// create a transaction
let txn = table
.new_transaction(&engine)?
.with_commit_info(Box::new(ArrowEngineData::new(commit_info_batch)));

// commit!
txn.commit(&engine)?;
Expand Down Expand Up @@ -212,11 +213,12 @@ async fn test_invalid_commit_info() -> Result<(), Box<dyn std::error::Error>> {
let table = create_table(store.clone(), table_location, schema, &[]).await?;

// empty commit info test
let mut txn = table.new_transaction(&engine)?;
let commit_info_schema = Arc::new(ArrowSchema::empty());
let commit_info_batch = RecordBatch::new_empty(commit_info_schema.clone());
assert!(commit_info_batch.num_rows() == 0);
txn.commit_info(Box::new(ArrowEngineData::new(commit_info_batch)));
let txn = table
.new_transaction(&engine)?
.with_commit_info(Box::new(ArrowEngineData::new(commit_info_batch)));

// commit!
assert!(matches!(
Expand All @@ -225,7 +227,6 @@ async fn test_invalid_commit_info() -> Result<(), Box<dyn std::error::Error>> {
));

// two-row commit info test
let mut txn = table.new_transaction(&engine)?;
let commit_info_schema = Arc::new(ArrowSchema::new(vec![Field::new(
"engineInfo",
ArrowDataType::Utf8,
Expand All @@ -239,7 +240,9 @@ async fn test_invalid_commit_info() -> Result<(), Box<dyn std::error::Error>> {
]))],
)?;

txn.commit_info(Box::new(ArrowEngineData::new(commit_info_batch)));
let txn = table
.new_transaction(&engine)?
.with_commit_info(Box::new(ArrowEngineData::new(commit_info_batch)));

// commit!
assert!(matches!(
Expand Down

0 comments on commit 1fc535e

Please sign in to comment.