Skip to content

Commit

Permalink
commit info working
Browse files Browse the repository at this point in the history
  • Loading branch information
zachschuermann committed Oct 2, 2024
1 parent 4b747b7 commit e394127
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 63 deletions.
2 changes: 1 addition & 1 deletion kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ pub(crate) const COMMIT_INFO_NAME: &str = "commitInfo";
lazy_static! {
static ref LOG_SCHEMA: StructType = StructType::new(
vec![
Option::<CommitInfo>::get_struct_field(COMMIT_INFO_NAME), // FIXME REORDER
Option::<Add>::get_struct_field(ADD_NAME),
Option::<Remove>::get_struct_field(REMOVE_NAME),
Option::<Metadata>::get_struct_field(METADATA_NAME),
Option::<Protocol>::get_struct_field(PROTOCOL_NAME),
Option::<Transaction>::get_struct_field(TRANSACTION_NAME),
Option::<CommitInfo>::get_struct_field(COMMIT_INFO_NAME),
// We don't support the following actions yet
//Option::<Cdc>::get_struct_field(CDC_NAME),
//Option::<DomainMetadata>::get_struct_field(DOMAIN_METADATA_NAME),
Expand Down
57 changes: 50 additions & 7 deletions kernel/src/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::{Arc, LazyLock};

use crate::actions::get_log_schema;
use crate::expressions::Scalar;
use crate::schema::{Schema, StructType};
use crate::snapshot::Snapshot;
use crate::{DataType, Expression};
Expand Down Expand Up @@ -35,22 +36,61 @@ impl Transaction {
}

pub fn commit(self, engine: &dyn Engine) -> DeltaResult<CommitResult> {
// lazy lock for the expression
static COMMIT_INFO_EXPR: LazyLock<Expression> =
LazyLock::new(|| Expression::column("commitInfo"));

// step one: construct the iterator of actions we want to commit
let action_schema = get_log_schema();

let actions = self.commit_info.into_iter().map(|commit_info| {
// expression to select all the columns
let commit_info_expr = Expression::Struct(vec![
//Expression::Literal(Scalar::Null(
// action_schema
// .project(&[crate::actions::ADD_NAME])
// .unwrap()
// .into(),
//)),
//Expression::Literal(Scalar::Null(
// action_schema
// .project(&[crate::actions::REMOVE_NAME])
// .unwrap()
// .into(),
//)),
//Expression::Literal(Scalar::Null(
// action_schema
// .project(&[crate::actions::METADATA_NAME])
// .unwrap()
// .into(),
//)),
//Expression::Literal(Scalar::Null(
// action_schema
// .project(&[crate::actions::PROTOCOL_NAME])
// .unwrap()
// .into(),
//)),
//Expression::Literal(Scalar::Null(
// action_schema
// .project(&[crate::actions::TRANSACTION_NAME])
// .unwrap()
// .into(),
//)),
Expression::Struct(
commit_info
.schema
.fields()
.map(|f| Expression::column(f.name()))
.collect(),
),
]);

// commit info has arbitrary schema ex: {engineInfo: string, operation: string}
// we want to bundle it up and put it in the commit_info field of the actions.
let commit_info_evaluator = engine.get_expression_handler().get_evaluator(
commit_info.schema.into(),
COMMIT_INFO_EXPR.clone(), // TODO remove clone?
commit_info_expr,
<StructType as Into<DataType>>::into(action_schema.clone()),
);
commit_info_evaluator.evaluate(commit_info.data.as_ref()).unwrap()
commit_info_evaluator
.evaluate(commit_info.data.as_ref())
.unwrap()
});

// step two: figure out the commit version and path to write
Expand All @@ -73,7 +113,10 @@ impl Transaction {
/// first action in the commit. Note it is required in order to commit. If the engine does not
/// require any commit info, pass an empty `EngineData`.
pub fn commit_info(&mut self, commit_info: Box<dyn EngineData>, schema: Schema) {
self.commit_info = Some(CommitInfoData { data: commit_info, schema });
self.commit_info = Some(CommitInfoData {
data: commit_info,
schema,
});
}
}

Expand Down
79 changes: 24 additions & 55 deletions kernel/tests/write.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray, Int64Array, StringArray};
use arrow::array::StringArray;
use arrow::record_batch::RecordBatch;
use arrow_schema::Field;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
Expand All @@ -12,7 +11,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType};
use delta_kernel::{DeltaResult, Engine, EngineData, Table};
use delta_kernel::Table;

// fixme use in macro below
// const PROTOCOL_METADATA_TEMPLATE: &'static str = r#"{{"protocol":{{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":[],"writerFeatures":[]}}}}
Expand Down Expand Up @@ -64,48 +63,6 @@ async fn create_table(
Ok(Table::new(table_path))
}

// FIXME delete/unify from default/parquet.rs
fn get_metadata_schema() -> Arc<arrow_schema::Schema> {
let path_field = Field::new("path", arrow_schema::DataType::Utf8, false);
let size_field = Field::new("size", arrow_schema::DataType::Int64, false);
let partition_field = Field::new(
"partitionValues",
arrow_schema::DataType::Map(
Arc::new(Field::new(
"entries",
arrow_schema::DataType::Struct(
vec![
Field::new("keys", arrow_schema::DataType::Utf8, false),
Field::new("values", arrow_schema::DataType::Utf8, true),
]
.into(),
),
false,
)),
false,
),
false,
);
let data_change_field = Field::new("dataChange", arrow_schema::DataType::Boolean, false);
let modification_time_field =
Field::new("modificationTime", arrow_schema::DataType::Int64, false);

Arc::new(arrow_schema::Schema::new(vec![Field::new(
"add",
arrow_schema::DataType::Struct(
vec![
path_field.clone(),
size_field.clone(),
partition_field.clone(),
data_change_field.clone(),
modification_time_field.clone(),
]
.into(),
),
false,
)]))
}

#[tokio::test]
async fn test_commit_info() -> Result<(), Box<dyn std::error::Error>> {
// setup tracing
Expand All @@ -127,22 +84,34 @@ async fn test_commit_info() -> Result<(), Box<dyn std::error::Error>> {
"",
)
.await?;
// println!(
// "{:?}",
// store
// .get(&Path::from(
// "/test_table/_delta_log/00000000000000000000.json"
// ))
// .await
// );
// append an arrow record batch (vec of record batches??)
let mut txn = table.new_transaction(&engine)?;

use arrow_schema::Schema as ArrowSchema;
use arrow_schema::{DataType as ArrowDataType, Field};

// add commit info of the form {engineInfo: "default engine"}
let commit_info_schema = Arc::new(ArrowSchema::new(vec![Field::new(
"engineInfo",
ArrowDataType::Utf8,
true,
)]));
let commit_info_batch = RecordBatch::try_new(
commit_info_schema.clone(),
vec![Arc::new(StringArray::from(vec!["default engine"]))],
)?;
txn.commit_info(
Box::new(ArrowEngineData::new(commit_info_batch)),
commit_info_schema.try_into()?,
);
txn.commit(&engine)?;
let commit1 = store
.get(&Path::from(
"/test_table/_delta_log/00000000000000000001.json",
))
.await?;
println!("commit1: {}", String::from_utf8(commit1.bytes().await?.to_vec())?);
assert_eq!(
String::from_utf8(commit1.bytes().await?.to_vec())?,
"{\"commitInfo\":{\"kernelVersion\":\"default engine\"}}\n"
);
Ok(())
}

0 comments on commit e394127

Please sign in to comment.