From f1dcda917ef7e986aa401bda9a0cb59a23c6f77c Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Thu, 3 Oct 2024 21:24:05 -0700 Subject: [PATCH] better --- kernel/src/engine/default/filesystem.rs | 1 - kernel/src/engine/default/json.rs | 13 +- kernel/src/engine/default/mod.rs | 2 - kernel/src/table.rs | 2 +- kernel/src/transaction.rs | 226 ++++++++++++------------ 5 files changed, 111 insertions(+), 133 deletions(-) diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index 669a0c8ef..b9036d04e 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -56,7 +56,6 @@ impl FileSystemClient for ObjectStoreFileSystemClient { match meta { Ok(meta) => { let mut location = url.clone(); - println!("listed location: {:?}", meta.location); location.set_path(&format!("/{}", meta.location.as_ref())); sender .send(Ok(FileMeta { diff --git a/kernel/src/engine/default/json.rs b/kernel/src/engine/default/json.rs index 3c04def6f..14e3be7a1 100644 --- a/kernel/src/engine/default/json.rs +++ b/kernel/src/engine/default/json.rs @@ -143,7 +143,6 @@ impl JsonHandler for DefaultJsonHandler { data: Box> + Send + 'a>, _overwrite: bool, ) -> DeltaResult<()> { - // Initialize schema and batches let mut schema: Option = None; let mut batches: Vec = Vec::new(); @@ -155,29 +154,19 @@ impl JsonHandler for DefaultJsonHandler { schema = Some(record_batch.schema()); } - println!("[Engine put_json] schema: {:#?}", record_batch.schema()); batches.push(record_batch.clone()); } - for batch in batches.iter() { - println!("==============================================================="); - println!("[Engine put_json] batch: {:#?}", batch); - println!("[Engine put_json] schema: {:#?}", batch.schema()); - println!("==============================================================="); - } - // collect all batches let batches: Vec<&RecordBatch> = batches.iter().collect(); - // Write the concatenated batch to JSON + // write the batches to JSON let mut writer = arrow_json::LineDelimitedWriter::new(Vec::new()); writer.write_batches(&batches)?; writer.finish()?; let buffer = writer.into_inner(); - println!("[Engine put_json] commit path: {:?}", path.path()); - // Put if absent futures::executor::block_on(async { self.store diff --git a/kernel/src/engine/default/mod.rs b/kernel/src/engine/default/mod.rs index 5abb0b9c3..12c89ca2e 100644 --- a/kernel/src/engine/default/mod.rs +++ b/kernel/src/engine/default/mod.rs @@ -53,7 +53,6 @@ impl DefaultEngine { { // table root is the path of the table in the ObjectStore let (store, table_root) = parse_url_opts(table_root, options)?; - println!("DEFAULT ENGINE INIT try_new table root: {:?}", table_root); let store = Arc::new(store); Ok(Self { file_system: Arc::new(ObjectStoreFileSystemClient::new( @@ -79,7 +78,6 @@ impl DefaultEngine { /// - `table_root_path`: The root path of the table within storage. /// - `task_executor`: Used to spawn async IO tasks. See [executor::TaskExecutor]. pub fn new(store: Arc, table_root_path: Path, task_executor: Arc) -> Self { - println!("DEFAULT ENGINE INIT new table root: {:?}", table_root_path); Self { file_system: Arc::new(ObjectStoreFileSystemClient::new( store.clone(), diff --git a/kernel/src/table.rs b/kernel/src/table.rs index 087214851..8b1cf1d20 100644 --- a/kernel/src/table.rs +++ b/kernel/src/table.rs @@ -78,7 +78,7 @@ impl Table { Snapshot::try_new(self.location.clone(), engine, version) } - /// Create a new write transaction builder for the table. + /// Create a new write transaction for this table. pub fn new_transaction(&self, engine: &dyn Engine) -> DeltaResult { let latest_snapshot = Snapshot::try_new(self.location.clone(), engine, None)?; Ok(Transaction::new(latest_snapshot)) diff --git a/kernel/src/transaction.rs b/kernel/src/transaction.rs index c8732b4ed..8e184a69a 100644 --- a/kernel/src/transaction.rs +++ b/kernel/src/transaction.rs @@ -2,17 +2,17 @@ use std::sync::Arc; use crate::actions::get_log_schema; use crate::expressions::Scalar; -use crate::schema::{Schema, StructType}; +use crate::schema::{Schema, SchemaRef, StructType}; use crate::snapshot::Snapshot; use crate::{DataType, Expression}; use crate::{DeltaResult, Engine, EngineData}; pub struct Transaction { read_snapshot: Arc, - commit_info: Option, + commit_info: Option, } -pub struct CommitInfoData { +struct EngineCommitInfo { data: Box, schema: Schema, } @@ -36,120 +36,11 @@ impl Transaction { } pub fn commit(self, engine: &dyn Engine) -> DeltaResult { - use crate::actions::{ - ADD_NAME, COMMIT_INFO_NAME, METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, TRANSACTION_NAME, - }; - // step one: construct the iterator of actions we want to commit - let action_schema = get_log_schema(); - - let actions = self.commit_info.into_iter().map(|commit_info| { - // expression to select all the columns - let mut commit_info_expr = vec![Expression::literal("v0.3.1")]; - commit_info_expr.extend( - commit_info - .schema - .fields() - .map(|f| Expression::column(f.name())) - .collect::>(), - ); - let commit_info_expr = Expression::Struct(vec![ - Expression::Literal(Scalar::Null( - action_schema.field(ADD_NAME).unwrap().data_type().clone(), - )), - Expression::Literal(Scalar::Null( - action_schema - .field(REMOVE_NAME) - .unwrap() - .data_type() - .clone(), - )), - Expression::Literal(Scalar::Null( - action_schema - .field(METADATA_NAME) - .unwrap() - .data_type() - .clone(), - )), - Expression::Literal(Scalar::Null( - action_schema - .field(PROTOCOL_NAME) - .unwrap() - .data_type() - .clone(), - )), - Expression::Literal(Scalar::Null( - action_schema - .field(TRANSACTION_NAME) - .unwrap() - .data_type() - .clone(), - )), - Expression::Struct(commit_info_expr), - ]); - - // add the commit info fields to the action schema. - // e.g. if engine's commit info is {engineInfo: string, operation: string} - // then the 'commit_info' field in the actions will be: - // {kernelVersion: string, engineInfo: string, operation: string} - // let action_fields = action_schema - // .project_as_struct(&[ - // ADD_NAME, - // REMOVE_NAME, - // METADATA_NAME, - // PROTOCOL_NAME, - // TRANSACTION_NAME, - // ]) - // .unwrap() - // .fields(); - // let kernel_commit_info_fields = action_schema - // .project_as_struct(&[COMMIT_INFO_NAME]) - // .unwrap() - // .fields(); - // let engine_commit_info_fields = commit_info.schema.fields(); - // let commit_info_fields = kernel_commit_info_fields.chain(engine_commit_info_fields); - // let action_schema = StructType::new( - // std::iter::once(commit_info_fields) - // .chain(action_fields) - // .collect::Vec<_>() - // ); - - let mut action_fields = action_schema.fields().collect::>(); - let commit_info_field = action_fields.pop().unwrap(); - let mut commit_info_fields = - if let DataType::Struct(commit_info_schema) = commit_info_field.data_type() { - commit_info_schema.fields().collect::>() - } else { - unreachable!() - }; - commit_info_fields.extend(commit_info.schema.fields()); - let commit_info_schema = - StructType::new(commit_info_fields.into_iter().map(|f| f.clone()).collect()); - let mut action_fields = action_fields - .into_iter() - .map(|f| f.clone()) - .collect::>(); - action_fields.push(crate::schema::StructField::new( - COMMIT_INFO_NAME, - commit_info_schema, - true, - )); - let action_schema = StructType::new(action_fields); - - println!("commit_info schema: {:#?}", commit_info.schema); - println!("action_schema: {:#?}", action_schema); - - // commit info has arbitrary schema ex: {engineInfo: string, operation: string} - // we want to bundle it up and put it in the commit_info field of the actions. - let commit_info_evaluator = engine.get_expression_handler().get_evaluator( - commit_info.schema.into(), - commit_info_expr, - action_schema.into(), - ); - commit_info_evaluator - .evaluate(commit_info.data.as_ref()) - .unwrap() - }); + // + // TODO: enforce single row commit info + // TODO: for now we always require commit info + let (actions, _actions_schema) = generate_commit_info(engine, self.commit_info)?; // step two: figure out the commit version and path to write let commit_version = &self.read_snapshot.version() + 1; @@ -171,7 +62,7 @@ impl Transaction { /// first action in the commit. Note it is required in order to commit. If the engine does not /// require any commit info, pass an empty `EngineData`. pub fn commit_info(&mut self, commit_info: Box, schema: Schema) { - self.commit_info = Some(CommitInfoData { + self.commit_info = Some(EngineCommitInfo { data: commit_info, schema, }); @@ -181,3 +72,104 @@ impl Transaction { pub enum CommitResult { Committed(crate::Version), } + +fn generate_commit_info<'a>( + engine: &'a dyn Engine, + engine_commit_info: Option, +) -> DeltaResult<( + Box> + Send + 'a>, + SchemaRef, +)> { + use crate::actions::{ + ADD_NAME, COMMIT_INFO_NAME, METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, TRANSACTION_NAME, + }; + + let action_schema = + Arc::new(engine_commit_info + .as_ref() + .map_or(get_log_schema().clone(), |commit_info| { + let mut action_fields = get_log_schema().fields().collect::>(); + let commit_info_field = action_fields.pop().unwrap(); + let mut commit_info_fields = + if let DataType::Struct(commit_info_schema) = commit_info_field.data_type() { + commit_info_schema.fields().collect::>() + } else { + unreachable!() + }; + commit_info_fields.extend(commit_info.schema.fields()); + let commit_info_schema = + StructType::new(commit_info_fields.into_iter().map(|f| f.clone()).collect()); + let mut action_fields = action_fields + .into_iter() + .map(|f| f.clone()) + .collect::>(); + action_fields.push(crate::schema::StructField::new( + COMMIT_INFO_NAME, + commit_info_schema, + true, + )); + StructType::new(action_fields) + })); + + let action_schema_ref = Arc::clone(&action_schema); + let actions = engine_commit_info.into_iter().map(move |commit_info| { + // TODO RENAME + let engine_commit_info_data = commit_info.data; + let engine_commit_info_schema = commit_info.schema; + // expression to select all the columns + let mut commit_info_expr = vec![Expression::literal("v0.3.1")]; + commit_info_expr.extend( + engine_commit_info_schema + .fields() + .map(|f| Expression::column(f.name())) + .collect::>(), + ); + let commit_info_expr = Expression::Struct(vec![ + Expression::Literal(Scalar::Null( + action_schema_ref.field(ADD_NAME).unwrap().data_type().clone(), + )), + Expression::Literal(Scalar::Null( + action_schema_ref + .field(REMOVE_NAME) + .unwrap() + .data_type() + .clone(), + )), + Expression::Literal(Scalar::Null( + action_schema_ref + .field(METADATA_NAME) + .unwrap() + .data_type() + .clone(), + )), + Expression::Literal(Scalar::Null( + action_schema_ref + .field(PROTOCOL_NAME) + .unwrap() + .data_type() + .clone(), + )), + Expression::Literal(Scalar::Null( + action_schema_ref + .field(TRANSACTION_NAME) + .unwrap() + .data_type() + .clone(), + )), + Expression::Struct(commit_info_expr), + ]); + + // commit info has arbitrary schema ex: {engineInfo: string, operation: string} + // we want to bundle it up and put it in the commit_info field of the actions. + let commit_info_evaluator = engine.get_expression_handler().get_evaluator( + engine_commit_info_schema.into(), + commit_info_expr, + action_schema_ref.clone().into(), + ); + + commit_info_evaluator + .evaluate(engine_commit_info_data.as_ref()) + .unwrap() + }); + Ok((Box::new(actions), action_schema)) +}