Define and use a get_log_add_schema() function #407

Merged · 3 commits · Oct 23, 2024

This PR defines a LOG_ADD_SCHEMA static alongside LOG_SCHEMA, exposes it through a new get_log_add_schema() getter, and changes both statics (and the return type of get_log_schema()) from StructType to SchemaRef, so callers clone a shared Arc instead of deep-copying the schema or re-projecting the add column at each call site. The diff below shows the changes from one of the three commits.
4 changes: 2 additions & 2 deletions kernel/examples/inspect-table/src/main.rs
```diff
@@ -238,15 +238,15 @@ fn try_main() -> DeltaResult<()> {
             }
         }
         Commands::Actions { forward } => {
-            let log_schema = Arc::new(get_log_schema().clone());
+            let log_schema = get_log_schema();
             let actions = snapshot._log_segment().replay(
                 &engine,
                 log_schema.clone(),
                 log_schema.clone(),
                 None,
             )?;

-            let mut visitor = LogVisitor::new(&log_schema);
+            let mut visitor = LogVisitor::new(log_schema);
             for action in actions {
                 action?.0.extract(log_schema.clone(), &mut visitor)?;
             }
```
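Worth noting why this is more than a cosmetic change: with get_log_schema() returning a &'static SchemaRef, the old Arc::new(get_log_schema().clone()) deep copy becomes a reference-count bump. A minimal sketch of the difference, using a hypothetical stand-in for the kernel's StructType:

```rust
use std::sync::Arc;

// Hypothetical stand-in for the kernel's schema type.
#[derive(Clone, Debug)]
struct StructType;
type SchemaRef = Arc<StructType>;

fn main() {
    let shared: SchemaRef = Arc::new(StructType);

    // Old pattern: deep-copies the StructType, then allocates a fresh Arc.
    let deep_copy: SchemaRef = Arc::new(shared.as_ref().clone());

    // New pattern: cloning the SchemaRef only bumps the reference count.
    let cheap: SchemaRef = shared.clone();

    assert!(Arc::ptr_eq(&shared, &cheap));
    assert!(!Arc::ptr_eq(&shared, &deep_copy));
}
```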
18 changes: 14 additions & 4 deletions kernel/src/actions/mod.rs
```diff
@@ -10,7 +10,8 @@ use visitors::{AddVisitor, MetadataVisitor, ProtocolVisitor};
 use self::deletion_vector::DeletionVectorDescriptor;
 use crate::actions::schemas::GetStructField;
 use crate::features::{ReaderFeatures, WriterFeatures};
-use crate::{schema::StructType, DeltaResult, EngineData};
+use crate::schema::{SchemaRef, StructType};
+use crate::{DeltaResult, EngineData};

 pub mod deletion_vector;
 pub mod set_transaction;
@@ -28,7 +29,9 @@ pub(crate) const PROTOCOL_NAME: &str = "protocol";
 pub(crate) const SET_TRANSACTION_NAME: &str = "txn";
 pub(crate) const COMMIT_INFO_NAME: &str = "commitInfo";

-static LOG_SCHEMA: LazyLock<StructType> = LazyLock::new(|| {
+static LOG_ADD_SCHEMA: LazyLock<SchemaRef> =
+    LazyLock::new(|| StructType::new([Option::<Add>::get_struct_field(ADD_NAME)]).into());
+static LOG_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
     StructType::new([
         Option::<Add>::get_struct_field(ADD_NAME),
         Option::<Remove>::get_struct_field(REMOVE_NAME),
@@ -40,14 +43,21 @@ static LOG_SCHEMA: LazyLock<StructType> = LazyLock::new(|| {
         //Option::<Cdc>::get_struct_field(CDC_NAME),
         //Option::<DomainMetadata>::get_struct_field(DOMAIN_METADATA_NAME),
     ])
+    .into()
 });

 #[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
 #[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
-fn get_log_schema() -> &'static StructType {
+fn get_log_schema() -> &'static SchemaRef {
     &LOG_SCHEMA
 }

+#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
+#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
+fn get_log_add_schema() -> &'static SchemaRef {
+    &LOG_ADD_SCHEMA
+}
+
 #[derive(Debug, Clone, PartialEq, Eq, Schema)]
 pub struct Format {
     /// Name of the encoding for files in this table
@@ -194,7 +204,7 @@ impl Add {
     /// Since we always want to parse multiple adds from data, we return a `Vec<Add>`
     pub fn parse_from_data(data: &dyn EngineData) -> DeltaResult<Vec<Add>> {
         let mut visitor = AddVisitor::default();
-        data.extract(get_log_schema().project(&[ADD_NAME])?, &mut visitor)?;
+        data.extract(get_log_add_schema().clone(), &mut visitor)?;
         Ok(visitor.adds)
     }
```
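The pair of LazyLock statics above is the core of the PR: a schema built once on first access and handed out as a &'static SchemaRef so every caller shares one allocation. A self-contained sketch of the pattern, with a simplified stand-in StructType (the real type's fields and constructor differ):

```rust
use std::sync::{Arc, LazyLock};

// Simplified stand-in for the kernel's StructType; illustrative only.
#[derive(Debug)]
struct StructType {
    fields: Vec<&'static str>,
}

type SchemaRef = Arc<StructType>;

// Built once, on first access. The `.into()` in the real code performs the
// same StructType -> Arc<StructType> conversion that Arc::new does here.
static LOG_ADD_SCHEMA: LazyLock<SchemaRef> =
    LazyLock::new(|| Arc::new(StructType { fields: vec!["add"] }));

fn get_log_add_schema() -> &'static SchemaRef {
    &LOG_ADD_SCHEMA
}

fn main() {
    // Every caller shares one allocation; clones are refcount bumps.
    let a = get_log_add_schema().clone();
    let b = get_log_add_schema().clone();
    assert!(Arc::ptr_eq(&a, &b));
    println!("{:?}", a.fields);
}
```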
12 changes: 5 additions & 7 deletions kernel/src/actions/visitors.rs
```diff
@@ -346,7 +346,7 @@ mod tests {

     use super::*;
     use crate::{
-        actions::{get_log_schema, ADD_NAME, SET_TRANSACTION_NAME},
+        actions::{get_log_add_schema, get_log_schema, SET_TRANSACTION_NAME},
         engine::arrow_data::ArrowEngineData,
         engine::sync::{json::SyncJsonHandler, SyncEngine},
         Engine, EngineData, JsonHandler,
@@ -370,7 +370,7 @@ mod tests {
            r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#,
         ]
         .into();
-        let output_schema = Arc::new(get_log_schema().clone());
+        let output_schema = get_log_schema().clone();
         let parsed = handler
             .parse_json(string_array_to_engine_data(json_strings), output_schema)
             .unwrap();
@@ -433,13 +433,11 @@ mod tests {
            r#"{"add":{"path":"c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998137,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":4},\"maxValues\":{\"c3\":4},\"nullCount\":{\"c3\":0}}"}}"#,
         ]
         .into();
-        let output_schema = Arc::new(get_log_schema().clone());
+        let output_schema = get_log_schema().clone();
         let batch = json_handler
             .parse_json(string_array_to_engine_data(json_strings), output_schema)
             .unwrap();
-        let add_schema = get_log_schema()
-            .project(&[ADD_NAME])
-            .expect("Can't get add schema");
+        let add_schema = get_log_add_schema().clone();
         let mut add_visitor = AddVisitor::default();
         batch.extract(add_schema, &mut add_visitor).unwrap();
         let add1 = Add {
@@ -497,7 +495,7 @@ mod tests {
            r#"{"txn":{"appId":"myApp2","version": 4, "lastUpdated": 1670892998177}}"#,
         ]
         .into();
-        let output_schema = Arc::new(get_log_schema().clone());
+        let output_schema = get_log_schema().clone();
         let batch = json_handler
             .parse_json(string_array_to_engine_data(json_strings), output_schema)
             .unwrap();
```
2 changes: 1 addition & 1 deletion kernel/src/engine/arrow_data.rs
```diff
@@ -364,7 +364,7 @@ mod tests {
            r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
         ]
         .into();
-        let output_schema = Arc::new(get_log_schema().clone());
+        let output_schema = get_log_schema().clone();
         let parsed = handler
             .parse_json(string_array_to_engine_data(json_strings), output_schema)
             .unwrap();
```
6 changes: 3 additions & 3 deletions kernel/src/engine/default/json.rs
```diff
@@ -203,7 +203,7 @@ mod tests {
            r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}"#,
            r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#,
         ]);
-        let output_schema = Arc::new(get_log_schema().clone());
+        let output_schema = get_log_schema().clone();

         let batch = handler
             .parse_json(string_array_to_engine_data(json_strings), output_schema)
@@ -218,7 +218,7 @@ mod tests {
         let json_strings = StringArray::from(vec![
            r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2, "maxRowId": 3}}}"#,
         ]);
-        let output_schema = Arc::new(get_log_schema().clone());
+        let output_schema = get_log_schema().clone();

         let batch: RecordBatch = handler
             .parse_json(string_array_to_engine_data(json_strings), output_schema)
@@ -256,7 +256,7 @@ mod tests {
         }];

         let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
-        let physical_schema = Arc::new(ArrowSchema::try_from(get_log_schema()).unwrap());
+        let physical_schema = Arc::new(ArrowSchema::try_from(get_log_schema().as_ref()).unwrap());
         let data: Vec<RecordBatch> = handler
             .read_json_files(files, Arc::new(physical_schema.try_into().unwrap()), None)
             .unwrap()
```
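The `.as_ref()` in the last hunk is the one place the new return type needs bridging: ArrowSchema::try_from here wants a plain &StructType, while get_log_schema() now yields an &Arc<StructType>. A small sketch of that conversion, with a hypothetical consumer function standing in for the Arrow conversion:

```rust
use std::sync::Arc;

struct StructType;
type SchemaRef = Arc<StructType>;

// Hypothetical consumer that, like ArrowSchema::try_from in the diff,
// expects a plain &StructType rather than the Arc wrapper.
fn consume(_schema: &StructType) {}

fn main() {
    let schema: SchemaRef = Arc::new(StructType);
    // Arc<T> implements AsRef<T>, so .as_ref() borrows the inner value.
    consume(schema.as_ref());
    // Deref coercion achieves the same thing: consume(&schema);
}
```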
4 changes: 2 additions & 2 deletions kernel/src/scan/data_skipping.rs
```diff
@@ -5,8 +5,8 @@ use std::sync::{Arc, LazyLock};

 use tracing::debug;

+use crate::actions::get_log_add_schema;
 use crate::actions::visitors::SelectionVectorVisitor;
-use crate::actions::{get_log_schema, ADD_NAME};
 use crate::error::DeltaResult;
 use crate::expressions::{BinaryOperator, Expression as Expr, UnaryOperator, VariadicOperator};
 use crate::schema::{DataType, PrimitiveType, SchemaRef, SchemaTransform, StructField, StructType};
@@ -243,7 +243,7 @@ impl DataSkippingFilter {
         // the predicate is true/null and false (= skip) when the predicate is false.
         let select_stats_evaluator = engine.get_expression_handler().get_evaluator(
             // safety: kernel is very broken if we don't have the schema for Add actions
-            get_log_schema().project(&[ADD_NAME]).unwrap(),
+            get_log_add_schema().clone(),
             STATS_EXPR.clone(),
             DataType::STRING,
         );
```
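This call site also drops an unwrap(): projecting [ADD_NAME] out of the full log schema is fallible at runtime, while cloning the pre-built static is not. A toy sketch of the trade-off, with a hypothetical project method that errors on a missing field (the real kernel method differs in detail):

```rust
use std::sync::{Arc, LazyLock};

// Toy schema with a fallible projection; illustrative only.
#[derive(Debug)]
struct StructType {
    fields: Vec<&'static str>,
}

impl StructType {
    // Keeps only the requested fields; fails if any are missing.
    fn project(&self, names: &[&str]) -> Result<Arc<StructType>, String> {
        let fields: Vec<_> = self
            .fields
            .iter()
            .copied()
            .filter(|f| names.contains(f))
            .collect();
        if fields.len() == names.len() {
            Ok(Arc::new(StructType { fields }))
        } else {
            Err("missing field".to_string())
        }
    }
}

static LOG_ADD_SCHEMA: LazyLock<Arc<StructType>> =
    LazyLock::new(|| Arc::new(StructType { fields: vec!["add"] }));

fn main() {
    let full = StructType {
        fields: vec!["add", "remove", "metaData"],
    };
    // Old: a runtime projection whose error case every call site must unwrap.
    let projected = full.project(&["add"]).unwrap();
    // New: an infallible clone of a schema constructed once.
    let prebuilt = LOG_ADD_SCHEMA.clone();
    assert_eq!(projected.fields, prebuilt.fields);
}
```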
8 changes: 4 additions & 4 deletions kernel/src/scan/mod.rs
```diff
@@ -8,7 +8,7 @@ use tracing::debug;
 use url::Url;

 use crate::actions::deletion_vector::{split_vector, treemap_to_bools, DeletionVectorDescriptor};
-use crate::actions::{get_log_schema, ADD_NAME, REMOVE_NAME};
+use crate::actions::{get_log_add_schema, get_log_schema, ADD_NAME, REMOVE_NAME};
 use crate::expressions::{Expression, Scalar};
 use crate::features::ColumnMappingMode;
 use crate::scan::state::{DvInfo, Stats};
@@ -237,7 +237,7 @@ impl Scan {
         engine: &dyn Engine,
     ) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
         let commit_read_schema = get_log_schema().project(&[ADD_NAME, REMOVE_NAME])?;
-        let checkpoint_read_schema = get_log_schema().project(&[ADD_NAME])?;
+        let checkpoint_read_schema = get_log_add_schema().clone();

         // NOTE: We don't pass any meta-predicate because we expect no meaningful row group skipping
         // when ~every checkpoint file will contain the adds and removes we are looking for.
@@ -554,7 +554,7 @@ pub(crate) mod test_utils {
            r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#,
         ]
         .into();
-        let output_schema = Arc::new(get_log_schema().clone());
+        let output_schema = get_log_schema().clone();
         let parsed = handler
             .parse_json(string_array_to_engine_data(json_strings), output_schema)
             .unwrap();
@@ -571,7 +571,7 @@ pub(crate) mod test_utils {
            r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#,
         ]
         .into();
-        let output_schema = Arc::new(get_log_schema().clone());
+        let output_schema = get_log_schema().clone();
         let parsed = handler
             .parse_json(string_array_to_engine_data(json_strings), output_schema)
             .unwrap();
```