From c0b9a557a34ba10f3357d8579e86dd48cb8d1aee Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Tue, 1 Oct 2024 22:47:02 -0700 Subject: [PATCH] fix commit info --- kernel/src/actions/mod.rs | 2 +- kernel/src/transaction.rs | 67 +++++++++++++++++++++------------------ 2 files changed, 37 insertions(+), 32 deletions(-) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 80cd3e7ff..1a75d0260 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -29,12 +29,12 @@ pub(crate) const COMMIT_INFO_NAME: &str = "commitInfo"; lazy_static! { static ref LOG_SCHEMA: StructType = StructType::new( vec![ - Option::::get_struct_field(COMMIT_INFO_NAME), // FIXME REORDER Option::::get_struct_field(ADD_NAME), Option::::get_struct_field(REMOVE_NAME), Option::::get_struct_field(METADATA_NAME), Option::::get_struct_field(PROTOCOL_NAME), Option::::get_struct_field(TRANSACTION_NAME), + Option::::get_struct_field(COMMIT_INFO_NAME), // We don't support the following actions yet //Option::::get_struct_field(CDC_NAME), //Option::::get_struct_field(DOMAIN_METADATA_NAME), diff --git a/kernel/src/transaction.rs b/kernel/src/transaction.rs index f5cac58db..4e259540e 100644 --- a/kernel/src/transaction.rs +++ b/kernel/src/transaction.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, LazyLock}; +use std::sync::Arc; use crate::actions::get_log_schema; use crate::expressions::Scalar; @@ -42,36 +42,41 @@ impl Transaction { let actions = self.commit_info.into_iter().map(|commit_info| { // expression to select all the columns let commit_info_expr = Expression::Struct(vec![ - //Expression::Literal(Scalar::Null( - // action_schema - // .project(&[crate::actions::ADD_NAME]) - // .unwrap() - // .into(), - //)), - //Expression::Literal(Scalar::Null( - // action_schema - // .project(&[crate::actions::REMOVE_NAME]) - // .unwrap() - // .into(), - //)), - //Expression::Literal(Scalar::Null( - // action_schema - // .project(&[crate::actions::METADATA_NAME]) - // .unwrap() - // .into(), - //)), - //Expression::Literal(Scalar::Null( - // action_schema - // .project(&[crate::actions::PROTOCOL_NAME]) - // .unwrap() - // .into(), - //)), - //Expression::Literal(Scalar::Null( - // action_schema - // .project(&[crate::actions::TRANSACTION_NAME]) - // .unwrap() - // .into(), - //)), + Expression::Literal(Scalar::Null( + action_schema + .field(crate::actions::ADD_NAME) + .unwrap() + .data_type() + .clone(), + )), + Expression::Literal(Scalar::Null( + action_schema + .field(crate::actions::REMOVE_NAME) + .unwrap() + .data_type() + .clone(), + )), + Expression::Literal(Scalar::Null( + action_schema + .field(crate::actions::METADATA_NAME) + .unwrap() + .data_type() + .clone(), + )), + Expression::Literal(Scalar::Null( + action_schema + .field(crate::actions::PROTOCOL_NAME) + .unwrap() + .data_type() + .clone(), + )), + Expression::Literal(Scalar::Null( + action_schema + .field(crate::actions::TRANSACTION_NAME) + .unwrap() + .data_type() + .clone(), + )), Expression::Struct( commit_info .schema