Clean up inspect-table and add Cdc action support #527

Merged
merged 3 commits into from
Nov 22, 2024
Changes from 1 commit
194 changes: 90 additions & 104 deletions kernel/examples/inspect-table/src/main.rs
@@ -1,14 +1,17 @@
use delta_kernel::actions::get_log_schema;
use delta_kernel::actions::visitors::{
AddVisitor, MetadataVisitor, ProtocolVisitor, RemoveVisitor, SetTransactionVisitor,
AddVisitor, CdcVisitor, MetadataVisitor, ProtocolVisitor, RemoveVisitor, SetTransactionVisitor,
};
use delta_kernel::actions::{
get_log_schema, ADD_NAME, CDC_NAME, METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME,
SET_TRANSACTION_NAME,
};
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::engine_data::{GetData, TypedGetData};
use delta_kernel::scan::state::{DvInfo, Stats};
use delta_kernel::scan::ScanBuilder;
use delta_kernel::schema::{DataType, SchemaRef, StructField};
use delta_kernel::{DataVisitor, DeltaResult, Table};
use delta_kernel::{DataVisitor, DeltaResult, Error, Table};

use std::collections::HashMap;
use std::process::ExitCode;
@@ -40,9 +43,9 @@ enum Commands {
ScanData,
/// Show each action from the log-segments
Actions {
/// Show the log in forward order (default is to show it going backwards in time)
/// Show the log in reverse order (default is log replay order -- newest first)
#[arg(short, long)]
forward: bool,
oldest_first: bool,
},
}

@@ -58,23 +61,12 @@ fn main() -> ExitCode {
}

enum Action {
Metadata(delta_kernel::actions::Metadata, usize),
Protocol(delta_kernel::actions::Protocol, usize),
Remove(delta_kernel::actions::Remove, usize),
Add(delta_kernel::actions::Add, usize),
SetTransaction(delta_kernel::actions::SetTransaction, usize),
}

impl Action {
fn row(&self) -> usize {
match self {
Action::Metadata(_, row) => *row,
Action::Protocol(_, row) => *row,
Action::Remove(_, row) => *row,
Action::Add(_, row) => *row,
Action::SetTransaction(_, row) => *row,
}
}
Metadata(delta_kernel::actions::Metadata),
Protocol(delta_kernel::actions::Protocol),
Remove(delta_kernel::actions::Remove),
Add(delta_kernel::actions::Add),
SetTransaction(delta_kernel::actions::SetTransaction),
Cdc(delta_kernel::actions::Cdc),
}

fn fields_in(field: &StructField) -> usize {
@@ -86,89 +78,88 @@ fn fields_in(field: &StructField) -> usize {
}

struct LogVisitor {
actions: Vec<Action>,
add_offset: usize,
remove_offset: usize,
protocol_offset: usize,
metadata_offset: usize,
set_transaction_offset: usize,
actions: Vec<(Action, usize)>,
offsets: HashMap<&'static str, (usize, usize)>,
previous_rows_seen: usize,
}

impl LogVisitor {
fn new(log_schema: &SchemaRef) -> LogVisitor {
let mut offset = 0;
let mut add_offset = 0;
let mut remove_offset = 0;
let mut protocol_offset = 0;
let mut metadata_offset = 0;
let mut set_transaction_offset = 0;
fn new(log_schema: &'static SchemaRef) -> LogVisitor {
// Grab the start offset for each top-level column name, then compute the end offset by
Collaborator

Any reason for SchemaRef to be static?

Collaborator Author @scovich, Nov 22, 2024

Because Rust is a bit silly: you can't probe a HashMap<String, _> with &str (there's a Borrow<str> for String but no Borrow<&str> for String). So I'm storing HashMap<&'static str, _> instead, to avoid allocating a string for every map probe and/or adding a lifetime to the enclosing struct. I hit a similar issue with ColumnName, btw, but could solve that one by defining a suitable Borrow (can do that since it's a local trait).

Collaborator

> there's a Borrow<str> for String but no Borrow<&str> for String

Wow, you're right, that is silly. TIL the difference between str and &str 👍

> to avoid allocating a string for every map probe and/or adding a lifetime to the enclosing struct

That makes sense.

> can do that since it's a local trait

And we can't do this here because Borrow is a foreign trait and SchemaRef is a foreign struct (from inspect-table's perspective), whereas in your example ColumnName was at least local.

Thanks @scovich!

Collaborator Author

Update: Turns out if I pass e.g. offsets[ADD_NAME] instead of offsets[&ADD_NAME] then it works!
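
For readers following this thread, here is a minimal sketch of why the two spellings behave differently, assuming only the standard library's `HashMap` indexing (which takes `&Q` where the key type implements `Borrow<Q>`). The local `ADD_NAME` constant and the `(0, 15)` range are illustrative stand-ins, not the kernel's actual values, and `owned_offsets` / `static_offsets` are hypothetical helpers.

```rust
use std::collections::HashMap;

// Stand-in for the kernel's ADD_NAME constant; the (start, end) pair is made up.
const ADD_NAME: &str = "add";

// Hypothetical helper: a map with owned `String` keys.
fn owned_offsets() -> HashMap<String, (usize, usize)> {
    HashMap::from([(ADD_NAME.to_string(), (0, 15))])
}

// Hypothetical helper: a map with `&'static str` keys, as in the PR.
fn static_offsets() -> HashMap<&'static str, (usize, usize)> {
    HashMap::from([(ADD_NAME, (0, 15))])
}

fn main() {
    let owned = owned_offsets();
    // Indexing takes `&Q` where `K: Borrow<Q>`. Passing a `&str` means Q = str,
    // and `String: Borrow<str>` exists, so no allocation is needed.
    assert_eq!(owned[ADD_NAME], (0, 15));
    assert_eq!(owned.get(ADD_NAME), Some(&(0, 15)));
    // `owned[&ADD_NAME]` would not compile: that makes Q = &str, and there is
    // no `String: Borrow<&str>`.

    let fixed = static_offsets();
    // With `&'static str` keys both spellings compile: Q = &str uses the
    // blanket `T: Borrow<T>`, and Q = str uses `&T: Borrow<T>`.
    assert_eq!(fixed[&ADD_NAME], (0, 15));
    assert_eq!(fixed[ADD_NAME], (0, 15));
}
```

So the extra `&` is what selects the missing `Borrow<&str>` impl; dropping it lets a `String`-keyed map be probed with a plain `&str`.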

// skipping the rest of the leaves for that column.
let mut start = 0;
let mut offsets = HashMap::new();
for field in log_schema.fields() {
match field.name().as_str() {
"add" => add_offset = offset,
"remove" => remove_offset = offset,
"protocol" => protocol_offset = offset,
"metaData" => metadata_offset = offset,
"txn" => set_transaction_offset = offset,
_ => {}
}
offset += fields_in(field);
let end = start + fields_in(field);
offsets.insert(field.name.as_str(), (start, end));
start = end;
}
LogVisitor {
actions: vec![],
add_offset,
remove_offset,
protocol_offset,
metadata_offset,
set_transaction_offset,
offsets,
previous_rows_seen: 0,
}
}
}

impl DataVisitor for LogVisitor {
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
if getters.len() != 55 {
return Err(Error::InternalError(format!(
"Wrong number of LogVisitor getters: {}",
getters.len()
)));
}
let (add_start, add_end) = self.offsets[&ADD_NAME];
let (remove_start, remove_end) = self.offsets[&REMOVE_NAME];
let (metadata_start, metadata_end) = self.offsets[&METADATA_NAME];
let (protocol_start, protocol_end) = self.offsets[&PROTOCOL_NAME];
let (set_transaction_start, set_transaction_end) = self.offsets[&SET_TRANSACTION_NAME];
let (cdc_start, cdc_end) = self.offsets[&CDC_NAME];
for i in 0..row_count {
if let Some(path) = getters[self.add_offset].get_opt(i, "add.path")? {
self.actions.push(Action::Add(
AddVisitor::visit_add(i, path, &getters[self.add_offset..])?,
self.previous_rows_seen + i,
));
}
if let Some(path) = getters[self.remove_offset].get_opt(i, "remove.path")? {
self.actions.push(Action::Remove(
RemoveVisitor::visit_remove(i, path, &getters[self.remove_offset..])?,
self.previous_rows_seen + i,
));
}
if let Some(id) = getters[self.metadata_offset].get_opt(i, "metadata.id")? {
self.actions.push(Action::Metadata(
MetadataVisitor::visit_metadata(i, id, &getters[self.metadata_offset..])?,
self.previous_rows_seen + i,
));
}
if let Some(min_reader_version) =
getters[self.protocol_offset].get_opt(i, "protocol.min_reader_version")?
let action = if let Some(path) = getters[add_start].get_opt(i, "add.path")? {
Action::Add(AddVisitor::visit_add(
i,
path,
&getters[add_start..add_end],
)?)
} else if let Some(path) = getters[remove_start].get_opt(i, "remove.path")? {
Action::Remove(RemoveVisitor::visit_remove(
i,
path,
&getters[remove_start..remove_end],
)?)
} else if let Some(id) = getters[metadata_start].get_opt(i, "metadata.id")? {
Action::Metadata(MetadataVisitor::visit_metadata(
i,
id,
&getters[metadata_start..metadata_end],
)?)
} else if let Some(min_reader_version) =
getters[protocol_start].get_opt(i, "protocol.min_reader_version")?
{
self.actions.push(Action::Protocol(
ProtocolVisitor::visit_protocol(
i,
min_reader_version,
&getters[self.protocol_offset..],
)?,
self.previous_rows_seen + i,
));
}
if let Some(app_id) = getters[self.set_transaction_offset].get_opt(i, "txn.appId")? {
self.actions.push(Action::SetTransaction(
SetTransactionVisitor::visit_txn(
i,
app_id,
&getters[self.set_transaction_offset..],
)?,
self.previous_rows_seen + i,
));
}
Action::Protocol(ProtocolVisitor::visit_protocol(
i,
min_reader_version,
&getters[protocol_start..protocol_end],
)?)
} else if let Some(app_id) = getters[set_transaction_start].get_opt(i, "txn.appId")? {
Action::SetTransaction(SetTransactionVisitor::visit_txn(
i,
app_id,
&getters[set_transaction_start..set_transaction_end],
)?)
} else if let Some(path) = getters[cdc_start].get_opt(i, "cdc.path")? {
Action::Cdc(CdcVisitor::visit_cdc(
i,
path,
&getters[cdc_start..cdc_end],
)?)
} else {
// TODO: Add CommitInfo support (tricky because all fields are optional)
continue;
Collaborator

made #528 for commit info support

};
self.actions.push((action, self.previous_rows_seen + i));
}
self.previous_rows_seen += row_count;
Ok(())
@@ -214,7 +205,7 @@ fn try_main() -> DeltaResult<()> {

let snapshot = table.snapshot(&engine, None)?;

match &cli.command {
match cli.command {
Commands::TableVersion => {
println!("Latest table version: {}", snapshot.version());
}
@@ -237,7 +228,7 @@ fn try_main() -> DeltaResult<()> {
)?;
}
}
Commands::Actions { forward } => {
Commands::Actions { oldest_first } => {
let log_schema = get_log_schema();
let actions = snapshot._log_segment().replay(
&engine,
@@ -251,22 +242,17 @@ fn try_main() -> DeltaResult<()> {
action?.0.extract(log_schema.clone(), &mut visitor)?;
}

if *forward {
visitor
.actions
.sort_by(|a, b| a.row().partial_cmp(&b.row()).unwrap());
} else {
visitor
.actions
.sort_by(|a, b| b.row().partial_cmp(&a.row()).unwrap());
if oldest_first {
visitor.actions.reverse();
}
for action in visitor.actions.iter() {
for (action, row) in visitor.actions.iter() {
match action {
Action::Metadata(md, row) => println!("\nAction {row}:\n{:#?}", md),
Action::Protocol(p, row) => println!("\nAction {row}:\n{:#?}", p),
Action::Remove(r, row) => println!("\nAction {row}:\n{:#?}", r),
Action::Add(a, row) => println!("\nAction {row}:\n{:#?}", a),
Action::SetTransaction(t, row) => println!("\nAction {row}:\n{:#?}", t),
Action::Metadata(md) => println!("\nAction {row}:\n{:#?}", md),
Action::Protocol(p) => println!("\nAction {row}:\n{:#?}", p),
Action::Remove(r) => println!("\nAction {row}:\n{:#?}", r),
Action::Add(a) => println!("\nAction {row}:\n{:#?}", a),
Action::SetTransaction(t) => println!("\nAction {row}:\n{:#?}", t),
Action::Cdc(c) => println!("\nAction {row}:\n{:#?}", c),
}
}
}
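
The offset bookkeeping in `LogVisitor::new` above (record each top-level column's start, then skip past all of its leaves to find the end) can be sketched in isolation. This is a rough illustration under stated assumptions, not the kernel's code: `Field`, `leaves_in`, `leaf_ranges`, and `toy_schema` are hypothetical stand-ins for `StructField`, `fields_in`, and the real log schema, and the toy schema has far fewer leaves than the 55 the visitor expects.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for a schema field: a leaf, or a struct with children.
enum Field {
    Leaf(&'static str),
    Struct(&'static str, Vec<Field>),
}

impl Field {
    fn name(&self) -> &'static str {
        match self {
            Field::Leaf(n) | Field::Struct(n, _) => *n,
        }
    }
}

// Count the leaf columns under a field (a leaf counts as one).
fn leaves_in(field: &Field) -> usize {
    match field {
        Field::Leaf(_) => 1,
        Field::Struct(_, children) => children.iter().map(leaves_in).sum(),
    }
}

// Map each top-level column name to its half-open [start, end) leaf range.
fn leaf_ranges(fields: &[Field]) -> HashMap<&'static str, (usize, usize)> {
    let mut start = 0;
    let mut ranges = HashMap::new();
    for field in fields {
        let end = start + leaves_in(field);
        ranges.insert(field.name(), (start, end));
        start = end;
    }
    ranges
}

// A made-up three-column schema, much smaller than the real log schema.
fn toy_schema() -> Vec<Field> {
    vec![
        Field::Struct("add", vec![Field::Leaf("path"), Field::Leaf("size")]),
        Field::Struct("remove", vec![Field::Leaf("path")]),
        Field::Struct(
            "txn",
            vec![
                Field::Leaf("appId"),
                Field::Leaf("version"),
                Field::Leaf("lastUpdated"),
            ],
        ),
    ]
}

fn main() {
    let ranges = leaf_ranges(&toy_schema());
    for name in ["add", "remove", "txn"] {
        let (start, end) = ranges[name];
        println!("{name}: getters[{start}..{end}]");
    }
}
```

Storing the end as well as the start is what lets each visitor receive a bounded slice such as `&getters[add_start..add_end]` instead of an open-ended `&getters[add_offset..]`.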
7 changes: 7 additions & 0 deletions kernel/src/actions/mod.rs
@@ -30,12 +30,19 @@ pub mod visitors;
#[cfg(not(feature = "developer-visibility"))]
pub(crate) mod visitors;

#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const ADD_NAME: &str = "add";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const REMOVE_NAME: &str = "remove";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const METADATA_NAME: &str = "metaData";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const PROTOCOL_NAME: &str = "protocol";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const SET_TRANSACTION_NAME: &str = "txn";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const COMMIT_INFO_NAME: &str = "commitInfo";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const CDC_NAME: &str = "cdc";

static LOG_ADD_SCHEMA: LazyLock<SchemaRef> =