Define and use a get_log_add_schema() function (#407)
We have a convenient `get_log_schema()` function for accessing the Delta
log schema, but many operations only need to access the `add` column.
Replace clunky projection code with something direct and infallible,
wrapped up as `get_log_add_schema()`.
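
For illustration, a minimal sketch of the call-site change. The stand-in
types below are hypothetical, written only so the sketch compiles on its
own; the accessor names and the fallible `project` shape come from this
commit:

use std::sync::{Arc, LazyLock};

// Hypothetical stand-ins so the sketch is self-contained.
struct StructType;
type SchemaRef = Arc<StructType>;
type DeltaResult<T> = Result<T, String>;

impl StructType {
    // Stand-in for the fallible projection this commit replaces.
    fn project(&self, _columns: &[&str]) -> DeltaResult<SchemaRef> {
        Ok(Arc::new(StructType))
    }
}

static LOG_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| Arc::new(StructType));
static LOG_ADD_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| Arc::new(StructType));

fn get_log_schema() -> &'static SchemaRef { &LOG_SCHEMA }
fn get_log_add_schema() -> &'static SchemaRef { &LOG_ADD_SCHEMA }

fn main() -> DeltaResult<()> {
    // Before: every caller projected the full log schema and handled a Result.
    let _add = get_log_schema().project(&["add"])?;
    // After: direct, infallible access to the prebuilt `add`-only schema.
    let _add = get_log_add_schema().clone();
    Ok(())
}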

While we're at it, make both functions return `SchemaRef` instead of
`Schema`, to reduce the number of deep clones needed (many call sites
need an `Arc` anyway).
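
And a sketch of why the return-type change helps, again with hypothetical
stand-ins: cloning an `Arc` only bumps a reference count, while cloning a
`StructType` copies the whole nested schema.

use std::sync::{Arc, LazyLock};

// Hypothetical stand-in; the real StructType owns nested field data,
// so `.clone()` on it is a deep copy.
#[derive(Clone)]
struct StructType;
type SchemaRef = Arc<StructType>;

static OLD_LOG_SCHEMA: LazyLock<StructType> = LazyLock::new(|| StructType);
static NEW_LOG_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| Arc::new(StructType));

fn old_get_log_schema() -> &'static StructType { &OLD_LOG_SCHEMA }
fn new_get_log_schema() -> &'static SchemaRef { &NEW_LOG_SCHEMA }

fn main() {
    // Before: call sites that needed an Arc paid for a deep clone.
    let _schema: SchemaRef = Arc::new(old_get_log_schema().clone());
    // After: the same line is just an Arc refcount increment.
    let _schema: SchemaRef = new_get_log_schema().clone();
}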
scovich authored Oct 23, 2024
1 parent 8de8689 commit 7385d19
Showing 7 changed files with 31 additions and 23 deletions.
kernel/examples/inspect-table/src/main.rs (4 changes: 2 additions, 2 deletions)

@@ -238,15 +238,15 @@ fn try_main() -> DeltaResult<()> {
             }
         }
         Commands::Actions { forward } => {
-            let log_schema = Arc::new(get_log_schema().clone());
+            let log_schema = get_log_schema();
             let actions = snapshot._log_segment().replay(
                 &engine,
                 log_schema.clone(),
                 log_schema.clone(),
                 None,
             )?;
 
-            let mut visitor = LogVisitor::new(&log_schema);
+            let mut visitor = LogVisitor::new(log_schema);
             for action in actions {
                 action?.0.extract(log_schema.clone(), &mut visitor)?;
             }
kernel/src/actions/mod.rs (18 changes: 14 additions, 4 deletions)

@@ -10,7 +10,8 @@ use visitors::{AddVisitor, MetadataVisitor, ProtocolVisitor};
 use self::deletion_vector::DeletionVectorDescriptor;
 use crate::actions::schemas::GetStructField;
 use crate::features::{ReaderFeatures, WriterFeatures};
-use crate::{schema::StructType, DeltaResult, EngineData};
+use crate::schema::{SchemaRef, StructType};
+use crate::{DeltaResult, EngineData};
 
 pub mod deletion_vector;
 pub mod set_transaction;
@@ -28,7 +29,9 @@ pub(crate) const PROTOCOL_NAME: &str = "protocol";
 pub(crate) const SET_TRANSACTION_NAME: &str = "txn";
 pub(crate) const COMMIT_INFO_NAME: &str = "commitInfo";
 
-static LOG_SCHEMA: LazyLock<StructType> = LazyLock::new(|| {
+static LOG_ADD_SCHEMA: LazyLock<SchemaRef> =
+    LazyLock::new(|| StructType::new([Option::<Add>::get_struct_field(ADD_NAME)]).into());
+static LOG_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
     StructType::new([
         Option::<Add>::get_struct_field(ADD_NAME),
         Option::<Remove>::get_struct_field(REMOVE_NAME),
@@ -40,14 +43,21 @@ static LOG_SCHEMA: LazyLock<StructType> = LazyLock::new(|| {
         //Option::<Cdc>::get_struct_field(CDC_NAME),
         //Option::<DomainMetadata>::get_struct_field(DOMAIN_METADATA_NAME),
     ])
+    .into()
 });
 
 #[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
 #[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
-fn get_log_schema() -> &'static StructType {
+fn get_log_schema() -> &'static SchemaRef {
     &LOG_SCHEMA
 }
 
+#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
+#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
+fn get_log_add_schema() -> &'static SchemaRef {
+    &LOG_ADD_SCHEMA
+}
+
 #[derive(Debug, Clone, PartialEq, Eq, Schema)]
 pub struct Format {
     /// Name of the encoding for files in this table
@@ -194,7 +204,7 @@ impl Add {
     /// Since we always want to parse multiple adds from data, we return a `Vec<Add>`
     pub fn parse_from_data(data: &dyn EngineData) -> DeltaResult<Vec<Add>> {
         let mut visitor = AddVisitor::default();
-        data.extract(get_log_schema().project(&[ADD_NAME])?, &mut visitor)?;
+        data.extract(get_log_add_schema().clone(), &mut visitor)?;
         Ok(visitor.adds)
     }
 
kernel/src/actions/visitors.rs (12 changes: 5 additions, 7 deletions)

@@ -346,7 +346,7 @@ mod tests {
 
     use super::*;
     use crate::{
-        actions::{get_log_schema, ADD_NAME, SET_TRANSACTION_NAME},
+        actions::{get_log_add_schema, get_log_schema, SET_TRANSACTION_NAME},
         engine::arrow_data::ArrowEngineData,
         engine::sync::{json::SyncJsonHandler, SyncEngine},
         Engine, EngineData, JsonHandler,
@@ -370,7 +370,7 @@
             r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#,
         ]
         .into();
-        let output_schema = Arc::new(get_log_schema().clone());
+        let output_schema = get_log_schema().clone();
         let parsed = handler
             .parse_json(string_array_to_engine_data(json_strings), output_schema)
             .unwrap();
@@ -433,13 +433,11 @@
             r#"{"add":{"path":"c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998137,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":4},\"maxValues\":{\"c3\":4},\"nullCount\":{\"c3\":0}}"}}"#,
         ]
         .into();
-        let output_schema = Arc::new(get_log_schema().clone());
+        let output_schema = get_log_schema().clone();
         let batch = json_handler
             .parse_json(string_array_to_engine_data(json_strings), output_schema)
             .unwrap();
-        let add_schema = get_log_schema()
-            .project(&[ADD_NAME])
-            .expect("Can't get add schema");
+        let add_schema = get_log_add_schema().clone();
         let mut add_visitor = AddVisitor::default();
         batch.extract(add_schema, &mut add_visitor).unwrap();
         let add1 = Add {
@@ -497,7 +495,7 @@
             r#"{"txn":{"appId":"myApp2","version": 4, "lastUpdated": 1670892998177}}"#,
         ]
         .into();
-        let output_schema = Arc::new(get_log_schema().clone());
+        let output_schema = get_log_schema().clone();
         let batch = json_handler
             .parse_json(string_array_to_engine_data(json_strings), output_schema)
             .unwrap();
kernel/src/engine/arrow_data.rs (2 changes: 1 addition, 1 deletion)

@@ -364,7 +364,7 @@ mod tests {
             r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
         ]
         .into();
-        let output_schema = Arc::new(get_log_schema().clone());
+        let output_schema = get_log_schema().clone();
         let parsed = handler
             .parse_json(string_array_to_engine_data(json_strings), output_schema)
             .unwrap();
kernel/src/engine/default/json.rs (6 changes: 3 additions, 3 deletions)

@@ -204,7 +204,7 @@ mod tests {
             r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}"#,
             r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#,
         ]);
-        let output_schema = Arc::new(get_log_schema().clone());
+        let output_schema = get_log_schema().clone();
 
         let batch = handler
             .parse_json(string_array_to_engine_data(json_strings), output_schema)
@@ -219,7 +219,7 @@
         let json_strings = StringArray::from(vec![
             r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2, "maxRowId": 3}}}"#,
         ]);
-        let output_schema = Arc::new(get_log_schema().clone());
+        let output_schema = get_log_schema().clone();
 
         let batch: RecordBatch = handler
             .parse_json(string_array_to_engine_data(json_strings), output_schema)
@@ -257,7 +257,7 @@
         }];
 
         let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
-        let physical_schema = Arc::new(ArrowSchema::try_from(get_log_schema()).unwrap());
+        let physical_schema = Arc::new(ArrowSchema::try_from(get_log_schema().as_ref()).unwrap());
         let data: Vec<RecordBatch> = handler
             .read_json_files(files, Arc::new(physical_schema.try_into().unwrap()), None)
             .unwrap()
kernel/src/scan/data_skipping.rs (4 changes: 2 additions, 2 deletions)

@@ -4,8 +4,8 @@ use std::sync::{Arc, LazyLock};
 
 use tracing::debug;
 
+use crate::actions::get_log_add_schema;
 use crate::actions::visitors::SelectionVectorVisitor;
-use crate::actions::{get_log_schema, ADD_NAME};
 use crate::error::DeltaResult;
 use crate::expressions::{
     BinaryOperator, Expression as Expr, ExpressionRef, UnaryOperator, VariadicOperator,
@@ -240,7 +240,7 @@ impl DataSkippingFilter {
         // the predicate is true/null and false (= skip) when the predicate is false.
         let select_stats_evaluator = engine.get_expression_handler().get_evaluator(
             // safety: kernel is very broken if we don't have the schema for Add actions
-            get_log_schema().project(&[ADD_NAME]).unwrap(),
+            get_log_add_schema().clone(),
             STATS_EXPR.clone(),
             DataType::STRING,
         );
kernel/src/scan/mod.rs (8 changes: 4 additions, 4 deletions)

@@ -8,7 +8,7 @@ use tracing::debug;
 use url::Url;
 
 use crate::actions::deletion_vector::{split_vector, treemap_to_bools, DeletionVectorDescriptor};
-use crate::actions::{get_log_schema, ADD_NAME, REMOVE_NAME};
+use crate::actions::{get_log_add_schema, get_log_schema, ADD_NAME, REMOVE_NAME};
 use crate::expressions::{Expression, ExpressionRef, Scalar};
 use crate::features::ColumnMappingMode;
 use crate::scan::state::{DvInfo, Stats};
@@ -230,7 +230,7 @@ impl Scan {
         engine: &dyn Engine,
     ) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
         let commit_read_schema = get_log_schema().project(&[ADD_NAME, REMOVE_NAME])?;
-        let checkpoint_read_schema = get_log_schema().project(&[ADD_NAME])?;
+        let checkpoint_read_schema = get_log_add_schema().clone();
 
         // NOTE: We don't pass any meta-predicate because we expect no meaningful row group skipping
         // when ~every checkpoint file will contain the adds and removes we are looking for.
@@ -548,7 +548,7 @@ pub(crate) mod test_utils {
             r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#,
         ]
         .into();
-        let output_schema = Arc::new(get_log_schema().clone());
+        let output_schema = get_log_schema().clone();
         let parsed = handler
             .parse_json(string_array_to_engine_data(json_strings), output_schema)
             .unwrap();
@@ -565,7 +565,7 @@
             r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#,
         ]
         .into();
-        let output_schema = Arc::new(get_log_schema().clone());
+        let output_schema = get_log_schema().clone();
         let parsed = handler
             .parse_json(string_array_to_engine_data(json_strings), output_schema)
             .unwrap();
