diff --git a/kernel/examples/inspect-table/src/main.rs b/kernel/examples/inspect-table/src/main.rs index 35ce34fa9..1bccc5986 100644 --- a/kernel/examples/inspect-table/src/main.rs +++ b/kernel/examples/inspect-table/src/main.rs @@ -1,6 +1,9 @@ -use delta_kernel::actions::get_log_schema; use delta_kernel::actions::visitors::{ - AddVisitor, MetadataVisitor, ProtocolVisitor, RemoveVisitor, SetTransactionVisitor, + AddVisitor, CdcVisitor, MetadataVisitor, ProtocolVisitor, RemoveVisitor, SetTransactionVisitor, +}; +use delta_kernel::actions::{ + get_log_schema, ADD_NAME, CDC_NAME, METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, + SET_TRANSACTION_NAME, }; use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; @@ -8,7 +11,7 @@ use delta_kernel::engine_data::{GetData, TypedGetData}; use delta_kernel::scan::state::{DvInfo, Stats}; use delta_kernel::scan::ScanBuilder; use delta_kernel::schema::{DataType, SchemaRef, StructField}; -use delta_kernel::{DataVisitor, DeltaResult, Table}; +use delta_kernel::{DataVisitor, DeltaResult, Error, Table}; use std::collections::HashMap; use std::process::ExitCode; @@ -40,9 +43,9 @@ enum Commands { ScanData, /// Show each action from the log-segments Actions { - /// Show the log in forward order (default is to show it going backwards in time) + /// Show the log in reverse order (default is log replay order -- newest first) #[arg(short, long)] - forward: bool, + oldest_first: bool, }, } @@ -58,23 +61,12 @@ fn main() -> ExitCode { } enum Action { - Metadata(delta_kernel::actions::Metadata, usize), - Protocol(delta_kernel::actions::Protocol, usize), - Remove(delta_kernel::actions::Remove, usize), - Add(delta_kernel::actions::Add, usize), - SetTransaction(delta_kernel::actions::SetTransaction, usize), -} - -impl Action { - fn row(&self) -> usize { - match self { - Action::Metadata(_, row) => *row, - Action::Protocol(_, row) => *row, - Action::Remove(_, row) => *row, - Action::Add(_, row) => *row, - Action::SetTransaction(_, row) => *row, - } - } + Metadata(delta_kernel::actions::Metadata), + Protocol(delta_kernel::actions::Protocol), + Remove(delta_kernel::actions::Remove), + Add(delta_kernel::actions::Add), + SetTransaction(delta_kernel::actions::SetTransaction), + Cdc(delta_kernel::actions::Cdc), } fn fields_in(field: &StructField) -> usize { @@ -86,41 +78,25 @@ fn fields_in(field: &StructField) -> usize { } struct LogVisitor { - actions: Vec, - add_offset: usize, - remove_offset: usize, - protocol_offset: usize, - metadata_offset: usize, - set_transaction_offset: usize, + actions: Vec<(Action, usize)>, + offsets: HashMap, previous_rows_seen: usize, } impl LogVisitor { fn new(log_schema: &SchemaRef) -> LogVisitor { - let mut offset = 0; - let mut add_offset = 0; - let mut remove_offset = 0; - let mut protocol_offset = 0; - let mut metadata_offset = 0; - let mut set_transaction_offset = 0; + // Grab the start offset for each top-level column name, then compute the end offset by + // skipping the rest of the leaves for that column. + let mut start = 0; + let mut offsets = HashMap::new(); for field in log_schema.fields() { - match field.name().as_str() { - "add" => add_offset = offset, - "remove" => remove_offset = offset, - "protocol" => protocol_offset = offset, - "metaData" => metadata_offset = offset, - "txn" => set_transaction_offset = offset, - _ => {} - } - offset += fields_in(field); + let end = start + fields_in(field); + offsets.insert(field.name.clone(), (start, end)); + start = end; } LogVisitor { actions: vec![], - add_offset, - remove_offset, - protocol_offset, - metadata_offset, - set_transaction_offset, + offsets, previous_rows_seen: 0, } } @@ -128,47 +104,51 @@ impl LogVisitor { impl DataVisitor for LogVisitor { fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> { + if getters.len() != 55 { + return Err(Error::InternalError(format!( + "Wrong number of LogVisitor getters: {}", + getters.len() + ))); + } + let (add_start, add_end) = self.offsets[ADD_NAME]; + let (remove_start, remove_end) = self.offsets[REMOVE_NAME]; + let (metadata_start, metadata_end) = self.offsets[METADATA_NAME]; + let (protocol_start, protocol_end) = self.offsets[PROTOCOL_NAME]; + let (txn_start, txn_end) = self.offsets[SET_TRANSACTION_NAME]; + let (cdc_start, cdc_end) = self.offsets[CDC_NAME]; for i in 0..row_count { - if let Some(path) = getters[self.add_offset].get_opt(i, "add.path")? { - self.actions.push(Action::Add( - AddVisitor::visit_add(i, path, &getters[self.add_offset..])?, - self.previous_rows_seen + i, - )); - } - if let Some(path) = getters[self.remove_offset].get_opt(i, "remove.path")? { - self.actions.push(Action::Remove( - RemoveVisitor::visit_remove(i, path, &getters[self.remove_offset..])?, - self.previous_rows_seen + i, - )); - } - if let Some(id) = getters[self.metadata_offset].get_opt(i, "metadata.id")? { - self.actions.push(Action::Metadata( - MetadataVisitor::visit_metadata(i, id, &getters[self.metadata_offset..])?, - self.previous_rows_seen + i, - )); - } - if let Some(min_reader_version) = - getters[self.protocol_offset].get_opt(i, "protocol.min_reader_version")? + let action = if let Some(path) = getters[add_start].get_opt(i, "add.path")? { + let add = AddVisitor::visit_add(i, path, &getters[add_start..add_end])?; + Action::Add(add) + } else if let Some(path) = getters[remove_start].get_opt(i, "remove.path")? { + let remove = + RemoveVisitor::visit_remove(i, path, &getters[remove_start..remove_end])?; + Action::Remove(remove) + } else if let Some(id) = getters[metadata_start].get_opt(i, "metadata.id")? { + let metadata = + MetadataVisitor::visit_metadata(i, id, &getters[metadata_start..metadata_end])?; + Action::Metadata(metadata) + } else if let Some(min_reader_version) = + getters[protocol_start].get_opt(i, "protocol.min_reader_version")? { - self.actions.push(Action::Protocol( - ProtocolVisitor::visit_protocol( - i, - min_reader_version, - &getters[self.protocol_offset..], - )?, - self.previous_rows_seen + i, - )); - } - if let Some(app_id) = getters[self.set_transaction_offset].get_opt(i, "txn.appId")? { - self.actions.push(Action::SetTransaction( - SetTransactionVisitor::visit_txn( - i, - app_id, - &getters[self.set_transaction_offset..], - )?, - self.previous_rows_seen + i, - )); - } + let protocol = ProtocolVisitor::visit_protocol( + i, + min_reader_version, + &getters[protocol_start..protocol_end], + )?; + Action::Protocol(protocol) + } else if let Some(app_id) = getters[txn_start].get_opt(i, "txn.appId")? { + let txn = + SetTransactionVisitor::visit_txn(i, app_id, &getters[txn_start..txn_end])?; + Action::SetTransaction(txn) + } else if let Some(path) = getters[cdc_start].get_opt(i, "cdc.path")? { + let cdc = CdcVisitor::visit_cdc(i, path, &getters[cdc_start..cdc_end])?; + Action::Cdc(cdc) + } else { + // TODO: Add CommitInfo support (tricky because all fields are optional) + continue; + }; + self.actions.push((action, self.previous_rows_seen + i)); } self.previous_rows_seen += row_count; Ok(()) @@ -214,7 +194,7 @@ fn try_main() -> DeltaResult<()> { let snapshot = table.snapshot(&engine, None)?; - match &cli.command { + match cli.command { Commands::TableVersion => { println!("Latest table version: {}", snapshot.version()); } @@ -237,7 +217,7 @@ fn try_main() -> DeltaResult<()> { )?; } } - Commands::Actions { forward } => { + Commands::Actions { oldest_first } => { let log_schema = get_log_schema(); let actions = snapshot._log_segment().replay( &engine, @@ -251,22 +231,17 @@ fn try_main() -> DeltaResult<()> { action?.0.extract(log_schema.clone(), &mut visitor)?; } - if *forward { - visitor - .actions - .sort_by(|a, b| a.row().partial_cmp(&b.row()).unwrap()); - } else { - visitor - .actions - .sort_by(|a, b| b.row().partial_cmp(&a.row()).unwrap()); + if oldest_first { + visitor.actions.reverse(); } - for action in visitor.actions.iter() { + for (action, row) in visitor.actions.iter() { match action { - Action::Metadata(md, row) => println!("\nAction {row}:\n{:#?}", md), - Action::Protocol(p, row) => println!("\nAction {row}:\n{:#?}", p), - Action::Remove(r, row) => println!("\nAction {row}:\n{:#?}", r), - Action::Add(a, row) => println!("\nAction {row}:\n{:#?}", a), - Action::SetTransaction(t, row) => println!("\nAction {row}:\n{:#?}", t), + Action::Metadata(md) => println!("\nAction {row}:\n{:#?}", md), + Action::Protocol(p) => println!("\nAction {row}:\n{:#?}", p), + Action::Remove(r) => println!("\nAction {row}:\n{:#?}", r), + Action::Add(a) => println!("\nAction {row}:\n{:#?}", a), + Action::SetTransaction(t) => println!("\nAction {row}:\n{:#?}", t), + Action::Cdc(c) => println!("\nAction {row}:\n{:#?}", c), } } } diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index e8419a942..6daaa4cd4 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -30,12 +30,19 @@ pub mod visitors; #[cfg(not(feature = "developer-visibility"))] pub(crate) mod visitors; +#[cfg_attr(feature = "developer-visibility", visibility::make(pub))] pub(crate) const ADD_NAME: &str = "add"; +#[cfg_attr(feature = "developer-visibility", visibility::make(pub))] pub(crate) const REMOVE_NAME: &str = "remove"; +#[cfg_attr(feature = "developer-visibility", visibility::make(pub))] pub(crate) const METADATA_NAME: &str = "metaData"; +#[cfg_attr(feature = "developer-visibility", visibility::make(pub))] pub(crate) const PROTOCOL_NAME: &str = "protocol"; +#[cfg_attr(feature = "developer-visibility", visibility::make(pub))] pub(crate) const SET_TRANSACTION_NAME: &str = "txn"; +#[cfg_attr(feature = "developer-visibility", visibility::make(pub))] pub(crate) const COMMIT_INFO_NAME: &str = "commitInfo"; +#[cfg_attr(feature = "developer-visibility", visibility::make(pub))] pub(crate) const CDC_NAME: &str = "cdc"; static LOG_ADD_SCHEMA: LazyLock =