Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up inspect-table and add Cdc action support #527

Merged
merged 3 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 78 additions & 103 deletions kernel/examples/inspect-table/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use delta_kernel::actions::get_log_schema;
use delta_kernel::actions::visitors::{
AddVisitor, MetadataVisitor, ProtocolVisitor, RemoveVisitor, SetTransactionVisitor,
AddVisitor, CdcVisitor, MetadataVisitor, ProtocolVisitor, RemoveVisitor, SetTransactionVisitor,
};
use delta_kernel::actions::{
get_log_schema, ADD_NAME, CDC_NAME, METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME,
SET_TRANSACTION_NAME,
};
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::engine_data::{GetData, TypedGetData};
use delta_kernel::scan::state::{DvInfo, Stats};
use delta_kernel::scan::ScanBuilder;
use delta_kernel::schema::{DataType, SchemaRef, StructField};
use delta_kernel::{DataVisitor, DeltaResult, Table};
use delta_kernel::{DataVisitor, DeltaResult, Error, Table};

use std::collections::HashMap;
use std::process::ExitCode;
Expand Down Expand Up @@ -40,9 +43,9 @@ enum Commands {
ScanData,
/// Show each action from the log-segments
Actions {
/// Show the log in forward order (default is to show it going backwards in time)
/// Show the log in reverse order (default is log replay order -- newest first)
#[arg(short, long)]
forward: bool,
oldest_first: bool,
},
}

Expand All @@ -58,23 +61,12 @@ fn main() -> ExitCode {
}

enum Action {
Metadata(delta_kernel::actions::Metadata, usize),
Protocol(delta_kernel::actions::Protocol, usize),
Remove(delta_kernel::actions::Remove, usize),
Add(delta_kernel::actions::Add, usize),
SetTransaction(delta_kernel::actions::SetTransaction, usize),
}

impl Action {
fn row(&self) -> usize {
match self {
Action::Metadata(_, row) => *row,
Action::Protocol(_, row) => *row,
Action::Remove(_, row) => *row,
Action::Add(_, row) => *row,
Action::SetTransaction(_, row) => *row,
}
}
Metadata(delta_kernel::actions::Metadata),
Protocol(delta_kernel::actions::Protocol),
Remove(delta_kernel::actions::Remove),
Add(delta_kernel::actions::Add),
SetTransaction(delta_kernel::actions::SetTransaction),
Cdc(delta_kernel::actions::Cdc),
}

fn fields_in(field: &StructField) -> usize {
Expand All @@ -86,89 +78,77 @@ fn fields_in(field: &StructField) -> usize {
}

struct LogVisitor {
actions: Vec<Action>,
add_offset: usize,
remove_offset: usize,
protocol_offset: usize,
metadata_offset: usize,
set_transaction_offset: usize,
actions: Vec<(Action, usize)>,
offsets: HashMap<String, (usize, usize)>,
previous_rows_seen: usize,
}

impl LogVisitor {
fn new(log_schema: &SchemaRef) -> LogVisitor {
let mut offset = 0;
let mut add_offset = 0;
let mut remove_offset = 0;
let mut protocol_offset = 0;
let mut metadata_offset = 0;
let mut set_transaction_offset = 0;
// Grab the start offset for each top-level column name, then compute the end offset by
// skipping the rest of the leaves for that column.
let mut start = 0;
let mut offsets = HashMap::new();
for field in log_schema.fields() {
match field.name().as_str() {
"add" => add_offset = offset,
"remove" => remove_offset = offset,
"protocol" => protocol_offset = offset,
"metaData" => metadata_offset = offset,
"txn" => set_transaction_offset = offset,
_ => {}
}
offset += fields_in(field);
let end = start + fields_in(field);
offsets.insert(field.name.clone(), (start, end));
start = end;
}
LogVisitor {
actions: vec![],
add_offset,
remove_offset,
protocol_offset,
metadata_offset,
set_transaction_offset,
offsets,
previous_rows_seen: 0,
}
}
}

impl DataVisitor for LogVisitor {
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
if getters.len() != 55 {
return Err(Error::InternalError(format!(
"Wrong number of LogVisitor getters: {}",
getters.len()
)));
}
let (add_start, add_end) = self.offsets[ADD_NAME];
let (remove_start, remove_end) = self.offsets[REMOVE_NAME];
let (metadata_start, metadata_end) = self.offsets[METADATA_NAME];
let (protocol_start, protocol_end) = self.offsets[PROTOCOL_NAME];
let (txn_start, txn_end) = self.offsets[SET_TRANSACTION_NAME];
let (cdc_start, cdc_end) = self.offsets[CDC_NAME];
for i in 0..row_count {
if let Some(path) = getters[self.add_offset].get_opt(i, "add.path")? {
self.actions.push(Action::Add(
AddVisitor::visit_add(i, path, &getters[self.add_offset..])?,
self.previous_rows_seen + i,
));
}
if let Some(path) = getters[self.remove_offset].get_opt(i, "remove.path")? {
self.actions.push(Action::Remove(
RemoveVisitor::visit_remove(i, path, &getters[self.remove_offset..])?,
self.previous_rows_seen + i,
));
}
if let Some(id) = getters[self.metadata_offset].get_opt(i, "metadata.id")? {
self.actions.push(Action::Metadata(
MetadataVisitor::visit_metadata(i, id, &getters[self.metadata_offset..])?,
self.previous_rows_seen + i,
));
}
if let Some(min_reader_version) =
getters[self.protocol_offset].get_opt(i, "protocol.min_reader_version")?
let action = if let Some(path) = getters[add_start].get_opt(i, "add.path")? {
let add = AddVisitor::visit_add(i, path, &getters[add_start..add_end])?;
Action::Add(add)
} else if let Some(path) = getters[remove_start].get_opt(i, "remove.path")? {
let remove =
RemoveVisitor::visit_remove(i, path, &getters[remove_start..remove_end])?;
Action::Remove(remove)
} else if let Some(id) = getters[metadata_start].get_opt(i, "metadata.id")? {
let metadata =
MetadataVisitor::visit_metadata(i, id, &getters[metadata_start..metadata_end])?;
Action::Metadata(metadata)
} else if let Some(min_reader_version) =
getters[protocol_start].get_opt(i, "protocol.min_reader_version")?
{
self.actions.push(Action::Protocol(
ProtocolVisitor::visit_protocol(
i,
min_reader_version,
&getters[self.protocol_offset..],
)?,
self.previous_rows_seen + i,
));
}
if let Some(app_id) = getters[self.set_transaction_offset].get_opt(i, "txn.appId")? {
self.actions.push(Action::SetTransaction(
SetTransactionVisitor::visit_txn(
i,
app_id,
&getters[self.set_transaction_offset..],
)?,
self.previous_rows_seen + i,
));
}
let protocol = ProtocolVisitor::visit_protocol(
i,
min_reader_version,
&getters[protocol_start..protocol_end],
)?;
Action::Protocol(protocol)
} else if let Some(app_id) = getters[txn_start].get_opt(i, "txn.appId")? {
let txn =
SetTransactionVisitor::visit_txn(i, app_id, &getters[txn_start..txn_end])?;
Action::SetTransaction(txn)
} else if let Some(path) = getters[cdc_start].get_opt(i, "cdc.path")? {
let cdc = CdcVisitor::visit_cdc(i, path, &getters[cdc_start..cdc_end])?;
Action::Cdc(cdc)
} else {
// TODO: Add CommitInfo support (tricky because all fields are optional)
continue;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened #528 to track commit info support.

};
self.actions.push((action, self.previous_rows_seen + i));
}
self.previous_rows_seen += row_count;
Ok(())
Expand Down Expand Up @@ -214,7 +194,7 @@ fn try_main() -> DeltaResult<()> {

let snapshot = table.snapshot(&engine, None)?;

match &cli.command {
match cli.command {
Commands::TableVersion => {
println!("Latest table version: {}", snapshot.version());
}
Expand All @@ -237,7 +217,7 @@ fn try_main() -> DeltaResult<()> {
)?;
}
}
Commands::Actions { forward } => {
Commands::Actions { oldest_first } => {
let log_schema = get_log_schema();
let actions = snapshot._log_segment().replay(
&engine,
Expand All @@ -251,22 +231,17 @@ fn try_main() -> DeltaResult<()> {
action?.0.extract(log_schema.clone(), &mut visitor)?;
}

if *forward {
visitor
.actions
.sort_by(|a, b| a.row().partial_cmp(&b.row()).unwrap());
} else {
visitor
.actions
.sort_by(|a, b| b.row().partial_cmp(&a.row()).unwrap());
if oldest_first {
visitor.actions.reverse();
}
for action in visitor.actions.iter() {
for (action, row) in visitor.actions.iter() {
match action {
Action::Metadata(md, row) => println!("\nAction {row}:\n{:#?}", md),
Action::Protocol(p, row) => println!("\nAction {row}:\n{:#?}", p),
Action::Remove(r, row) => println!("\nAction {row}:\n{:#?}", r),
Action::Add(a, row) => println!("\nAction {row}:\n{:#?}", a),
Action::SetTransaction(t, row) => println!("\nAction {row}:\n{:#?}", t),
Action::Metadata(md) => println!("\nAction {row}:\n{:#?}", md),
Action::Protocol(p) => println!("\nAction {row}:\n{:#?}", p),
Action::Remove(r) => println!("\nAction {row}:\n{:#?}", r),
Action::Add(a) => println!("\nAction {row}:\n{:#?}", a),
Action::SetTransaction(t) => println!("\nAction {row}:\n{:#?}", t),
Action::Cdc(c) => println!("\nAction {row}:\n{:#?}", c),
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,19 @@ pub mod visitors;
#[cfg(not(feature = "developer-visibility"))]
pub(crate) mod visitors;

// String names for the individual action kinds. In this change the inspect-table
// example uses several of them as keys into a map built from the log schema's
// top-level field names, so each value is expected to match a top-level field name.
// Every constant is crate-private by default; the `cfg_attr` on each one applies
// `visibility::make(pub)` when the `developer-visibility` feature is enabled.

/// Name of the `add` action field.
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const ADD_NAME: &str = "add";
/// Name of the `remove` action field.
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const REMOVE_NAME: &str = "remove";
/// Name of the metadata action field (note the camelCase spelling `metaData`).
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const METADATA_NAME: &str = "metaData";
/// Name of the `protocol` action field.
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const PROTOCOL_NAME: &str = "protocol";
/// Name of the set-transaction action field (`txn`).
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const SET_TRANSACTION_NAME: &str = "txn";
/// Name of the commit-info action field (`commitInfo`).
// NOTE(review): not referenced by the example code visible in this change; presumably
// used elsewhere in the crate -- confirm.
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const COMMIT_INFO_NAME: &str = "commitInfo";
/// Name of the change-data-capture action field (`cdc`).
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const CDC_NAME: &str = "cdc";

static LOG_ADD_SCHEMA: LazyLock<SchemaRef> =
Expand Down
Loading