From f4eabf078715be1673979779eb02e8809a90a853 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Fri, 11 Oct 2024 15:37:28 -0700 Subject: [PATCH 01/23] wip: rough append data path --- kernel/Cargo.toml | 2 + kernel/src/actions/mod.rs | 31 +++++- kernel/src/engine/default/mod.rs | 2 +- kernel/src/engine/default/parquet.rs | 92 ++++++++++++++- kernel/src/transaction.rs | 129 ++++++++++++++++++++- kernel/tests/write.rs | 160 ++++++++++++++++++++++++--- 6 files changed, 392 insertions(+), 24 deletions(-) diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index 8205a2ccf..235df2646 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -90,6 +90,8 @@ default-engine = [ "parquet/object_store", "reqwest", "tokio", + "uuid/v4", + "uuid/fast-rng", ] developer-visibility = [] diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index b23dd6511..49dce5f93 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -8,7 +8,7 @@ use std::sync::LazyLock; use visitors::{AddVisitor, MetadataVisitor, ProtocolVisitor}; use self::deletion_vector::DeletionVectorDescriptor; -use crate::actions::schemas::GetStructField; +use crate::actions::schemas::{GetNullableContainerStructField, GetStructField}; use crate::features::{ReaderFeatures, WriterFeatures}; use crate::schema::{SchemaRef, StructType}; use crate::{DeltaResult, EngineData}; @@ -66,6 +66,17 @@ pub(crate) fn get_log_commit_info_schema() -> &'static SchemaRef { &LOG_COMMIT_INFO_SCHEMA } +// FIXME: should be a projection of LOG_SCHEMA? +pub(crate) static WRITE_METADATA_SCHEMA: LazyLock = LazyLock::new(|| { + StructType::new(vec![ + ::get_struct_field("path"), + >::get_nullable_container_struct_field("partitionValues"), + ::get_struct_field("size"), + ::get_struct_field("modificationTime"), + ::get_struct_field("dataChange"), + ]) +}); + #[derive(Debug, Clone, PartialEq, Eq, Schema)] pub struct Format { /// Name of the encoding for files in this table @@ -305,6 +316,7 @@ mod tests { use super::*; use crate::schema::{ArrayType, DataType, MapType, StructField}; + use crate::transaction::get_write_metadata_schema; #[test] fn test_metadata_schema() { @@ -481,4 +493,21 @@ mod tests { )])); assert_eq!(schema, expected); } + + #[test] + fn test_write_metadata_schema() { + let schema = get_write_metadata_schema(); + let expected = StructType::new(vec![ + StructField::new("path", DataType::STRING, false), + StructField::new( + "partitionValues", + MapType::new(DataType::STRING, DataType::STRING, true), + false, + ), + StructField::new("size", DataType::LONG, false), + StructField::new("modificationTime", DataType::LONG, false), + StructField::new("dataChange", DataType::BOOLEAN, false), + ]); + assert_eq!(schema, &expected); + } } diff --git a/kernel/src/engine/default/mod.rs b/kernel/src/engine/default/mod.rs index 9fa1bdb0c..76333c822 100644 --- a/kernel/src/engine/default/mod.rs +++ b/kernel/src/engine/default/mod.rs @@ -33,7 +33,7 @@ pub struct DefaultEngine { store: Arc, file_system: Arc>, json: Arc>, - parquet: Arc>, + pub parquet: Arc>, // FIXME expression: Arc, } diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index ffcf4e2e9..162914306 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -3,21 +3,26 @@ use std::ops::Range; use std::sync::Arc; +use arrow_array::{BooleanArray, Int64Array, RecordBatch, StringArray}; use futures::StreamExt; use object_store::path::Path; use object_store::DynObjectStore; use parquet::arrow::arrow_reader::{ ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, }; +use parquet::arrow::arrow_writer::ArrowWriter; use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder}; +use uuid::Uuid; use super::file_stream::{FileOpenFuture, FileOpener, FileStream}; +use crate::engine::arrow_data::ArrowEngineData; use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array}; use crate::engine::default::executor::TaskExecutor; use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping; use crate::schema::SchemaRef; use crate::{ - DeltaResult, Error, ExpressionRef, FileDataReadResultIterator, FileMeta, ParquetHandler, + DeltaResult, EngineData, Error, ExpressionRef, FileDataReadResultIterator, FileMeta, + ParquetHandler, }; #[derive(Debug)] @@ -27,6 +32,19 @@ pub struct DefaultParquetHandler { readahead: usize, } +// TODO: plumb through modification time +#[derive(Debug)] +pub struct ParquetMetadata { + pub path: String, + pub size: i64, +} + +impl ParquetMetadata { + pub fn new(path: String, size: i64) -> Self { + Self { path, size } + } +} + impl DefaultParquetHandler { pub fn new(store: Arc, task_executor: Arc) -> Self { Self { @@ -43,6 +61,78 @@ impl DefaultParquetHandler { self.readahead = readahead; self } + + async fn write_parquet( + &self, + path: &url::Url, + data: Box, + ) -> DeltaResult { + let batch: Box<_> = ArrowEngineData::try_from_engine_data(data)?; + let record_batch = batch.record_batch(); + + let mut buffer = vec![]; + let mut writer = ArrowWriter::try_new(&mut buffer, record_batch.schema(), None)?; + writer.write(&record_batch)?; + writer.close()?; // writer must be closed to write footer + + let size = buffer.len(); + let name: String = Uuid::new_v4().to_string() + ".parquet"; + // FIXME test with trailing '/' and omitting? + let path = path.join(&name)?; + + self.store + .put(&Path::from(path.path()), buffer.into()) + .await?; + + Ok(ParquetMetadata::new( + path.to_string(), + size.try_into() + .map_err(|_| Error::generic("Failed to convert parquet metadata 'size' to i64"))?, + )) + } + + pub async fn write_parquet_file( + &self, + path: &url::Url, + data: Box, + partition_values: std::collections::HashMap, + data_change: bool, + ) -> DeltaResult> { + let ParquetMetadata { path, size } = self.write_parquet(path, data).await?; + let modification_time = chrono::Utc::now().timestamp_millis(); // FIXME + let write_metadata_schema = crate::transaction::get_write_metadata_schema(); + + // create the record batch of the write metadata + let path = Arc::new(StringArray::from(vec![path.to_string()])); + use arrow_array::builder::StringBuilder; + let key_builder = StringBuilder::new(); + let val_builder = StringBuilder::new(); + let names = arrow_array::builder::MapFieldNames { + entry: "key_value".to_string(), + key: "key".to_string(), + value: "value".to_string(), + }; + let mut builder = + arrow_array::builder::MapBuilder::new(Some(names), key_builder, val_builder); + if partition_values.is_empty() { + builder.append(true).unwrap(); + } else { + for (k, v) in partition_values { + builder.keys().append_value(&k); + builder.values().append_value(&v); + builder.append(true).unwrap(); + } + } + let partitions = Arc::new(builder.finish()); + let size = Arc::new(Int64Array::from(vec![size])); + let data_change = Arc::new(BooleanArray::from(vec![data_change])); + let modification_time = Arc::new(Int64Array::from(vec![modification_time])); + + Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new( + Arc::new(write_metadata_schema.try_into()?), + vec![path, partitions, size, modification_time, data_change], + )?))) + } } impl ParquetHandler for DefaultParquetHandler { diff --git a/kernel/src/transaction.rs b/kernel/src/transaction.rs index 81b0f31f8..5f8b4b235 100644 --- a/kernel/src/transaction.rs +++ b/kernel/src/transaction.rs @@ -1,16 +1,20 @@ use std::iter; +use std::mem; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use crate::actions::get_log_commit_info_schema; -use crate::actions::COMMIT_INFO_NAME; +use crate::actions::{ADD_NAME, COMMIT_INFO_NAME}; use crate::error::Error; -use crate::expressions::{column_expr, Scalar, StructData}; +use crate::expressions::{column_expr, ColumnName, Scalar, StructData}; use crate::path::ParsedLogPath; -use crate::schema::{StructField, StructType}; +use crate::schema::{SchemaRef, StructField, StructType}; use crate::snapshot::Snapshot; use crate::{DataType, DeltaResult, Engine, EngineData, Expression, Version}; +use itertools::chain; +use url::Url; + const KERNEL_VERSION: &str = env!("CARGO_PKG_VERSION"); const UNKNOWN_OPERATION: &str = "UNKNOWN"; @@ -32,6 +36,14 @@ pub struct Transaction { read_snapshot: Arc, operation: Option, commit_info: Option>, + write_metadata: Box> + Send>, +} + +/// Get the expected schema for [`write_metadata`]. +/// +/// [`write_metadata`]: crate::transaction::Transaction::write_metadata +pub fn get_write_metadata_schema() -> &'static StructType { + &crate::actions::WRITE_METADATA_SCHEMA } impl std::fmt::Debug for Transaction { @@ -56,6 +68,7 @@ impl Transaction { read_snapshot: snapshot.into(), operation: None, commit_info: None, + write_metadata: Box::new(std::iter::empty()), } } @@ -74,6 +87,11 @@ impl Transaction { engine_commit_info.as_ref(), )?)); + // TODO consider IntoIterator so we can have multiple write_metadata iterators (and return + // self in the conflict case for retries) + let adds = generate_adds(engine, self.write_metadata); + let actions = chain(actions, adds); + // step two: set new commit version (current_version + 1) and path to write let commit_version = self.read_snapshot.version() + 1; let commit_path = @@ -83,7 +101,8 @@ impl Transaction { let json_handler = engine.get_json_handler(); match json_handler.write_json_file(&commit_path.location, Box::new(actions), false) { Ok(()) => Ok(CommitResult::Committed(commit_version)), - Err(Error::FileAlreadyExists(_)) => Ok(CommitResult::Conflict(self, commit_version)), + // FIXME + // Err(Error::FileAlreadyExists(_)) => Ok(CommitResult::Conflict(self, commit_version)), Err(e) => Err(e), } } @@ -112,6 +131,108 @@ impl Transaction { self.commit_info = Some(commit_info.into()); self } + + // Generate the logical-to-physical transform expression for this transaction. At the moment, + // this is a transaction-wide expression. + fn generate_logical_to_physical(&self) -> Expression { + // for now, we just pass through all the columns except partition columns. + // note this is _incorrect_ if table config deems we need partition columns. + Expression::struct_from(self.read_snapshot.schema().fields().filter_map(|f| { + if self + .read_snapshot + .metadata() + .partition_columns + .contains(&f.name()) + { + None + } else { + Some(Expression::Column(ColumnName::new(iter::once(f.name())))) + } + })) + } + + /// Get the write context for this transaction. At the moment, this is constant for the whole + /// transaction. + pub fn write_context(&self) -> WriteContext { + let target_dir = self.read_snapshot.table_root(); + let snapshot_schema = self.read_snapshot.schema(); + let partition_cols = self.read_snapshot.metadata().partition_columns.clone(); + let logical_to_physical = self.generate_logical_to_physical(); + WriteContext::new( + target_dir.clone(), + Arc::new(snapshot_schema.clone()), + partition_cols, + logical_to_physical, + ) + } + + /// Add write metadata about files to include in the transaction. This API can be called + /// multiple times to add multiple iterators. + /// + /// TODO what is expected schema for the batches? + pub fn add_write_metadata( + &mut self, + data: Box> + Send>, + ) { + let write_metadata = mem::replace(&mut self.write_metadata, Box::new(std::iter::empty())); + self.write_metadata = Box::new(chain(write_metadata, data)); + } +} + +// this does something similar to adding top-level 'commitInfo' named struct. we should unify. +fn generate_adds( + engine: &dyn Engine, + write_metadata: Box> + Send>, +) -> Box> + Send> { + let expression_handler = engine.get_expression_handler(); + let write_metadata_schema = get_write_metadata_schema(); + let log_schema: DataType = DataType::struct_type(vec![StructField::new( + ADD_NAME, + write_metadata_schema.clone(), + true, + )]); + + Box::new(write_metadata.map(move |write_metadata_batch| { + let adds_expr = Expression::struct_from([Expression::struct_from( + write_metadata_schema + .fields() + .map(|f| Expression::Column(ColumnName::new(iter::once(f.name())))), + )]); + let adds_evaluator = expression_handler.get_evaluator( + write_metadata_schema.clone().into(), + adds_expr, + log_schema.clone(), + ); + adds_evaluator + .evaluate(write_metadata_batch.as_ref()) + .expect("fixme") + })) +} +/// WriteContext is data derived from a [`Transaction`] that can be provided to writers in order to +/// write table data. +/// +/// [`Transaction`]: struct.Transaction.html +pub struct WriteContext { + pub target_dir: Url, + pub schema: SchemaRef, + pub partition_cols: Vec, + pub logical_to_physical: Expression, +} + +impl WriteContext { + fn new( + target_dir: Url, + schema: SchemaRef, + partition_cols: Vec, + logical_to_physical: Expression, + ) -> Self { + WriteContext { + target_dir, + schema, + partition_cols, + logical_to_physical, + } + } } /// Result after committing a transaction. If 'committed', the version is the new version written diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index 212b06cae..b0e79d2c8 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -14,7 +14,8 @@ use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType}; -use delta_kernel::{Error as KernelError, Table}; +use delta_kernel::Error as KernelError; +use delta_kernel::{DeltaResult, Engine, Table}; // setup default engine with in-memory object store. fn setup( @@ -85,21 +86,8 @@ async fn create_table( Ok(Table::new(table_path)) } -#[tokio::test] -async fn test_commit_info() -> Result<(), Box> { - // setup tracing - let _ = tracing_subscriber::fmt::try_init(); - // setup in-memory object store and default engine - let (store, engine, table_location) = setup("test_table"); - - // create a simple table: one int column named 'number' - let schema = Arc::new(StructType::new(vec![StructField::new( - "number", - DataType::INTEGER, - true, - )])); - let table = create_table(store.clone(), table_location, schema, &[]).await?; - +// create commit info in arrow of the form {engineInfo: "default engine"} +fn new_commit_info() -> DeltaResult> { // create commit info of the form {engineCommitInfo: Map { "engineInfo": "default engine" } } let commit_info_schema = Arc::new(ArrowSchema::new(vec![Field::new( "engineCommitInfo", @@ -136,11 +124,30 @@ async fn test_commit_info() -> Result<(), Box> { let commit_info_batch = RecordBatch::try_new(commit_info_schema.clone(), vec![Arc::new(array)])?; + Ok(Box::new(ArrowEngineData::new(commit_info_batch))) +} + +#[tokio::test] +async fn test_commit_info() -> Result<(), Box> { + // setup tracing + let _ = tracing_subscriber::fmt::try_init(); + // setup in-memory object store and default engine + let (store, engine, table_location) = setup("test_table"); + + // create a simple table: one int column named 'number' + let schema = Arc::new(StructType::new(vec![StructField::new( + "number", + DataType::INTEGER, + true, + )])); + let table = create_table(store.clone(), table_location, schema, &[]).await?; + + let commit_info = new_commit_info()?; // create a transaction let txn = table .new_transaction(&engine)? - .with_commit_info(Box::new(ArrowEngineData::new(commit_info_batch))); + .with_commit_info(commit_info); // commit! txn.commit(&engine)?; @@ -251,3 +258,122 @@ async fn test_invalid_commit_info() -> Result<(), Box> { )); Ok(()) } + +#[tokio::test] +async fn test_append() -> Result<(), Box> { + // setup tracing + let _ = tracing_subscriber::fmt::try_init(); + // setup in-memory object store and default engine + let (store, engine, table_location) = setup("test_table"); + + // create a simple table: one int column named 'number' + let schema = Arc::new(StructType::new(vec![StructField::new( + "number", + DataType::INTEGER, + true, + )])); + let table = create_table(store.clone(), table_location, schema.clone(), &[]).await?; + + let commit_info = new_commit_info()?; + + let mut txn = table + .new_transaction(&engine)? + .with_commit_info(commit_info); + + // create two new arrow record batches to append + let append_data = [[1, 2, 3], [4, 5, 6]].map(|data| -> DeltaResult<_> { + let data = RecordBatch::try_new( + Arc::new(schema.as_ref().try_into()?), + vec![Arc::new(arrow::array::Int32Array::from(data.to_vec()))], + )?; + Ok(Box::new(ArrowEngineData::new(data))) + }); + + // write data out by spawning async tasks to simulate executors + let engine = Arc::new(engine); + let write_context = Arc::new(txn.write_context()); + let tasks = append_data.into_iter().map(|data| { + tokio::task::spawn({ + let engine = Arc::clone(&engine); + let write_context = Arc::clone(&write_context); + async move { + let parquet_handler = &engine.parquet; + parquet_handler + .write_parquet_file( + &write_context.target_dir, + data.expect("FIXME"), + std::collections::HashMap::new(), + true, + ) + .await + .expect("FIXME") + } + }) + }); + + // FIXME this is collecting to vec + let write_metadata = futures::future::join_all(tasks) + .await + .into_iter() + .map(|r| r.unwrap()); + txn.add_write_metadata(Box::new(write_metadata)); + + // commit! + txn.commit(engine.as_ref())?; + + let commit1 = store + .get(&Path::from( + "/test_table/_delta_log/00000000000000000001.json", + )) + .await?; + let commit1 = String::from_utf8(commit1.bytes().await?.to_vec())?; + println!("{}", commit1); + + test_read( + Box::new(ArrowEngineData::new(RecordBatch::try_new( + Arc::new(schema.as_ref().try_into()?), + vec![Arc::new(arrow::array::Int32Array::from(vec![ + 1, 2, 3, 4, 5, 6, + ]))], + )?)), + table, + engine.as_ref(), + )?; + Ok(()) +} + +fn test_read( + expected: Box, + table: Table, + engine: &impl Engine, +) -> Result<(), Box> { + let snapshot = table.snapshot(engine, None)?; + let scan = snapshot.into_scan_builder().build()?; + let actual = scan.execute(engine)?; + let batches: Vec = actual + .into_iter() + .map(|res| { + let data = res.unwrap().raw_data.unwrap(); + let record_batch: RecordBatch = data + .into_any() + .downcast::() + .unwrap() + .into(); + record_batch + }) + .collect(); + + let formatted = arrow::util::pretty::pretty_format_batches(&batches) + .unwrap() + .to_string(); + + let expected = arrow::util::pretty::pretty_format_batches(&[expected.record_batch().clone()]) + .unwrap() + .to_string(); + + println!("expected:\n{expected}"); + println!("got:\n{formatted}"); + assert_eq!(formatted, expected); + + Ok(()) +} From 59fd6797394db7b42112b4a92d0ad9886f47e666 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Mon, 28 Oct 2024 21:13:47 -0700 Subject: [PATCH 02/23] fix cherry pick --- kernel/src/engine/default/parquet.rs | 22 +++++-- kernel/src/transaction.rs | 2 +- kernel/tests/write.rs | 96 ++++++++++++++++++++++++++-- 3 files changed, 109 insertions(+), 11 deletions(-) diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index 162914306..114a456b7 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -32,16 +32,16 @@ pub struct DefaultParquetHandler { readahead: usize, } -// TODO: plumb through modification time #[derive(Debug)] pub struct ParquetMetadata { pub path: String, pub size: i64, + pub modification_time: i64, } impl ParquetMetadata { - pub fn new(path: String, size: i64) -> Self { - Self { path, size } + pub fn new(path: String, size: i64, modification_time: i64) -> Self { + Self { path, size, modification_time } } } @@ -72,7 +72,7 @@ impl DefaultParquetHandler { let mut buffer = vec![]; let mut writer = ArrowWriter::try_new(&mut buffer, record_batch.schema(), None)?; - writer.write(&record_batch)?; + writer.write(record_batch)?; writer.close()?; // writer must be closed to write footer let size = buffer.len(); @@ -84,10 +84,21 @@ impl DefaultParquetHandler { .put(&Path::from(path.path()), buffer.into()) .await?; + let metadata = self.store.head(&Path::from(path.path())).await?; + let modification_time = metadata.last_modified.timestamp(); + if size != metadata.size { + return Err(Error::generic(format!( + "Size mismatch after writing parquet file: expected {}, got {}", + size, metadata.size + ))); + } + Ok(ParquetMetadata::new( path.to_string(), + // this means max size we can write is i64::MAX (~8EB) size.try_into() .map_err(|_| Error::generic("Failed to convert parquet metadata 'size' to i64"))?, + modification_time, )) } @@ -98,8 +109,7 @@ impl DefaultParquetHandler { partition_values: std::collections::HashMap, data_change: bool, ) -> DeltaResult> { - let ParquetMetadata { path, size } = self.write_parquet(path, data).await?; - let modification_time = chrono::Utc::now().timestamp_millis(); // FIXME + let ParquetMetadata { path, size, modification_time } = self.write_parquet(path, data).await?; let write_metadata_schema = crate::transaction::get_write_metadata_schema(); // create the record batch of the write metadata diff --git a/kernel/src/transaction.rs b/kernel/src/transaction.rs index 5f8b4b235..d47e956b8 100644 --- a/kernel/src/transaction.rs +++ b/kernel/src/transaction.rs @@ -142,7 +142,7 @@ impl Transaction { .read_snapshot .metadata() .partition_columns - .contains(&f.name()) + .contains(f.name()) { None } else { diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index b0e79d2c8..8b2685510 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -4,9 +4,11 @@ use arrow::array::StringArray; use arrow::record_batch::RecordBatch; use arrow_schema::Schema as ArrowSchema; use arrow_schema::{DataType as ArrowDataType, Field}; +use itertools::Itertools; use object_store::memory::InMemory; use object_store::path::Path; use object_store::ObjectStore; +use serde_json::Deserializer; use serde_json::{json, to_vec}; use url::Url; @@ -312,10 +314,30 @@ async fn test_append() -> Result<(), Box> { }); // FIXME this is collecting to vec + //let write_metadata: Vec = futures::future::join_all(tasks) + // .await + // .into_iter() + // .map(|data| { + // let data = data.unwrap(); + // data.into_any() + // .downcast::() + // .unwrap() + // .into() + // }) + // .collect(); + + //txn.add_write_metadata(Box::new( + // write_metadata + // .into_iter() + // .map(|data| Box::new(ArrowEngineData::new(data))), + //)); + let write_metadata = futures::future::join_all(tasks) .await .into_iter() - .map(|r| r.unwrap()); + .map(|data| { + data.unwrap() + }); txn.add_write_metadata(Box::new(write_metadata)); // commit! @@ -326,8 +348,74 @@ async fn test_append() -> Result<(), Box> { "/test_table/_delta_log/00000000000000000001.json", )) .await?; - let commit1 = String::from_utf8(commit1.bytes().await?.to_vec())?; - println!("{}", commit1); + + let mut parsed_commits: Vec<_> = Deserializer::from_slice(&commit1.bytes().await?) + .into_iter::() + .try_collect()?; + + // FIXME + *parsed_commits[0] + .get_mut("commitInfo") + .unwrap() + .get_mut("timestamp") + .unwrap() = serde_json::Value::Number(0.into()); + *parsed_commits[1] + .get_mut("add") + .unwrap() + .get_mut("modificationTime") + .unwrap() = serde_json::Value::Number(0.into()); + *parsed_commits[1] + .get_mut("add") + .unwrap() + .get_mut("path") + .unwrap() = serde_json::Value::String("first.parquet".to_string()); + *parsed_commits[2] + .get_mut("add") + .unwrap() + .get_mut("modificationTime") + .unwrap() = serde_json::Value::Number(0.into()); + *parsed_commits[2] + .get_mut("add") + .unwrap() + .get_mut("path") + .unwrap() = serde_json::Value::String("second.parquet".to_string()); + + let expected_commit = vec![ + json!({ + "commitInfo": { + "timestamp": 0, + "operation": "UNKNOWN", + "kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")), + "operationParameters": {}, + "engineCommitInfo": { + "engineInfo": "default engine" + } + } + }), + json!({ + "add": { + "path": "first.parquet", + "partitionValues": {}, + "size": 483, + "modificationTime": 0, + "dataChange": true + } + }), + json!({ + "add": { + "path": "second.parquet", + "partitionValues": {}, + "size": 483, + "modificationTime": 0, + "dataChange": true + } + }), + ]; + + println!("actual:\n{parsed_commits:#?}"); + println!("expected:\n{expected_commit:#?}"); + + assert_eq!(parsed_commits, expected_commit); test_read( Box::new(ArrowEngineData::new(RecordBatch::try_new( @@ -371,8 +459,8 @@ fn test_read( .unwrap() .to_string(); + println!("actual:\n{formatted}"); println!("expected:\n{expected}"); - println!("got:\n{formatted}"); assert_eq!(formatted, expected); Ok(()) From 5926d96e0c22f94df364b301dcf73fb508aaccb9 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Mon, 28 Oct 2024 21:18:52 -0700 Subject: [PATCH 03/23] clean --- kernel/src/engine/default/parquet.rs | 12 ++++++++++-- kernel/tests/write.rs | 14 ++++++-------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index 114a456b7..ad32e910c 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -41,7 +41,11 @@ pub struct ParquetMetadata { impl ParquetMetadata { pub fn new(path: String, size: i64, modification_time: i64) -> Self { - Self { path, size, modification_time } + Self { + path, + size, + modification_time, + } } } @@ -109,7 +113,11 @@ impl DefaultParquetHandler { partition_values: std::collections::HashMap, data_change: bool, ) -> DeltaResult> { - let ParquetMetadata { path, size, modification_time } = self.write_parquet(path, data).await?; + let ParquetMetadata { + path, + size, + modification_time, + } = self.write_parquet(path, data).await?; let write_metadata_schema = crate::transaction::get_write_metadata_schema(); // create the record batch of the write metadata diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index 8b2685510..1bc2d9759 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -335,9 +335,7 @@ async fn test_append() -> Result<(), Box> { let write_metadata = futures::future::join_all(tasks) .await .into_iter() - .map(|data| { - data.unwrap() - }); + .map(|data| data.unwrap()); txn.add_write_metadata(Box::new(write_metadata)); // commit! @@ -418,21 +416,21 @@ async fn test_append() -> Result<(), Box> { assert_eq!(parsed_commits, expected_commit); test_read( - Box::new(ArrowEngineData::new(RecordBatch::try_new( + &ArrowEngineData::new(RecordBatch::try_new( Arc::new(schema.as_ref().try_into()?), vec![Arc::new(arrow::array::Int32Array::from(vec![ 1, 2, 3, 4, 5, 6, ]))], - )?)), - table, + )?), + &table, engine.as_ref(), )?; Ok(()) } fn test_read( - expected: Box, - table: Table, + expected: &ArrowEngineData, + table: &Table, engine: &impl Engine, ) -> Result<(), Box> { let snapshot = table.snapshot(engine, None)?; From 3c7dd37abf9599bd779de62c2e1c826fbe79c888 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Mon, 28 Oct 2024 21:48:45 -0700 Subject: [PATCH 04/23] add partition test --- kernel/tests/write.rs | 247 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 219 insertions(+), 28 deletions(-) diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index 1bc2d9759..1f1da07dc 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use arrow::array::StringArray; +use arrow::array::{Int32Array, StringArray}; use arrow::record_batch::RecordBatch; use arrow_schema::Schema as ArrowSchema; use arrow_schema::{DataType as ArrowDataType, Field}; @@ -261,6 +261,42 @@ async fn test_invalid_commit_info() -> Result<(), Box> { Ok(()) } +fn test_read( + expected: &ArrowEngineData, + table: &Table, + engine: &impl Engine, +) -> Result<(), Box> { + let snapshot = table.snapshot(engine, None)?; + let scan = snapshot.into_scan_builder().build()?; + let actual = scan.execute(engine)?; + let batches: Vec = actual + .into_iter() + .map(|res| { + let data = res.unwrap().raw_data.unwrap(); + let record_batch: RecordBatch = data + .into_any() + .downcast::() + .unwrap() + .into(); + record_batch + }) + .collect(); + + let formatted = arrow::util::pretty::pretty_format_batches(&batches) + .unwrap() + .to_string(); + + let expected = arrow::util::pretty::pretty_format_batches(&[expected.record_batch().clone()]) + .unwrap() + .to_string(); + + println!("actual:\n{formatted}"); + println!("expected:\n{expected}"); + assert_eq!(formatted, expected); + + Ok(()) +} + #[tokio::test] async fn test_append() -> Result<(), Box> { // setup tracing @@ -428,38 +464,193 @@ async fn test_append() -> Result<(), Box> { Ok(()) } -fn test_read( - expected: &ArrowEngineData, - table: &Table, - engine: &impl Engine, -) -> Result<(), Box> { - let snapshot = table.snapshot(engine, None)?; - let scan = snapshot.into_scan_builder().build()?; - let actual = scan.execute(engine)?; - let batches: Vec = actual +#[tokio::test] +async fn test_append_partitioned() -> Result<(), Box> { + // setup tracing + let _ = tracing_subscriber::fmt::try_init(); + // setup in-memory object store and default engine + let (store, engine, table_location) = setup("test_table"); + let partition_col = "partition"; + + // create a simple partitioned table: one int column named 'number', partitioned by string + // column named 'partition' + let table_schema = Arc::new(StructType::new(vec![ + StructField::new("number", DataType::INTEGER, true), + StructField::new("partition", DataType::STRING, true), + ])); + let data_schema = Arc::new(StructType::new(vec![StructField::new( + "number", + DataType::INTEGER, + true, + )])); + let table = create_table( + store.clone(), + table_location, + table_schema.clone(), + &[partition_col], + ) + .await?; + + let commit_info = new_commit_info()?; + + let mut txn = table + .new_transaction(&engine)? + .with_commit_info(commit_info); + + // create two new arrow record batches to append + let append_data = [[1, 2, 3], [4, 5, 6]].map(|data| -> DeltaResult<_> { + let data = RecordBatch::try_new( + Arc::new(data_schema.as_ref().try_into()?), + vec![Arc::new(arrow::array::Int32Array::from(data.to_vec()))], + )?; + Ok(Box::new(ArrowEngineData::new(data))) + }); + let partition_vals = vec!["a", "b"]; + + // write data out by spawning async tasks to simulate executors + let engine = Arc::new(engine); + let write_context = Arc::new(txn.write_context()); + let tasks = append_data .into_iter() - .map(|res| { - let data = res.unwrap().raw_data.unwrap(); - let record_batch: RecordBatch = data - .into_any() - .downcast::() - .unwrap() - .into(); - record_batch - }) - .collect(); + .zip(partition_vals) + .map(|(data, partition_val)| { + tokio::task::spawn({ + let engine = Arc::clone(&engine); + let write_context = Arc::clone(&write_context); + async move { + let parquet_handler = &engine.parquet; + parquet_handler + .write_parquet_file( + &write_context.target_dir, + data.expect("FIXME"), + std::collections::HashMap::from([( + partition_col.to_string(), + partition_val.to_string(), + )]), + true, + ) + .await + .expect("FIXME") + } + }) + }); - let formatted = arrow::util::pretty::pretty_format_batches(&batches) - .unwrap() - .to_string(); + // FIXME this is collecting to vec + //let write_metadata: Vec = futures::future::join_all(tasks) + // .await + // .into_iter() + // .map(|data| { + // let data = data.unwrap(); + // data.into_any() + // .downcast::() + // .unwrap() + // .into() + // }) + // .collect(); - let expected = arrow::util::pretty::pretty_format_batches(&[expected.record_batch().clone()]) + //txn.add_write_metadata(Box::new( + // write_metadata + // .into_iter() + // .map(|data| Box::new(ArrowEngineData::new(data))), + //)); + + let write_metadata = futures::future::join_all(tasks) + .await + .into_iter() + .map(|data| data.unwrap()); + txn.add_write_metadata(Box::new(write_metadata)); + + // commit! + txn.commit(engine.as_ref())?; + + let commit1 = store + .get(&Path::from( + "/test_table/_delta_log/00000000000000000001.json", + )) + .await?; + + let mut parsed_commits: Vec<_> = Deserializer::from_slice(&commit1.bytes().await?) + .into_iter::() + .try_collect()?; + + // FIXME + *parsed_commits[0] + .get_mut("commitInfo") .unwrap() - .to_string(); + .get_mut("timestamp") + .unwrap() = serde_json::Value::Number(0.into()); + *parsed_commits[1] + .get_mut("add") + .unwrap() + .get_mut("modificationTime") + .unwrap() = serde_json::Value::Number(0.into()); + *parsed_commits[1] + .get_mut("add") + .unwrap() + .get_mut("path") + .unwrap() = serde_json::Value::String("first.parquet".to_string()); + *parsed_commits[2] + .get_mut("add") + .unwrap() + .get_mut("modificationTime") + .unwrap() = serde_json::Value::Number(0.into()); + *parsed_commits[2] + .get_mut("add") + .unwrap() + .get_mut("path") + .unwrap() = serde_json::Value::String("second.parquet".to_string()); - println!("actual:\n{formatted}"); - println!("expected:\n{expected}"); - assert_eq!(formatted, expected); + let expected_commit = vec![ + json!({ + "commitInfo": { + "timestamp": 0, + "operation": "UNKNOWN", + "kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")), + "operationParameters": {}, + "engineCommitInfo": { + "engineInfo": "default engine" + } + } + }), + json!({ + "add": { + "path": "first.parquet", + "partitionValues": { + "partition": "a" + }, + "size": 483, + "modificationTime": 0, + "dataChange": true + } + }), + json!({ + "add": { + "path": "second.parquet", + "partitionValues": { + "partition": "b" + }, + "size": 483, + "modificationTime": 0, + "dataChange": true + } + }), + ]; + + println!("actual:\n{parsed_commits:#?}"); + println!("expected:\n{expected_commit:#?}"); + + assert_eq!(parsed_commits, expected_commit); + test_read( + &ArrowEngineData::new(RecordBatch::try_new( + Arc::new(table_schema.as_ref().try_into()?), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])), + Arc::new(StringArray::from(vec!["a", "a", "a", "b", "b", "b"])), + ], + )?), + &table, + engine.as_ref(), + )?; Ok(()) } From cc01511f702008c7dbb9d412760555402e33413f Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Tue, 29 Oct 2024 13:01:16 -0700 Subject: [PATCH 05/23] more cleanup --- kernel/src/actions/mod.rs | 31 +---------------- kernel/src/engine/default/parquet.rs | 2 +- kernel/src/transaction.rs | 50 +++++++++++++++++++++++----- kernel/tests/write.rs | 24 +++---------- 4 files changed, 48 insertions(+), 59 deletions(-) diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 49dce5f93..b23dd6511 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -8,7 +8,7 @@ use std::sync::LazyLock; use visitors::{AddVisitor, MetadataVisitor, ProtocolVisitor}; use self::deletion_vector::DeletionVectorDescriptor; -use crate::actions::schemas::{GetNullableContainerStructField, GetStructField}; +use crate::actions::schemas::GetStructField; use crate::features::{ReaderFeatures, WriterFeatures}; use crate::schema::{SchemaRef, StructType}; use crate::{DeltaResult, EngineData}; @@ -66,17 +66,6 @@ pub(crate) fn get_log_commit_info_schema() -> &'static SchemaRef { &LOG_COMMIT_INFO_SCHEMA } -// FIXME: should be a projection of LOG_SCHEMA? -pub(crate) static WRITE_METADATA_SCHEMA: LazyLock = LazyLock::new(|| { - StructType::new(vec![ - ::get_struct_field("path"), - >::get_nullable_container_struct_field("partitionValues"), - ::get_struct_field("size"), - ::get_struct_field("modificationTime"), - ::get_struct_field("dataChange"), - ]) -}); - #[derive(Debug, Clone, PartialEq, Eq, Schema)] pub struct Format { /// Name of the encoding for files in this table @@ -316,7 +305,6 @@ mod tests { use super::*; use crate::schema::{ArrayType, DataType, MapType, StructField}; - use crate::transaction::get_write_metadata_schema; #[test] fn test_metadata_schema() { @@ -493,21 +481,4 @@ mod tests { )])); assert_eq!(schema, expected); } - - #[test] - fn test_write_metadata_schema() { - let schema = get_write_metadata_schema(); - let expected = StructType::new(vec![ - StructField::new("path", DataType::STRING, false), - StructField::new( - "partitionValues", - MapType::new(DataType::STRING, DataType::STRING, true), - false, - ), - StructField::new("size", DataType::LONG, false), - StructField::new("modificationTime", DataType::LONG, false), - StructField::new("dataChange", DataType::BOOLEAN, false), - ]); - assert_eq!(schema, &expected); - } } diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index ad32e910c..dea50558b 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -147,7 +147,7 @@ impl DefaultParquetHandler { let modification_time = Arc::new(Int64Array::from(vec![modification_time])); Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new( - Arc::new(write_metadata_schema.try_into()?), + Arc::new(write_metadata_schema.as_ref().try_into()?), vec![path, partitions, size, modification_time, data_change], )?))) } diff --git a/kernel/src/transaction.rs b/kernel/src/transaction.rs index d47e956b8..1b359f59d 100644 --- a/kernel/src/transaction.rs +++ b/kernel/src/transaction.rs @@ -1,9 +1,11 @@ +use std::collections::HashMap; use std::iter; use std::mem; -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use std::time::{SystemTime, UNIX_EPOCH}; use crate::actions::get_log_commit_info_schema; +use crate::actions::schemas::{GetNullableContainerStructField, GetStructField}; use crate::actions::{ADD_NAME, COMMIT_INFO_NAME}; use crate::error::Error; use crate::expressions::{column_expr, ColumnName, Scalar, StructData}; @@ -18,6 +20,25 @@ use url::Url; const KERNEL_VERSION: &str = env!("CARGO_PKG_VERSION"); const UNKNOWN_OPERATION: &str = "UNKNOWN"; +// FIXME: should be a projection of LOG_SCHEMA? +pub(crate) static WRITE_METADATA_SCHEMA: LazyLock = LazyLock::new(|| { + StructType::new(vec![ + ::get_struct_field("path"), + >::get_nullable_container_struct_field("partitionValues"), + ::get_struct_field("size"), + ::get_struct_field("modificationTime"), + ::get_struct_field("dataChange"), + ]) + .into() +}); + +/// Get the expected schema for [`write_metadata`]. +/// +/// [`write_metadata`]: crate::transaction::Transaction::write_metadata +pub fn get_write_metadata_schema() -> &'static SchemaRef { + &WRITE_METADATA_SCHEMA +} + /// A transaction represents an in-progress write to a table. After creating a transaction, changes /// to the table may be staged via the transaction methods before calling `commit` to commit the /// changes to the table. @@ -39,13 +60,6 @@ pub struct Transaction { write_metadata: Box> + Send>, } -/// Get the expected schema for [`write_metadata`]. -/// -/// [`write_metadata`]: crate::transaction::Transaction::write_metadata -pub fn get_write_metadata_schema() -> &'static StructType { - &crate::actions::WRITE_METADATA_SCHEMA -} - impl std::fmt::Debug for Transaction { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_str(&format!( @@ -199,7 +213,7 @@ fn generate_adds( .map(|f| Expression::Column(ColumnName::new(iter::once(f.name())))), )]); let adds_evaluator = expression_handler.get_evaluator( - write_metadata_schema.clone().into(), + write_metadata_schema.clone(), adds_expr, log_schema.clone(), ); @@ -329,6 +343,7 @@ mod tests { use crate::engine::arrow_data::ArrowEngineData; use crate::engine::arrow_expression::ArrowExpressionHandler; + use crate::schema::MapType; use crate::{ExpressionHandler, FileSystemClient, JsonHandler, ParquetHandler}; use arrow::json::writer::LineDelimitedWriter; @@ -662,4 +677,21 @@ mod tests { } Ok(()) } + + #[test] + fn test_write_metadata_schema() { + let schema = get_write_metadata_schema(); + let expected = StructType::new(vec![ + StructField::new("path", DataType::STRING, false), + StructField::new( + "partitionValues", + MapType::new(DataType::STRING, DataType::STRING, true), + false, + ), + StructField::new("size", DataType::LONG, false), + StructField::new("modificationTime", DataType::LONG, false), + StructField::new("dataChange", DataType::BOOLEAN, false), + ]); + assert_eq!(*schema, expected.into()); + } } diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index 1f1da07dc..e66a5d416 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -30,6 +30,11 @@ fn setup( let table_root_path = Path::from(format!("/{table_name}")); let url = Url::parse(&format!("memory:///{}/", table_root_path)).unwrap(); let storage = Arc::new(InMemory::new()); + // uncomment for persisting to local filesystem + // use object_store::local::LocalFileSystem; + // let table_root_path = Path::from(format!("users/zach.schuermann/Desktop/kernel_tests/{table_name}")); + // let url = Url::parse(&format!("file:///users/zach.schuermann/Desktop/kernel_tests/{}/", table_root_path)).unwrap(); + // let storage = Arc::new(LocalFileSystem::new()); ( storage.clone(), DefaultEngine::new( @@ -535,25 +540,6 @@ async fn test_append_partitioned() -> Result<(), Box> { }) }); - // FIXME this is collecting to vec - //let write_metadata: Vec = futures::future::join_all(tasks) - // .await - // .into_iter() - // .map(|data| { - // let data = data.unwrap(); - // data.into_any() - // .downcast::() - // .unwrap() - // .into() - // }) - // .collect(); - - //txn.add_write_metadata(Box::new( - // write_metadata - // .into_iter() - // .map(|data| Box::new(ArrowEngineData::new(data))), - //)); - let write_metadata = futures::future::join_all(tasks) .await .into_iter() From a6290f26ffce0d56b8764784cc301cbf7261417f Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Tue, 29 Oct 2024 13:25:26 -0700 Subject: [PATCH 06/23] use filemeta in parquet metadata --- kernel/src/engine/default/parquet.rs | 40 +++++++++++++--------------- kernel/src/lib.rs | 11 ++++++++ 2 files changed, 29 insertions(+), 22 deletions(-) diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index dea50558b..565a08f20 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -32,20 +32,16 @@ pub struct DefaultParquetHandler { readahead: usize, } +/// Metadata of a parquet file, currently just includes the file metadata but will expand to +/// include file statistics and other metadata in the future. #[derive(Debug)] pub struct ParquetMetadata { - pub path: String, - pub size: i64, - pub modification_time: i64, + file_meta: FileMeta, } impl ParquetMetadata { - pub fn new(path: String, size: i64, modification_time: i64) -> Self { - Self { - path, - size, - modification_time, - } + pub fn new(file_meta: FileMeta) -> Self { + Self { file_meta } } } @@ -97,13 +93,8 @@ impl DefaultParquetHandler { ))); } - Ok(ParquetMetadata::new( - path.to_string(), - // this means max size we can write is i64::MAX (~8EB) - size.try_into() - .map_err(|_| Error::generic("Failed to convert parquet metadata 'size' to i64"))?, - modification_time, - )) + let file_meta = FileMeta::new(path, modification_time, size); + Ok(ParquetMetadata::new(file_meta)) } pub async fn write_parquet_file( @@ -113,15 +104,16 @@ impl DefaultParquetHandler { partition_values: std::collections::HashMap, data_change: bool, ) -> DeltaResult> { - let ParquetMetadata { - path, + let ParquetMetadata { file_meta } = self.write_parquet(path, data).await?; + let FileMeta { + location, + last_modified, size, - modification_time, - } = self.write_parquet(path, data).await?; + } = file_meta; let write_metadata_schema = crate::transaction::get_write_metadata_schema(); // create the record batch of the write metadata - let path = Arc::new(StringArray::from(vec![path.to_string()])); + let path = Arc::new(StringArray::from(vec![location.to_string()])); use arrow_array::builder::StringBuilder; let key_builder = StringBuilder::new(); let val_builder = StringBuilder::new(); @@ -142,9 +134,13 @@ impl DefaultParquetHandler { } } let partitions = Arc::new(builder.finish()); + // this means max size we can write is i64::MAX (~8EB) + let size: i64 = size + .try_into() + .map_err(|_| Error::generic("Failed to convert parquet metadata 'size' to i64"))?; let size = Arc::new(Int64Array::from(vec![size])); let data_change = Arc::new(BooleanArray::from(vec![data_change])); - let modification_time = Arc::new(Int64Array::from(vec![modification_time])); + let modification_time = Arc::new(Int64Array::from(vec![last_modified])); Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new( Arc::new(write_metadata_schema.as_ref().try_into()?), diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 6d47d9ae2..087c28a1b 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -124,6 +124,17 @@ impl PartialOrd for FileMeta { } } +impl FileMeta { + /// Create a new instance of `FileMeta` + pub fn new(location: Url, last_modified: i64, size: usize) -> Self { + Self { + location, + last_modified, + size, + } + } +} + /// Trait for implementing an Expression evaluator. /// /// It contains one Expression which can be evaluated on multiple ColumnarBatches. From c7dac51b7d39731d934728adcc0b45f8ecede311 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Tue, 29 Oct 2024 15:29:18 -0700 Subject: [PATCH 07/23] materialize write_metadata as vec --- kernel/src/engine/default/json.rs | 2 +- kernel/src/engine/default/parquet.rs | 112 +++++++++++++++++---------- kernel/src/engine/sync/json.rs | 2 +- kernel/src/lib.rs | 2 +- kernel/src/transaction.rs | 73 +++++++++-------- kernel/tests/write.rs | 14 +++- 6 files changed, 124 insertions(+), 81 deletions(-) diff --git a/kernel/src/engine/default/json.rs b/kernel/src/engine/default/json.rs index b03b26bc6..ab108b5b3 100644 --- a/kernel/src/engine/default/json.rs +++ b/kernel/src/engine/default/json.rs @@ -96,7 +96,7 @@ impl JsonHandler for DefaultJsonHandler { fn write_json_file( &self, path: &Url, - data: Box> + Send>, + data: Box> + Send + '_>, _overwrite: bool, ) -> DeltaResult<()> { let buffer = to_json_bytes(data)?; diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index 565a08f20..edf7e90db 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -1,8 +1,10 @@ //! Default Parquet handler implementation +use std::collections::HashMap; use std::ops::Range; use std::sync::Arc; +use arrow_array::builder::{MapBuilder, MapFieldNames, StringBuilder}; use arrow_array::{BooleanArray, Int64Array, RecordBatch, StringArray}; use futures::StreamExt; use object_store::path::Path; @@ -43,6 +45,53 @@ impl ParquetMetadata { pub fn new(file_meta: FileMeta) -> Self { Self { file_meta } } + + // convert ParquetMetadata into a record batch which matches the 'write_metadata' schema + fn create_write_metadata( + &self, + partition_values: HashMap, + data_change: bool, + ) -> DeltaResult> { + let ParquetMetadata { file_meta } = self; + let FileMeta { + location, + last_modified, + size, + } = file_meta; + let write_metadata_schema = crate::transaction::get_write_metadata_schema(); + + // create the record batch of the write metadata + let path = Arc::new(StringArray::from(vec![location.to_string()])); + let key_builder = StringBuilder::new(); + let val_builder = StringBuilder::new(); + let names = MapFieldNames { + entry: "key_value".to_string(), + key: "key".to_string(), + value: "value".to_string(), + }; + let mut builder = MapBuilder::new(Some(names), key_builder, val_builder); + if partition_values.is_empty() { + builder.append(true).unwrap(); + } else { + for (k, v) in partition_values { + builder.keys().append_value(&k); + builder.values().append_value(&v); + builder.append(true).unwrap(); + } + } + let partitions = Arc::new(builder.finish()); + // this means max size we can write is i64::MAX (~8EB) + let size: i64 = (*size) + .try_into() + .map_err(|_| Error::generic("Failed to convert parquet metadata 'size' to i64"))?; + let size = Arc::new(Int64Array::from(vec![size])); + let data_change = Arc::new(BooleanArray::from(vec![data_change])); + let modification_time = Arc::new(Int64Array::from(vec![*last_modified])); + Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new( + Arc::new(write_metadata_schema.as_ref().try_into()?), + vec![path, partitions, size, modification_time, data_change], + )?))) + } } impl DefaultParquetHandler { @@ -62,6 +111,11 @@ impl DefaultParquetHandler { self } + // Write `data` to `path`/.parquet as parquet using ArrowWriter and return the parquet + // metadata. + // + // Note: after encoding the data as parquet, this issues a PUT followed by a HEAD to storage in + // order to obtain metadata about the object just written. async fn write_parquet( &self, path: &url::Url, @@ -97,55 +151,21 @@ impl DefaultParquetHandler { Ok(ParquetMetadata::new(file_meta)) } + /// Write `data` to `path`/.parquet as parquet using ArrowWriter and return the parquet + /// metadata as an EngineData batch which matches the [write metadata] schema + /// + /// [write metadata]: crate::transaction::get_write_metadata_schema pub async fn write_parquet_file( &self, path: &url::Url, data: Box, - partition_values: std::collections::HashMap, + partition_values: HashMap, data_change: bool, ) -> DeltaResult> { - let ParquetMetadata { file_meta } = self.write_parquet(path, data).await?; - let FileMeta { - location, - last_modified, - size, - } = file_meta; - let write_metadata_schema = crate::transaction::get_write_metadata_schema(); - - // create the record batch of the write metadata - let path = Arc::new(StringArray::from(vec![location.to_string()])); - use arrow_array::builder::StringBuilder; - let key_builder = StringBuilder::new(); - let val_builder = StringBuilder::new(); - let names = arrow_array::builder::MapFieldNames { - entry: "key_value".to_string(), - key: "key".to_string(), - value: "value".to_string(), - }; - let mut builder = - arrow_array::builder::MapBuilder::new(Some(names), key_builder, val_builder); - if partition_values.is_empty() { - builder.append(true).unwrap(); - } else { - for (k, v) in partition_values { - builder.keys().append_value(&k); - builder.values().append_value(&v); - builder.append(true).unwrap(); - } - } - let partitions = Arc::new(builder.finish()); - // this means max size we can write is i64::MAX (~8EB) - let size: i64 = size - .try_into() - .map_err(|_| Error::generic("Failed to convert parquet metadata 'size' to i64"))?; - let size = Arc::new(Int64Array::from(vec![size])); - let data_change = Arc::new(BooleanArray::from(vec![data_change])); - let modification_time = Arc::new(Int64Array::from(vec![last_modified])); - - Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new( - Arc::new(write_metadata_schema.as_ref().try_into()?), - vec![path, partitions, size, modification_time, data_change], - )?))) + let parquet_metadata = self.write_parquet(path, data).await?; + let write_metadata = + parquet_metadata.create_write_metadata(partition_values, data_change)?; + Ok(write_metadata) } } @@ -401,4 +421,10 @@ mod tests { assert_eq!(data.len(), 1); assert_eq!(data[0].num_rows(), 10); } + + #[test] + fn test_into_write_metadata() {} + + #[tokio::test] + async fn test_write_parquet() {} } diff --git a/kernel/src/engine/sync/json.rs b/kernel/src/engine/sync/json.rs index 016fb2658..1354fcf6c 100644 --- a/kernel/src/engine/sync/json.rs +++ b/kernel/src/engine/sync/json.rs @@ -52,7 +52,7 @@ impl JsonHandler for SyncJsonHandler { fn write_json_file( &self, path: &Url, - data: Box> + Send>, + data: Box> + Send + '_>, _overwrite: bool, ) -> DeltaResult<()> { let path = path diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 087c28a1b..84889a122 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -244,7 +244,7 @@ pub trait JsonHandler: Send + Sync { fn write_json_file( &self, path: &Url, - data: Box> + Send>, + data: Box> + Send + '_>, overwrite: bool, ) -> DeltaResult<()>; } diff --git a/kernel/src/transaction.rs b/kernel/src/transaction.rs index 1b359f59d..de407d43e 100644 --- a/kernel/src/transaction.rs +++ b/kernel/src/transaction.rs @@ -1,12 +1,11 @@ use std::collections::HashMap; use std::iter; -use std::mem; use std::sync::{Arc, LazyLock}; use std::time::{SystemTime, UNIX_EPOCH}; -use crate::actions::get_log_commit_info_schema; use crate::actions::schemas::{GetNullableContainerStructField, GetStructField}; -use crate::actions::{ADD_NAME, COMMIT_INFO_NAME}; +use crate::actions::COMMIT_INFO_NAME; +use crate::actions::{get_log_add_schema, get_log_commit_info_schema}; use crate::error::Error; use crate::expressions::{column_expr, ColumnName, Scalar, StructData}; use crate::path::ParsedLogPath; @@ -57,7 +56,7 @@ pub struct Transaction { read_snapshot: Arc, operation: Option, commit_info: Option>, - write_metadata: Box> + Send>, + write_metadata: Vec>, } impl std::fmt::Debug for Transaction { @@ -82,7 +81,7 @@ impl Transaction { read_snapshot: snapshot.into(), operation: None, commit_info: None, - write_metadata: Box::new(std::iter::empty()), + write_metadata: vec![], } } @@ -103,7 +102,7 @@ impl Transaction { // TODO consider IntoIterator so we can have multiple write_metadata iterators (and return // self in the conflict case for retries) - let adds = generate_adds(engine, self.write_metadata); + let adds = generate_adds(engine, self.write_metadata.iter().map(|a| a.as_ref())); let actions = chain(actions, adds); // step two: set new commit version (current_version + 1) and path to write @@ -115,8 +114,7 @@ impl Transaction { let json_handler = engine.get_json_handler(); match json_handler.write_json_file(&commit_path.location, Box::new(actions), false) { Ok(()) => Ok(CommitResult::Committed(commit_version)), - // FIXME - // Err(Error::FileAlreadyExists(_)) => Ok(CommitResult::Conflict(self, commit_version)), + Err(Error::FileAlreadyExists(_)) => Ok(CommitResult::Conflict(self, commit_version)), Err(e) => Err(e), } } @@ -167,6 +165,9 @@ impl Transaction { /// Get the write context for this transaction. At the moment, this is constant for the whole /// transaction. + // Note: after we introduce metadata updates (modify table schema, etc.), we need to make sure + // that engines cannot call this method after a metadata change, since the write context could + // have invalid metadata. pub fn write_context(&self) -> WriteContext { let target_dir = self.read_snapshot.table_root(); let snapshot_schema = self.read_snapshot.schema(); @@ -181,30 +182,23 @@ impl Transaction { } /// Add write metadata about files to include in the transaction. This API can be called - /// multiple times to add multiple iterators. + /// multiple times to add multiple batches. /// - /// TODO what is expected schema for the batches? - pub fn add_write_metadata( - &mut self, - data: Box> + Send>, - ) { - let write_metadata = mem::replace(&mut self.write_metadata, Box::new(std::iter::empty())); - self.write_metadata = Box::new(chain(write_metadata, data)); + /// The expected schema for the write metadata is given by [`get_write_metadata_schema`]. + pub fn add_write_metadata(&mut self, data: Box) { + self.write_metadata.push(data); } } -// this does something similar to adding top-level 'commitInfo' named struct. we should unify. -fn generate_adds( +// convert write_metadata into add actions using an expression to transform the data in a single +// pass +fn generate_adds<'a>( engine: &dyn Engine, - write_metadata: Box> + Send>, -) -> Box> + Send> { + write_metadata: impl Iterator + Send + 'a, +) -> Box> + Send + 'a> { let expression_handler = engine.get_expression_handler(); let write_metadata_schema = get_write_metadata_schema(); - let log_schema: DataType = DataType::struct_type(vec![StructField::new( - ADD_NAME, - write_metadata_schema.clone(), - true, - )]); + let log_schema = get_log_add_schema(); Box::new(write_metadata.map(move |write_metadata_batch| { let adds_expr = Expression::struct_from([Expression::struct_from( @@ -215,22 +209,23 @@ fn generate_adds( let adds_evaluator = expression_handler.get_evaluator( write_metadata_schema.clone(), adds_expr, - log_schema.clone(), + log_schema.clone().into(), ); adds_evaluator - .evaluate(write_metadata_batch.as_ref()) + .evaluate(write_metadata_batch) .expect("fixme") })) } + /// WriteContext is data derived from a [`Transaction`] that can be provided to writers in order to /// write table data. /// /// [`Transaction`]: struct.Transaction.html pub struct WriteContext { - pub target_dir: Url, - pub schema: SchemaRef, - pub partition_cols: Vec, - pub logical_to_physical: Expression, + target_dir: Url, + schema: SchemaRef, + partition_cols: Vec, + logical_to_physical: Expression, } impl WriteContext { @@ -247,6 +242,22 @@ impl WriteContext { logical_to_physical, } } + + pub fn target_dir(&self) -> &Url { + &self.target_dir + } + + pub fn schema(&self) -> &SchemaRef { + &self.schema + } + + pub fn partition_cols(&self) -> &[String] { + &self.partition_cols + } + + pub fn logical_to_physical(&self) -> &Expression { + &self.logical_to_physical + } } /// Result after committing a transaction. If 'committed', the version is the new version written diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index e66a5d416..47c1976f1 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -343,7 +343,7 @@ async fn test_append() -> Result<(), Box> { let parquet_handler = &engine.parquet; parquet_handler .write_parquet_file( - &write_context.target_dir, + write_context.target_dir(), data.expect("FIXME"), std::collections::HashMap::new(), true, @@ -377,7 +377,10 @@ async fn test_append() -> Result<(), Box> { .await .into_iter() .map(|data| data.unwrap()); - txn.add_write_metadata(Box::new(write_metadata)); + + for meta in write_metadata { + txn.add_write_metadata(meta); + } // commit! txn.commit(engine.as_ref())?; @@ -526,7 +529,7 @@ async fn test_append_partitioned() -> Result<(), Box> { let parquet_handler = &engine.parquet; parquet_handler .write_parquet_file( - &write_context.target_dir, + write_context.target_dir(), data.expect("FIXME"), std::collections::HashMap::from([( partition_col.to_string(), @@ -544,7 +547,10 @@ async fn test_append_partitioned() -> Result<(), Box> { .await .into_iter() .map(|data| data.unwrap()); - txn.add_write_metadata(Box::new(write_metadata)); + + for meta in write_metadata { + txn.add_write_metadata(meta); + } // commit! txn.commit(engine.as_ref())?; From 8f1a446e991f2eefe8557439ffc30dfe2d50306a Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Tue, 29 Oct 2024 16:16:40 -0700 Subject: [PATCH 08/23] modify write_json_files to take iterator of DeltaResult --- kernel/src/engine/arrow_utils.rs | 6 +++--- kernel/src/engine/default/json.rs | 2 +- kernel/src/engine/sync/json.rs | 6 +++--- kernel/src/lib.rs | 2 +- kernel/src/transaction.rs | 16 +++++----------- 5 files changed, 13 insertions(+), 19 deletions(-) diff --git a/kernel/src/engine/arrow_utils.rs b/kernel/src/engine/arrow_utils.rs index d8daba774..dea2cc9fd 100644 --- a/kernel/src/engine/arrow_utils.rs +++ b/kernel/src/engine/arrow_utils.rs @@ -665,11 +665,11 @@ fn parse_json_impl(json_strings: &StringArray, schema: ArrowSchemaRef) -> DeltaR /// serialize an arrow RecordBatch to a JSON string by appending to a buffer. // TODO (zach): this should stream data to the JSON writer and output an iterator. pub(crate) fn to_json_bytes( - data: impl Iterator> + Send, + data: impl Iterator>> + Send, ) -> DeltaResult> { let mut writer = LineDelimitedWriter::new(Vec::new()); for chunk in data.into_iter() { - let arrow_data = ArrowEngineData::try_from_engine_data(chunk)?; + let arrow_data = ArrowEngineData::try_from_engine_data(chunk?)?; let record_batch = arrow_data.record_batch(); writer.write(record_batch)?; } @@ -1436,7 +1436,7 @@ mod tests { vec![Arc::new(StringArray::from(vec!["string1", "string2"]))], )?; let data: Box = Box::new(ArrowEngineData::new(data)); - let json = to_json_bytes(Box::new(std::iter::once(data)))?; + let json = to_json_bytes(Box::new(std::iter::once(Ok(data))))?; assert_eq!( json, "{\"string\":\"string1\"}\n{\"string\":\"string2\"}\n".as_bytes() diff --git a/kernel/src/engine/default/json.rs b/kernel/src/engine/default/json.rs index ab108b5b3..1d8aa3058 100644 --- a/kernel/src/engine/default/json.rs +++ b/kernel/src/engine/default/json.rs @@ -96,7 +96,7 @@ impl JsonHandler for DefaultJsonHandler { fn write_json_file( &self, path: &Url, - data: Box> + Send + '_>, + data: Box>> + Send + '_>, _overwrite: bool, ) -> DeltaResult<()> { let buffer = to_json_bytes(data)?; diff --git a/kernel/src/engine/sync/json.rs b/kernel/src/engine/sync/json.rs index 1354fcf6c..3d33b1025 100644 --- a/kernel/src/engine/sync/json.rs +++ b/kernel/src/engine/sync/json.rs @@ -52,7 +52,7 @@ impl JsonHandler for SyncJsonHandler { fn write_json_file( &self, path: &Url, - data: Box> + Send + '_>, + data: Box>> + Send + '_>, _overwrite: bool, ) -> DeltaResult<()> { let path = path @@ -120,10 +120,10 @@ mod tests { let url = Url::from_file_path(path.clone()).unwrap(); handler - .write_json_file(&url, Box::new(std::iter::once(data)), false) + .write_json_file(&url, Box::new(std::iter::once(Ok(data))), false) .expect("write json file"); assert!(matches!( - handler.write_json_file(&url, Box::new(std::iter::once(empty)), false), + handler.write_json_file(&url, Box::new(std::iter::once(Ok(empty))), false), Err(Error::FileAlreadyExists(_)) )); diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 84889a122..2213eb372 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -244,7 +244,7 @@ pub trait JsonHandler: Send + Sync { fn write_json_file( &self, path: &Url, - data: Box> + Send + '_>, + data: Box>> + Send + '_>, overwrite: bool, ) -> DeltaResult<()>; } diff --git a/kernel/src/transaction.rs b/kernel/src/transaction.rs index de407d43e..91e04510e 100644 --- a/kernel/src/transaction.rs +++ b/kernel/src/transaction.rs @@ -89,7 +89,6 @@ impl Transaction { /// will include the failed transaction in case of a conflict so the user can retry. pub fn commit(self, engine: &dyn Engine) -> DeltaResult { // step one: construct the iterator of actions we want to commit - // note: only support commit_info right now (and it's required) let engine_commit_info = self .commit_info .as_ref() @@ -98,10 +97,7 @@ impl Transaction { engine, self.operation.as_deref(), engine_commit_info.as_ref(), - )?)); - - // TODO consider IntoIterator so we can have multiple write_metadata iterators (and return - // self in the conflict case for retries) + ))); let adds = generate_adds(engine, self.write_metadata.iter().map(|a| a.as_ref())); let actions = chain(actions, adds); @@ -144,8 +140,8 @@ impl Transaction { self } - // Generate the logical-to-physical transform expression for this transaction. At the moment, - // this is a transaction-wide expression. + // Generate the logical-to-physical transform expression which must be evaluated on every data + // chunk before writing. At the moment, this is a transaction-wide expression. fn generate_logical_to_physical(&self) -> Expression { // for now, we just pass through all the columns except partition columns. // note this is _incorrect_ if table config deems we need partition columns. @@ -195,7 +191,7 @@ impl Transaction { fn generate_adds<'a>( engine: &dyn Engine, write_metadata: impl Iterator + Send + 'a, -) -> Box> + Send + 'a> { +) -> Box>> + Send + 'a> { let expression_handler = engine.get_expression_handler(); let write_metadata_schema = get_write_metadata_schema(); let log_schema = get_log_add_schema(); @@ -211,9 +207,7 @@ fn generate_adds<'a>( adds_expr, log_schema.clone().into(), ); - adds_evaluator - .evaluate(write_metadata_batch) - .expect("fixme") + adds_evaluator.evaluate(write_metadata_batch) })) } From b41d2550bcd9e99498b722cdda6b541c2af6b3a1 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Tue, 29 Oct 2024 16:20:21 -0700 Subject: [PATCH 09/23] get_parquet_handler --- kernel/src/engine/default/mod.rs | 6 +++++- kernel/tests/write.rs | 8 ++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/kernel/src/engine/default/mod.rs b/kernel/src/engine/default/mod.rs index 76333c822..41cf47911 100644 --- a/kernel/src/engine/default/mod.rs +++ b/kernel/src/engine/default/mod.rs @@ -33,7 +33,7 @@ pub struct DefaultEngine { store: Arc, file_system: Arc>, json: Arc>, - pub parquet: Arc>, // FIXME + parquet: Arc>, expression: Arc, } @@ -108,6 +108,10 @@ impl DefaultEngine { pub fn get_object_store_for_url(&self, _url: &Url) -> Option> { Some(self.store.clone()) } + + pub fn get_parquet_handler(&self) -> Arc> { + self.parquet.clone() + } } impl Engine for DefaultEngine { diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index 47c1976f1..e3358c0e2 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -340,8 +340,8 @@ async fn test_append() -> Result<(), Box> { let engine = Arc::clone(&engine); let write_context = Arc::clone(&write_context); async move { - let parquet_handler = &engine.parquet; - parquet_handler + engine + .get_parquet_handler() .write_parquet_file( write_context.target_dir(), data.expect("FIXME"), @@ -526,8 +526,8 @@ async fn test_append_partitioned() -> Result<(), Box> { let engine = Arc::clone(&engine); let write_context = Arc::clone(&write_context); async move { - let parquet_handler = &engine.parquet; - parquet_handler + engine + .get_parquet_handler() .write_parquet_file( write_context.target_dir(), data.expect("FIXME"), From 9838e3560ad543496dd49d9ce3d45cbb76e9d462 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Wed, 30 Oct 2024 08:35:06 -0700 Subject: [PATCH 10/23] comments --- kernel/src/engine/default/parquet.rs | 5 +++-- kernel/src/transaction.rs | 1 - 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index edf7e90db..44e73154d 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -112,7 +112,7 @@ impl DefaultParquetHandler { } // Write `data` to `path`/.parquet as parquet using ArrowWriter and return the parquet - // metadata. + // metadata (where is a generated UUIDv4). // // Note: after encoding the data as parquet, this issues a PUT followed by a HEAD to storage in // order to obtain metadata about the object just written. @@ -152,7 +152,8 @@ impl DefaultParquetHandler { } /// Write `data` to `path`/.parquet as parquet using ArrowWriter and return the parquet - /// metadata as an EngineData batch which matches the [write metadata] schema + /// metadata as an EngineData batch which matches the [write metadata] schema (where is + /// a generated UUIDv4). /// /// [write metadata]: crate::transaction::get_write_metadata_schema pub async fn write_parquet_file( diff --git a/kernel/src/transaction.rs b/kernel/src/transaction.rs index 91e04510e..551a5497d 100644 --- a/kernel/src/transaction.rs +++ b/kernel/src/transaction.rs @@ -19,7 +19,6 @@ use url::Url; const KERNEL_VERSION: &str = env!("CARGO_PKG_VERSION"); const UNKNOWN_OPERATION: &str = "UNKNOWN"; -// FIXME: should be a projection of LOG_SCHEMA? pub(crate) static WRITE_METADATA_SCHEMA: LazyLock = LazyLock::new(|| { StructType::new(vec![ ::get_struct_field("path"), From bdea457deb303c47e9dfeee8f3d44a149d56df48 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Wed, 30 Oct 2024 22:22:08 -0700 Subject: [PATCH 11/23] address comments --- kernel/src/engine/default/parquet.rs | 15 ++-- kernel/src/transaction.rs | 25 +++--- kernel/tests/common/mod.rs | 40 +++++++++- kernel/tests/write.rs | 111 ++++++++------------------- 4 files changed, 88 insertions(+), 103 deletions(-) diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index 44e73154d..9a6f33c50 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -37,22 +37,22 @@ pub struct DefaultParquetHandler { /// Metadata of a parquet file, currently just includes the file metadata but will expand to /// include file statistics and other metadata in the future. #[derive(Debug)] -pub struct ParquetMetadata { +pub struct ParquetWriteMetadata { file_meta: FileMeta, } -impl ParquetMetadata { +impl ParquetWriteMetadata { pub fn new(file_meta: FileMeta) -> Self { Self { file_meta } } // convert ParquetMetadata into a record batch which matches the 'write_metadata' schema - fn create_write_metadata( + fn as_record_batch( &self, partition_values: HashMap, data_change: bool, ) -> DeltaResult> { - let ParquetMetadata { file_meta } = self; + let ParquetWriteMetadata { file_meta } = self; let FileMeta { location, last_modified, @@ -120,7 +120,7 @@ impl DefaultParquetHandler { &self, path: &url::Url, data: Box, - ) -> DeltaResult { + ) -> DeltaResult { let batch: Box<_> = ArrowEngineData::try_from_engine_data(data)?; let record_batch = batch.record_batch(); @@ -148,7 +148,7 @@ impl DefaultParquetHandler { } let file_meta = FileMeta::new(path, modification_time, size); - Ok(ParquetMetadata::new(file_meta)) + Ok(ParquetWriteMetadata::new(file_meta)) } /// Write `data` to `path`/.parquet as parquet using ArrowWriter and return the parquet @@ -164,8 +164,7 @@ impl DefaultParquetHandler { data_change: bool, ) -> DeltaResult> { let parquet_metadata = self.write_parquet(path, data).await?; - let write_metadata = - parquet_metadata.create_write_metadata(partition_values, data_change)?; + let write_metadata = parquet_metadata.as_record_batch(partition_values, data_change)?; Ok(write_metadata) } } diff --git a/kernel/src/transaction.rs b/kernel/src/transaction.rs index 551a5497d..7aa08b6cb 100644 --- a/kernel/src/transaction.rs +++ b/kernel/src/transaction.rs @@ -92,13 +92,13 @@ impl Transaction { .commit_info .as_ref() .ok_or_else(|| Error::MissingCommitInfo)?; - let actions = Box::new(iter::once(generate_commit_info( + let commit_info = generate_commit_info( engine, self.operation.as_deref(), engine_commit_info.as_ref(), - ))); + ); let adds = generate_adds(engine, self.write_metadata.iter().map(|a| a.as_ref())); - let actions = chain(actions, adds); + let actions = chain(iter::once(commit_info), adds); // step two: set new commit version (current_version + 1) and path to write let commit_version = self.read_snapshot.version() + 1; @@ -153,7 +153,8 @@ impl Transaction { { None } else { - Some(Expression::Column(ColumnName::new(iter::once(f.name())))) + let col_name = ColumnName::new([f.name()]); + Some(col_name.into()) } })) } @@ -163,7 +164,7 @@ impl Transaction { // Note: after we introduce metadata updates (modify table schema, etc.), we need to make sure // that engines cannot call this method after a metadata change, since the write context could // have invalid metadata. - pub fn write_context(&self) -> WriteContext { + pub fn get_write_context(&self) -> WriteContext { let target_dir = self.read_snapshot.table_root(); let snapshot_schema = self.read_snapshot.schema(); let partition_cols = self.read_snapshot.metadata().partition_columns.clone(); @@ -179,9 +180,9 @@ impl Transaction { /// Add write metadata about files to include in the transaction. This API can be called /// multiple times to add multiple batches. /// - /// The expected schema for the write metadata is given by [`get_write_metadata_schema`]. - pub fn add_write_metadata(&mut self, data: Box) { - self.write_metadata.push(data); + /// The expected schema for `write_metadata` is given by [`get_write_metadata_schema`]. + pub fn add_write_metadata(&mut self, write_metadata: Box) { + self.write_metadata.push(write_metadata); } } @@ -190,16 +191,16 @@ impl Transaction { fn generate_adds<'a>( engine: &dyn Engine, write_metadata: impl Iterator + Send + 'a, -) -> Box>> + Send + 'a> { +) -> impl Iterator>> + Send + 'a { let expression_handler = engine.get_expression_handler(); let write_metadata_schema = get_write_metadata_schema(); let log_schema = get_log_add_schema(); - Box::new(write_metadata.map(move |write_metadata_batch| { + write_metadata.map(move |write_metadata_batch| { let adds_expr = Expression::struct_from([Expression::struct_from( write_metadata_schema .fields() - .map(|f| Expression::Column(ColumnName::new(iter::once(f.name())))), + .map(|f| ColumnName::new([f.name()]).into()), )]); let adds_evaluator = expression_handler.get_evaluator( write_metadata_schema.clone(), @@ -207,7 +208,7 @@ fn generate_adds<'a>( log_schema.clone().into(), ); adds_evaluator.evaluate(write_metadata_batch) - })) + }) } /// WriteContext is data derived from a [`Transaction`] that can be provided to writers in order to diff --git a/kernel/tests/common/mod.rs b/kernel/tests/common/mod.rs index c43a7df5e..457146664 100644 --- a/kernel/tests/common/mod.rs +++ b/kernel/tests/common/mod.rs @@ -1,7 +1,6 @@ use crate::ArrowEngineData; use arrow::record_batch::RecordBatch; -use delta_kernel::DeltaResult; -use delta_kernel::EngineData; +use delta_kernel::{DeltaResult, Engine, EngineData, Table}; pub(crate) fn to_arrow(data: Box) -> DeltaResult { Ok(data @@ -10,3 +9,40 @@ pub(crate) fn to_arrow(data: Box) -> DeltaResult { .map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))? .into()) } + +// going to unify across read/write +pub(crate) fn test_read( + expected: &ArrowEngineData, + table: &Table, + engine: &impl Engine, +) -> Result<(), Box> { + let snapshot = table.snapshot(engine, None)?; + let scan = snapshot.into_scan_builder().build()?; + let actual = scan.execute(engine)?; + let batches: Vec = actual + .into_iter() + .map(|res| { + let data = res.unwrap().raw_data.unwrap(); + let record_batch: RecordBatch = data + .into_any() + .downcast::() + .unwrap() + .into(); + record_batch + }) + .collect(); + + let formatted = arrow::util::pretty::pretty_format_batches(&batches) + .unwrap() + .to_string(); + + let expected = arrow::util::pretty::pretty_format_batches(&[expected.record_batch().clone()]) + .unwrap() + .to_string(); + + println!("actual:\n{formatted}"); + println!("expected:\n{expected}"); + assert_eq!(formatted, expected); + + Ok(()) +} diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index e3358c0e2..5a4bc9d3c 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -5,6 +5,7 @@ use arrow::record_batch::RecordBatch; use arrow_schema::Schema as ArrowSchema; use arrow_schema::{DataType as ArrowDataType, Field}; use itertools::Itertools; +use object_store::local::LocalFileSystem; use object_store::memory::InMemory; use object_store::path::Path; use object_store::ObjectStore; @@ -17,33 +18,36 @@ use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType}; use delta_kernel::Error as KernelError; -use delta_kernel::{DeltaResult, Engine, Table}; +use delta_kernel::{DeltaResult, Table}; -// setup default engine with in-memory object store. +mod common; +use common::test_read; + +// setup default engine with in-memory (=true) or local fs (=false) object store. fn setup( table_name: &str, + in_memory: bool, ) -> ( Arc, DefaultEngine, Url, ) { - let table_root_path = Path::from(format!("/{table_name}")); - let url = Url::parse(&format!("memory:///{}/", table_root_path)).unwrap(); - let storage = Arc::new(InMemory::new()); - // uncomment for persisting to local filesystem - // use object_store::local::LocalFileSystem; - // let table_root_path = Path::from(format!("users/zach.schuermann/Desktop/kernel_tests/{table_name}")); - // let url = Url::parse(&format!("file:///users/zach.schuermann/Desktop/kernel_tests/{}/", table_root_path)).unwrap(); - // let storage = Arc::new(LocalFileSystem::new()); - ( - storage.clone(), - DefaultEngine::new( - storage, - table_root_path, - Arc::new(TokioBackgroundExecutor::new()), - ), - url, - ) + let (storage, base_path, base_url): (Arc, &str, &str) = if in_memory { + (Arc::new(InMemory::new()), "/", "memory:///") + } else { + ( + Arc::new(LocalFileSystem::new()), + "./kernel_write_tests/", + "file://", + ) + }; + + let table_root_path = Path::from(format!("{base_path}{table_name}")); + let url = Url::parse(&format!("{base_url}{table_root_path}/")).unwrap(); + let executor = Arc::new(TokioBackgroundExecutor::new()); + let engine = DefaultEngine::new(Arc::clone(&storage), table_root_path, executor); + + (storage, engine, url) } // we provide this table creation function since we only do appends to existing tables for now. @@ -139,7 +143,7 @@ async fn test_commit_info() -> Result<(), Box> { // setup tracing let _ = tracing_subscriber::fmt::try_init(); // setup in-memory object store and default engine - let (store, engine, table_location) = setup("test_table"); + let (store, engine, table_location) = setup("test_table", true); // create a simple table: one int column named 'number' let schema = Arc::new(StructType::new(vec![StructField::new( @@ -193,7 +197,7 @@ async fn test_empty_commit() -> Result<(), Box> { // setup tracing let _ = tracing_subscriber::fmt::try_init(); // setup in-memory object store and default engine - let (store, engine, table_location) = setup("test_table"); + let (store, engine, table_location) = setup("test_table", true); // create a simple table: one int column named 'number' let schema = Arc::new(StructType::new(vec![StructField::new( @@ -216,7 +220,7 @@ async fn test_invalid_commit_info() -> Result<(), Box> { // setup tracing let _ = tracing_subscriber::fmt::try_init(); // setup in-memory object store and default engine - let (store, engine, table_location) = setup("test_table"); + let (store, engine, table_location) = setup("test_table", true); // create a simple table: one int column named 'number' let schema = Arc::new(StructType::new(vec![StructField::new( @@ -266,48 +270,12 @@ async fn test_invalid_commit_info() -> Result<(), Box> { Ok(()) } -fn test_read( - expected: &ArrowEngineData, - table: &Table, - engine: &impl Engine, -) -> Result<(), Box> { - let snapshot = table.snapshot(engine, None)?; - let scan = snapshot.into_scan_builder().build()?; - let actual = scan.execute(engine)?; - let batches: Vec = actual - .into_iter() - .map(|res| { - let data = res.unwrap().raw_data.unwrap(); - let record_batch: RecordBatch = data - .into_any() - .downcast::() - .unwrap() - .into(); - record_batch - }) - .collect(); - - let formatted = arrow::util::pretty::pretty_format_batches(&batches) - .unwrap() - .to_string(); - - let expected = arrow::util::pretty::pretty_format_batches(&[expected.record_batch().clone()]) - .unwrap() - .to_string(); - - println!("actual:\n{formatted}"); - println!("expected:\n{expected}"); - assert_eq!(formatted, expected); - - Ok(()) -} - #[tokio::test] async fn test_append() -> Result<(), Box> { // setup tracing let _ = tracing_subscriber::fmt::try_init(); // setup in-memory object store and default engine - let (store, engine, table_location) = setup("test_table"); + let (store, engine, table_location) = setup("test_table", true); // create a simple table: one int column named 'number' let schema = Arc::new(StructType::new(vec![StructField::new( @@ -334,7 +302,7 @@ async fn test_append() -> Result<(), Box> { // write data out by spawning async tasks to simulate executors let engine = Arc::new(engine); - let write_context = Arc::new(txn.write_context()); + let write_context = Arc::new(txn.get_write_context()); let tasks = append_data.into_iter().map(|data| { tokio::task::spawn({ let engine = Arc::clone(&engine); @@ -354,25 +322,6 @@ async fn test_append() -> Result<(), Box> { }) }); - // FIXME this is collecting to vec - //let write_metadata: Vec = futures::future::join_all(tasks) - // .await - // .into_iter() - // .map(|data| { - // let data = data.unwrap(); - // data.into_any() - // .downcast::() - // .unwrap() - // .into() - // }) - // .collect(); - - //txn.add_write_metadata(Box::new( - // write_metadata - // .into_iter() - // .map(|data| Box::new(ArrowEngineData::new(data))), - //)); - let write_metadata = futures::future::join_all(tasks) .await .into_iter() @@ -477,7 +426,7 @@ async fn test_append_partitioned() -> Result<(), Box> { // setup tracing let _ = tracing_subscriber::fmt::try_init(); // setup in-memory object store and default engine - let (store, engine, table_location) = setup("test_table"); + let (store, engine, table_location) = setup("test_table", true); let partition_col = "partition"; // create a simple partitioned table: one int column named 'number', partitioned by string @@ -517,7 +466,7 @@ async fn test_append_partitioned() -> Result<(), Box> { // write data out by spawning async tasks to simulate executors let engine = Arc::new(engine); - let write_context = Arc::new(txn.write_context()); + let write_context = Arc::new(txn.get_write_context()); let tasks = append_data .into_iter() .zip(partition_vals) From 51e7cfbcfe9c1643574969010b50cb64ec95a852 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Thu, 31 Oct 2024 09:45:27 -0700 Subject: [PATCH 12/23] unify read test code in common --- kernel/tests/common/mod.rs | 46 +++++++++++++++++++++----------------- kernel/tests/read.rs | 20 +++-------------- 2 files changed, 29 insertions(+), 37 deletions(-) diff --git a/kernel/tests/common/mod.rs b/kernel/tests/common/mod.rs index 457146664..0dfb6f3cb 100644 --- a/kernel/tests/common/mod.rs +++ b/kernel/tests/common/mod.rs @@ -1,5 +1,10 @@ -use crate::ArrowEngineData; +use arrow::compute::filter_record_batch; use arrow::record_batch::RecordBatch; +use arrow::util::pretty::pretty_format_batches; +use itertools::Itertools; + +use crate::ArrowEngineData; +use delta_kernel::scan::Scan; use delta_kernel::{DeltaResult, Engine, EngineData, Table}; pub(crate) fn to_arrow(data: Box) -> DeltaResult { @@ -10,7 +15,6 @@ pub(crate) fn to_arrow(data: Box) -> DeltaResult { .into()) } -// going to unify across read/write pub(crate) fn test_read( expected: &ArrowEngineData, table: &Table, @@ -18,25 +22,10 @@ pub(crate) fn test_read( ) -> Result<(), Box> { let snapshot = table.snapshot(engine, None)?; let scan = snapshot.into_scan_builder().build()?; - let actual = scan.execute(engine)?; - let batches: Vec = actual - .into_iter() - .map(|res| { - let data = res.unwrap().raw_data.unwrap(); - let record_batch: RecordBatch = data - .into_any() - .downcast::() - .unwrap() - .into(); - record_batch - }) - .collect(); - - let formatted = arrow::util::pretty::pretty_format_batches(&batches) - .unwrap() - .to_string(); + let batches = read_scan(&scan, engine)?; + let formatted = pretty_format_batches(&batches).unwrap().to_string(); - let expected = arrow::util::pretty::pretty_format_batches(&[expected.record_batch().clone()]) + let expected = pretty_format_batches(&[expected.record_batch().clone()]) .unwrap() .to_string(); @@ -46,3 +35,20 @@ pub(crate) fn test_read( Ok(()) } + +pub(crate) fn read_scan(scan: &Scan, engine: &dyn Engine) -> DeltaResult> { + let scan_results = scan.execute(engine)?; + scan_results + .map(|scan_result| -> DeltaResult<_> { + let scan_result = scan_result?; + let mask = scan_result.full_mask(); + let data = scan_result.raw_data?; + let record_batch = to_arrow(data)?; + if let Some(mask) = mask { + Ok(filter_record_batch(&record_batch, &mask.into())?) + } else { + Ok(record_batch) + } + }) + .try_collect() +} diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index 92bf70314..ada0d22e6 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -17,15 +17,14 @@ use delta_kernel::expressions::{column_expr, BinaryOperator, Expression}; use delta_kernel::scan::state::{visit_scan_files, DvInfo, Stats}; use delta_kernel::scan::{transform_to_logical, Scan}; use delta_kernel::schema::Schema; -use delta_kernel::{DeltaResult, Engine, EngineData, FileMeta, Table}; -use itertools::Itertools; +use delta_kernel::{Engine, EngineData, FileMeta, Table}; use object_store::{memory::InMemory, path::Path, ObjectStore}; use parquet::arrow::arrow_writer::ArrowWriter; use parquet::file::properties::WriterProperties; use url::Url; mod common; -use common::to_arrow; +use common::{read_scan, to_arrow}; const PARQUET_FILE1: &str = "part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c000.snappy.parquet"; const PARQUET_FILE2: &str = "part-00001-c506e79a-0bf8-4e2b-a42b-9731b2e490ae-c000.snappy.parquet"; @@ -393,20 +392,7 @@ fn read_with_execute( expected: &[String], ) -> Result<(), Box> { let result_schema: ArrowSchemaRef = Arc::new(scan.schema().as_ref().try_into()?); - let scan_results = scan.execute(engine)?; - let batches: Vec = scan_results - .map(|scan_result| -> DeltaResult<_> { - let scan_result = scan_result?; - let mask = scan_result.full_mask(); - let data = scan_result.raw_data?; - let record_batch = to_arrow(data)?; - if let Some(mask) = mask { - Ok(filter_record_batch(&record_batch, &mask.into())?) - } else { - Ok(record_batch) - } - }) - .try_collect()?; + let batches = read_scan(scan, engine)?; if expected.is_empty() { assert_eq!(batches.len(), 0); From c67862e3172fa6d4b8495eb78b9dab575073c6ef Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Thu, 31 Oct 2024 11:24:41 -0700 Subject: [PATCH 13/23] make a set_value helper for tests --- kernel/tests/write.rs | 85 +++++++++++++++++-------------------------- 1 file changed, 33 insertions(+), 52 deletions(-) diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index 5a4bc9d3c..0080f0d92 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -270,6 +270,29 @@ async fn test_invalid_commit_info() -> Result<(), Box> { Ok(()) } +// set the value at .-separated `path` in `values` to `new_value` at `index` +fn set_value( + mut value: &mut serde_json::Value, + path: &str, + new_value: serde_json::Value, +) -> Result<(), Box> { + let parts: Vec<_> = path.split('.').collect(); + + for name in parts.iter().take(parts.len() - 1) { + value = value + .get_mut(*name) + .ok_or_else(|| format!("key '{name}' not found"))?; + } + + let last_key = parts.last().ok_or("empty path")?; + value + .as_object_mut() + .ok_or("expected a JSON object")? + .insert(last_key.to_string(), new_value); + + Ok(()) +} + #[tokio::test] async fn test_append() -> Result<(), Box> { // setup tracing @@ -344,32 +367,11 @@ async fn test_append() -> Result<(), Box> { .into_iter::() .try_collect()?; - // FIXME - *parsed_commits[0] - .get_mut("commitInfo") - .unwrap() - .get_mut("timestamp") - .unwrap() = serde_json::Value::Number(0.into()); - *parsed_commits[1] - .get_mut("add") - .unwrap() - .get_mut("modificationTime") - .unwrap() = serde_json::Value::Number(0.into()); - *parsed_commits[1] - .get_mut("add") - .unwrap() - .get_mut("path") - .unwrap() = serde_json::Value::String("first.parquet".to_string()); - *parsed_commits[2] - .get_mut("add") - .unwrap() - .get_mut("modificationTime") - .unwrap() = serde_json::Value::Number(0.into()); - *parsed_commits[2] - .get_mut("add") - .unwrap() - .get_mut("path") - .unwrap() = serde_json::Value::String("second.parquet".to_string()); + set_value(&mut parsed_commits[0], "commitInfo.timestamp", json!(0))?; + set_value(&mut parsed_commits[1], "add.modificationTime", json!(0))?; + set_value(&mut parsed_commits[1], "add.path", json!("first.parquet"))?; + set_value(&mut parsed_commits[2], "add.modificationTime", json!(0))?; + set_value(&mut parsed_commits[2], "add.path", json!("second.parquet"))?; let expected_commit = vec![ json!({ @@ -514,32 +516,11 @@ async fn test_append_partitioned() -> Result<(), Box> { .into_iter::() .try_collect()?; - // FIXME - *parsed_commits[0] - .get_mut("commitInfo") - .unwrap() - .get_mut("timestamp") - .unwrap() = serde_json::Value::Number(0.into()); - *parsed_commits[1] - .get_mut("add") - .unwrap() - .get_mut("modificationTime") - .unwrap() = serde_json::Value::Number(0.into()); - *parsed_commits[1] - .get_mut("add") - .unwrap() - .get_mut("path") - .unwrap() = serde_json::Value::String("first.parquet".to_string()); - *parsed_commits[2] - .get_mut("add") - .unwrap() - .get_mut("modificationTime") - .unwrap() = serde_json::Value::Number(0.into()); - *parsed_commits[2] - .get_mut("add") - .unwrap() - .get_mut("path") - .unwrap() = serde_json::Value::String("second.parquet".to_string()); + set_value(&mut parsed_commits[0], "commitInfo.timestamp", json!(0))?; + set_value(&mut parsed_commits[1], "add.modificationTime", json!(0))?; + set_value(&mut parsed_commits[1], "add.path", json!("first.parquet"))?; + set_value(&mut parsed_commits[2], "add.modificationTime", json!(0))?; + set_value(&mut parsed_commits[2], "add.path", json!("second.parquet"))?; let expected_commit = vec![ json!({ From b2f8414f26055047d5ac299d8ce7176223518ce4 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Thu, 31 Oct 2024 18:48:02 -0700 Subject: [PATCH 14/23] add invalid schema test --- kernel/src/engine/default/mod.rs | 33 +++++++- kernel/src/transaction.rs | 9 -- kernel/tests/write.rs | 137 +++++++++++++++++++++---------- 3 files changed, 122 insertions(+), 57 deletions(-) diff --git a/kernel/src/engine/default/mod.rs b/kernel/src/engine/default/mod.rs index 41cf47911..dca4398fc 100644 --- a/kernel/src/engine/default/mod.rs +++ b/kernel/src/engine/default/mod.rs @@ -7,6 +7,7 @@ //! the [executor] module. use std::sync::Arc; +use std::collections::HashMap; use self::storage::parse_url_opts; use object_store::{path::Path, DynObjectStore}; @@ -16,9 +17,13 @@ use self::executor::TaskExecutor; use self::filesystem::ObjectStoreFileSystemClient; use self::json::DefaultJsonHandler; use self::parquet::DefaultParquetHandler; +use super::arrow_data::ArrowEngineData; use super::arrow_expression::ArrowExpressionHandler; +use crate::schema::Schema; +use crate::transaction::WriteContext; use crate::{ - DeltaResult, Engine, ExpressionHandler, FileSystemClient, JsonHandler, ParquetHandler, + DeltaResult, Engine, EngineData, ExpressionHandler, FileSystemClient, JsonHandler, + ParquetHandler, }; pub mod executor; @@ -109,8 +114,30 @@ impl DefaultEngine { Some(self.store.clone()) } - pub fn get_parquet_handler(&self) -> Arc> { - self.parquet.clone() + pub async fn write_parquet( + &self, + data: &ArrowEngineData, + write_context: &WriteContext, + partition_values: HashMap, + data_change: bool, + ) -> DeltaResult> { + let transform = write_context.logical_to_physical(); + let input_schema: Schema = data.record_batch().schema().try_into()?; + let output_schema = write_context.schema(); + let logical_to_physical_expr = self.get_expression_handler().get_evaluator( + input_schema.into(), + transform.clone(), + output_schema.clone().into(), + ); + let physical_data = logical_to_physical_expr.evaluate(data)?; + self.parquet + .write_parquet_file( + write_context.target_dir(), + physical_data, + partition_values, + data_change + ) + .await } } diff --git a/kernel/src/transaction.rs b/kernel/src/transaction.rs index 7aa08b6cb..0fac2c80d 100644 --- a/kernel/src/transaction.rs +++ b/kernel/src/transaction.rs @@ -167,12 +167,10 @@ impl Transaction { pub fn get_write_context(&self) -> WriteContext { let target_dir = self.read_snapshot.table_root(); let snapshot_schema = self.read_snapshot.schema(); - let partition_cols = self.read_snapshot.metadata().partition_columns.clone(); let logical_to_physical = self.generate_logical_to_physical(); WriteContext::new( target_dir.clone(), Arc::new(snapshot_schema.clone()), - partition_cols, logical_to_physical, ) } @@ -218,7 +216,6 @@ fn generate_adds<'a>( pub struct WriteContext { target_dir: Url, schema: SchemaRef, - partition_cols: Vec, logical_to_physical: Expression, } @@ -226,13 +223,11 @@ impl WriteContext { fn new( target_dir: Url, schema: SchemaRef, - partition_cols: Vec, logical_to_physical: Expression, ) -> Self { WriteContext { target_dir, schema, - partition_cols, logical_to_physical, } } @@ -245,10 +240,6 @@ impl WriteContext { &self.schema } - pub fn partition_cols(&self) -> &[String] { - &self.partition_cols - } - pub fn logical_to_physical(&self) -> &Expression { &self.logical_to_physical } diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index 0080f0d92..2172340ed 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::sync::Arc; use arrow::array::{Int32Array, StringArray}; @@ -327,31 +328,24 @@ async fn test_append() -> Result<(), Box> { let engine = Arc::new(engine); let write_context = Arc::new(txn.get_write_context()); let tasks = append_data.into_iter().map(|data| { - tokio::task::spawn({ - let engine = Arc::clone(&engine); - let write_context = Arc::clone(&write_context); - async move { - engine - .get_parquet_handler() - .write_parquet_file( - write_context.target_dir(), - data.expect("FIXME"), - std::collections::HashMap::new(), - true, - ) - .await - .expect("FIXME") - } + // arc clones + let engine = engine.clone(); + let write_context = write_context.clone(); + tokio::task::spawn(async move { + engine + .write_parquet( + data.as_ref().unwrap(), + write_context.as_ref(), + HashMap::new(), + true, + ) + .await }) }); - let write_metadata = futures::future::join_all(tasks) - .await - .into_iter() - .map(|data| data.unwrap()); - + let write_metadata = futures::future::join_all(tasks).await.into_iter().flatten(); for meta in write_metadata { - txn.add_write_metadata(meta); + txn.add_write_metadata(meta?); } // commit! @@ -473,34 +467,24 @@ async fn test_append_partitioned() -> Result<(), Box> { .into_iter() .zip(partition_vals) .map(|(data, partition_val)| { - tokio::task::spawn({ - let engine = Arc::clone(&engine); - let write_context = Arc::clone(&write_context); - async move { - engine - .get_parquet_handler() - .write_parquet_file( - write_context.target_dir(), - data.expect("FIXME"), - std::collections::HashMap::from([( - partition_col.to_string(), - partition_val.to_string(), - )]), - true, - ) - .await - .expect("FIXME") - } + // arc clones + let engine = engine.clone(); + let write_context = write_context.clone(); + tokio::task::spawn(async move { + engine + .write_parquet( + data.as_ref().unwrap(), + write_context.as_ref(), + HashMap::from([(partition_col.to_string(), partition_val.to_string())]), + true, + ) + .await }) }); - let write_metadata = futures::future::join_all(tasks) - .await - .into_iter() - .map(|data| data.unwrap()); - + let write_metadata = futures::future::join_all(tasks).await.into_iter().flatten(); for meta in write_metadata { - txn.add_write_metadata(meta); + txn.add_write_metadata(meta?); } // commit! @@ -576,3 +560,66 @@ async fn test_append_partitioned() -> Result<(), Box> { )?; Ok(()) } + +#[tokio::test] +async fn test_append_invalid_schema() -> Result<(), Box> { + // setup tracing + let _ = tracing_subscriber::fmt::try_init(); + // setup in-memory object store and default engine + let (store, engine, table_location) = setup("test_table", true); + + // create a simple table: one int column named 'number' + let table_schema = Arc::new(StructType::new(vec![StructField::new( + "number", + DataType::INTEGER, + true, + )])); + // incompatible data schema: one string column named 'string' + let data_schema = Arc::new(StructType::new(vec![StructField::new( + "string", + DataType::STRING, + true, + )])); + let table = create_table(store.clone(), table_location, table_schema.clone(), &[]).await?; + + let commit_info = new_commit_info()?; + + let txn = table + .new_transaction(&engine)? + .with_commit_info(commit_info); + + // create two new arrow record batches to append + let append_data = [["a", "b"], ["c", "d"]].map(|data| -> DeltaResult<_> { + let data = RecordBatch::try_new( + Arc::new(data_schema.as_ref().try_into()?), + vec![Arc::new(arrow::array::StringArray::from(data.to_vec()))], + )?; + Ok(Box::new(ArrowEngineData::new(data))) + }); + + // write data out by spawning async tasks to simulate executors + let engine = Arc::new(engine); + let write_context = Arc::new(txn.get_write_context()); + let tasks = append_data.into_iter().map(|data| { + // arc clones + let engine = engine.clone(); + let write_context = write_context.clone(); + tokio::task::spawn(async move { + engine + .write_parquet( + data.as_ref().unwrap(), + write_context.as_ref(), + HashMap::new(), + true, + ) + .await + }) + }); + + let mut write_metadata = futures::future::join_all(tasks).await.into_iter().flatten(); + assert!(write_metadata.all(|res| matches!( + res, + Err(KernelError::Arrow(arrow_schema::ArrowError::SchemaError(_))) + ))); + Ok(()) +} From 9a2ae28909029c9bb556c94cc7d8a676fce2509f Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Thu, 31 Oct 2024 18:51:21 -0700 Subject: [PATCH 15/23] fmt --- kernel/src/engine/default/mod.rs | 4 ++-- kernel/src/transaction.rs | 6 +----- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/kernel/src/engine/default/mod.rs b/kernel/src/engine/default/mod.rs index dca4398fc..d89cf29cd 100644 --- a/kernel/src/engine/default/mod.rs +++ b/kernel/src/engine/default/mod.rs @@ -6,8 +6,8 @@ //! a separate thread pool, provided by the [`TaskExecutor`] trait. Read more in //! the [executor] module. -use std::sync::Arc; use std::collections::HashMap; +use std::sync::Arc; use self::storage::parse_url_opts; use object_store::{path::Path, DynObjectStore}; @@ -135,7 +135,7 @@ impl DefaultEngine { write_context.target_dir(), physical_data, partition_values, - data_change + data_change, ) .await } diff --git a/kernel/src/transaction.rs b/kernel/src/transaction.rs index 0fac2c80d..e3e39d462 100644 --- a/kernel/src/transaction.rs +++ b/kernel/src/transaction.rs @@ -220,11 +220,7 @@ pub struct WriteContext { } impl WriteContext { - fn new( - target_dir: Url, - schema: SchemaRef, - logical_to_physical: Expression, - ) -> Self { + fn new(target_dir: Url, schema: SchemaRef, logical_to_physical: Expression) -> Self { WriteContext { target_dir, schema, From edd6c4b44a714afe02c75ed627abf3598d8ce36f Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Wed, 6 Nov 2024 16:28:39 -0800 Subject: [PATCH 16/23] fix naming, add test --- kernel/src/engine/default/parquet.rs | 93 +++++++++++++++++++++------- 1 file changed, 72 insertions(+), 21 deletions(-) diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index 9a6f33c50..71e92f653 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -37,11 +37,11 @@ pub struct DefaultParquetHandler { /// Metadata of a parquet file, currently just includes the file metadata but will expand to /// include file statistics and other metadata in the future. #[derive(Debug)] -pub struct ParquetWriteMetadata { +pub struct DataFileMetadata { file_meta: FileMeta, } -impl ParquetWriteMetadata { +impl DataFileMetadata { pub fn new(file_meta: FileMeta) -> Self { Self { file_meta } } @@ -49,15 +49,17 @@ impl ParquetWriteMetadata { // convert ParquetMetadata into a record batch which matches the 'write_metadata' schema fn as_record_batch( &self, - partition_values: HashMap, + partition_values: &HashMap, data_change: bool, ) -> DeltaResult> { - let ParquetWriteMetadata { file_meta } = self; - let FileMeta { - location, - last_modified, - size, - } = file_meta; + let DataFileMetadata { + file_meta: + FileMeta { + location, + last_modified, + size, + }, + } = self; let write_metadata_schema = crate::transaction::get_write_metadata_schema(); // create the record batch of the write metadata @@ -70,15 +72,11 @@ impl ParquetWriteMetadata { value: "value".to_string(), }; let mut builder = MapBuilder::new(Some(names), key_builder, val_builder); - if partition_values.is_empty() { - builder.append(true).unwrap(); - } else { - for (k, v) in partition_values { - builder.keys().append_value(&k); - builder.values().append_value(&v); - builder.append(true).unwrap(); - } + for (k, v) in partition_values { + builder.keys().append_value(&k); + builder.values().append_value(&v); } + builder.append(true).unwrap(); let partitions = Arc::new(builder.finish()); // this means max size we can write is i64::MAX (~8EB) let size: i64 = (*size) @@ -120,7 +118,7 @@ impl DefaultParquetHandler { &self, path: &url::Url, data: Box, - ) -> DeltaResult { + ) -> DeltaResult { let batch: Box<_> = ArrowEngineData::try_from_engine_data(data)?; let record_batch = batch.record_batch(); @@ -148,7 +146,7 @@ impl DefaultParquetHandler { } let file_meta = FileMeta::new(path, modification_time, size); - Ok(ParquetWriteMetadata::new(file_meta)) + Ok(DataFileMetadata::new(file_meta)) } /// Write `data` to `path`/.parquet as parquet using ArrowWriter and return the parquet @@ -164,7 +162,7 @@ impl DefaultParquetHandler { data_change: bool, ) -> DeltaResult> { let parquet_metadata = self.write_parquet(path, data).await?; - let write_metadata = parquet_metadata.as_record_batch(partition_values, data_change)?; + let write_metadata = parquet_metadata.as_record_batch(&partition_values, data_change)?; Ok(write_metadata) } } @@ -369,6 +367,7 @@ mod tests { use arrow_array::RecordBatch; use object_store::{local::LocalFileSystem, ObjectStore}; + use url::Url; use crate::engine::arrow_data::ArrowEngineData; use crate::engine::default::executor::tokio::TokioBackgroundExecutor; @@ -423,7 +422,59 @@ mod tests { } #[test] - fn test_into_write_metadata() {} + fn test_as_record_batch() { + let location = Url::parse("file:///test_url").unwrap(); + let size = 1_000_000; + let last_modified = 10000000000; + let file_metadata = FileMeta::new(location.clone(), last_modified, size as usize); + let data_file_metadata = DataFileMetadata::new(file_metadata); + let partition_values = HashMap::from([ + ("partition1".to_string(), "a".to_string()), + ("partition2".to_string(), "b".to_string()), + ]); + let data_change = true; + let actual = data_file_metadata + .as_record_batch(&partition_values, data_change) + .unwrap(); + let actual = ArrowEngineData::try_from_engine_data(actual).unwrap(); + + let schema = Arc::new( + crate::transaction::get_write_metadata_schema() + .as_ref() + .try_into() + .unwrap(), + ); + let key_builder = StringBuilder::new(); + let val_builder = StringBuilder::new(); + let mut partition_values_builder = MapBuilder::new( + Some(MapFieldNames { + entry: "key_value".to_string(), + key: "key".to_string(), + value: "value".to_string(), + }), + key_builder, + val_builder, + ); + partition_values_builder.keys().append_value("partition1"); + partition_values_builder.values().append_value("a"); + partition_values_builder.keys().append_value("partition2"); + partition_values_builder.values().append_value("b"); + partition_values_builder.append(true).unwrap(); + let partition_values = partition_values_builder.finish(); + let expected = RecordBatch::try_new( + schema, + vec![ + Arc::new(StringArray::from(vec![location.to_string()])), + Arc::new(partition_values), + Arc::new(Int64Array::from(vec![size])), + Arc::new(Int64Array::from(vec![last_modified])), + Arc::new(BooleanArray::from(vec![data_change])), + ], + ) + .unwrap(); + + assert_eq!(actual.record_batch(), &expected); + } #[tokio::test] async fn test_write_parquet() {} From 1d4c914f99bbe43bce6110796a32ef670ff6ba96 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Wed, 6 Nov 2024 16:29:23 -0800 Subject: [PATCH 17/23] fmt --- kernel/src/engine/default/parquet.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index 71e92f653..fd3bbf758 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -73,8 +73,8 @@ impl DataFileMetadata { }; let mut builder = MapBuilder::new(Some(names), key_builder, val_builder); for (k, v) in partition_values { - builder.keys().append_value(&k); - builder.values().append_value(&v); + builder.keys().append_value(k); + builder.values().append_value(v); } builder.append(true).unwrap(); let partitions = Arc::new(builder.finish()); From 92d29a6b8685de6ed3c247245a587cadf4c1e179 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Thu, 7 Nov 2024 12:58:37 -0800 Subject: [PATCH 18/23] address comments --- kernel/src/engine/default/parquet.rs | 9 +--- kernel/src/lib.rs | 2 +- kernel/tests/write.rs | 65 ++++++++++++++++++---------- 3 files changed, 46 insertions(+), 30 deletions(-) diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index fd3bbf758..be4e79e23 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -137,7 +137,7 @@ impl DefaultParquetHandler { .await?; let metadata = self.store.head(&Path::from(path.path())).await?; - let modification_time = metadata.last_modified.timestamp(); + let modification_time = metadata.last_modified.timestamp_millis(); if size != metadata.size { return Err(Error::generic(format!( "Size mismatch after writing parquet file: expected {}, got {}", @@ -428,10 +428,7 @@ mod tests { let last_modified = 10000000000; let file_metadata = FileMeta::new(location.clone(), last_modified, size as usize); let data_file_metadata = DataFileMetadata::new(file_metadata); - let partition_values = HashMap::from([ - ("partition1".to_string(), "a".to_string()), - ("partition2".to_string(), "b".to_string()), - ]); + let partition_values = HashMap::from([("partition1".to_string(), "a".to_string())]); let data_change = true; let actual = data_file_metadata .as_record_batch(&partition_values, data_change) @@ -457,8 +454,6 @@ mod tests { ); partition_values_builder.keys().append_value("partition1"); partition_values_builder.values().append_value("a"); - partition_values_builder.keys().append_value("partition2"); - partition_values_builder.values().append_value("b"); partition_values_builder.append(true).unwrap(); let partition_values = partition_values_builder.finish(); let expected = RecordBatch::try_new( diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 2213eb372..374030a48 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -106,7 +106,7 @@ pub type FileDataReadResultIterator = pub struct FileMeta { /// The fully qualified path to the object pub location: Url, - /// The last modified time + /// The last modified time as milliseconds since unix epoch pub last_modified: i64, /// The size in bytes of the object pub size: usize, diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index 2172340ed..c779db7e3 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -271,26 +271,41 @@ async fn test_invalid_commit_info() -> Result<(), Box> { Ok(()) } -// set the value at .-separated `path` in `values` to `new_value` at `index` +// check that the timestamps in commit_info and add actions are within 10s of SystemTime::now() +fn check_action_timestamps<'a>( + parsed_commits: impl Iterator, +) -> Result<(), Box> { + // check that timestamps are within 10s of SystemTime::now() + let now: i64 = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? + .as_millis() + .try_into() + .unwrap(); + + parsed_commits.for_each(|commit| { + if let Some(commit_info_ts) = &commit.pointer("/commitInfo/timestamp") { + assert!(now - commit_info_ts.as_i64().unwrap() < 10_000); + } + if let Some(add_ts) = &commit.pointer("/add/modificationTime") { + assert!(now - add_ts.as_i64().unwrap() < 10_000); + } + }); + + Ok(()) +} + +// update `value` at (.-separated) `path` to `new_value` fn set_value( - mut value: &mut serde_json::Value, + value: &mut serde_json::Value, path: &str, new_value: serde_json::Value, ) -> Result<(), Box> { - let parts: Vec<_> = path.split('.').collect(); - - for name in parts.iter().take(parts.len() - 1) { - value = value - .get_mut(*name) - .ok_or_else(|| format!("key '{name}' not found"))?; - } - - let last_key = parts.last().ok_or("empty path")?; - value - .as_object_mut() - .ok_or("expected a JSON object")? - .insert(last_key.to_string(), new_value); - + let mut path_string = path.replace(".", "/"); + path_string.insert_str(0, "/"); + let v = value + .pointer_mut(&path_string) + .ok_or_else(|| format!("key '{path}' not found"))?; + *v = new_value; Ok(()) } @@ -361,6 +376,12 @@ async fn test_append() -> Result<(), Box> { .into_iter::() .try_collect()?; + // check that the timestamps in commit_info and add actions are within 10s of SystemTime::now() + // before we clear them for comparison + check_action_timestamps(parsed_commits.iter())?; + + // set timestamps to 0 and paths to known string values for comparison + // (otherwise timestamps are non-deterministic and paths are random UUIDs) set_value(&mut parsed_commits[0], "commitInfo.timestamp", json!(0))?; set_value(&mut parsed_commits[1], "add.modificationTime", json!(0))?; set_value(&mut parsed_commits[1], "add.path", json!("first.parquet"))?; @@ -399,9 +420,6 @@ async fn test_append() -> Result<(), Box> { }), ]; - println!("actual:\n{parsed_commits:#?}"); - println!("expected:\n{expected_commit:#?}"); - assert_eq!(parsed_commits, expected_commit); test_read( @@ -500,6 +518,12 @@ async fn test_append_partitioned() -> Result<(), Box> { .into_iter::() .try_collect()?; + // check that the timestamps in commit_info and add actions are within 10s of SystemTime::now() + // before we clear them for comparison + check_action_timestamps(parsed_commits.iter())?; + + // set timestamps to 0 and paths to known string values for comparison + // (otherwise timestamps are non-deterministic and paths are random UUIDs) set_value(&mut parsed_commits[0], "commitInfo.timestamp", json!(0))?; set_value(&mut parsed_commits[1], "add.modificationTime", json!(0))?; set_value(&mut parsed_commits[1], "add.path", json!("first.parquet"))?; @@ -542,9 +566,6 @@ async fn test_append_partitioned() -> Result<(), Box> { }), ]; - println!("actual:\n{parsed_commits:#?}"); - println!("expected:\n{expected_commit:#?}"); - assert_eq!(parsed_commits, expected_commit); test_read( From 467523185b43c04c30bb33b12fa6f49f124a93d6 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Thu, 7 Nov 2024 13:49:43 -0800 Subject: [PATCH 19/23] add write_parquet test --- kernel/src/engine/default/parquet.rs | 71 +++++++++++++++++++++++++++- 1 file changed, 69 insertions(+), 2 deletions(-) diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index be4e79e23..a64590454 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -364,9 +364,11 @@ impl FileOpener for PresignedUrlOpener { #[cfg(test)] mod tests { use std::path::PathBuf; + use std::time::{SystemTime, UNIX_EPOCH}; + use arrow_array::array::Array; use arrow_array::RecordBatch; - use object_store::{local::LocalFileSystem, ObjectStore}; + use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore}; use url::Url; use crate::engine::arrow_data::ArrowEngineData; @@ -472,5 +474,70 @@ mod tests { } #[tokio::test] - async fn test_write_parquet() {} + async fn test_write_parquet() { + let store = Arc::new(InMemory::new()); + let parquet_handler = + DefaultParquetHandler::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + + let data = Box::new(ArrowEngineData::new( + RecordBatch::try_from_iter(vec![( + "a", + Arc::new(Int64Array::from(vec![1, 2, 3])) as Arc, + )]) + .unwrap(), + )); + + let write_metadata = parquet_handler + .write_parquet(&Url::parse("memory:///data/").unwrap(), data) + .await + .unwrap(); + + let DataFileMetadata { + file_meta: + ref parquet_file @ FileMeta { + ref location, + last_modified, + size, + }, + } = write_metadata; + let expected_location = Url::parse("memory:///data/").unwrap(); + let expected_size = 497; + + // check that last_modified is within 10s of now + let now: i64 = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() + .try_into() + .unwrap(); + + let filename = location.path().split('/').last().unwrap(); + assert_eq!(&expected_location.join(filename).unwrap(), location); + assert_eq!(expected_size, size); + assert!(now - last_modified < 10_000); + + // check we can read back + let path = Path::from(location.path()); + let meta = store.head(&path).await.unwrap(); + let reader = ParquetObjectReader::new(store.clone(), meta.clone()); + let physical_schema = ParquetRecordBatchStreamBuilder::new(reader) + .await + .unwrap() + .schema() + .clone(); + + let data: Vec = parquet_handler + .read_parquet_files( + &[parquet_file.clone()], + Arc::new(physical_schema.try_into().unwrap()), + None, + ) + .unwrap() + .map(into_record_batch) + .try_collect() + .unwrap(); + + assert_eq!(data.len(), 1); + assert_eq!(data[0].num_rows(), 3); + } } From a7455f851dcd46915623fb798565c68f0dd9368f Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Thu, 7 Nov 2024 14:25:22 -0800 Subject: [PATCH 20/23] fix comment --- kernel/src/engine/default/parquet.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index a64590454..db41f3698 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -34,8 +34,8 @@ pub struct DefaultParquetHandler { readahead: usize, } -/// Metadata of a parquet file, currently just includes the file metadata but will expand to -/// include file statistics and other metadata in the future. +/// Metadata of a data file (typically a parquet file), currently just includes the file metadata +/// but will expand to include file statistics and other metadata in the future. #[derive(Debug)] pub struct DataFileMetadata { file_meta: FileMeta, @@ -46,7 +46,7 @@ impl DataFileMetadata { Self { file_meta } } - // convert ParquetMetadata into a record batch which matches the 'write_metadata' schema + // convert DataFileMetadata into a record batch which matches the 'write_metadata' schema fn as_record_batch( &self, partition_values: &HashMap, From 0044d8ec17455dc335ea16345ebdc9c2831062c5 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Thu, 7 Nov 2024 14:34:03 -0800 Subject: [PATCH 21/23] comments, cleanup, test fix --- kernel/tests/common/mod.rs | 4 ++++ kernel/tests/write.rs | 16 +++++++++++----- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/kernel/tests/common/mod.rs b/kernel/tests/common/mod.rs index 0dfb6f3cb..c219efd61 100644 --- a/kernel/tests/common/mod.rs +++ b/kernel/tests/common/mod.rs @@ -15,6 +15,8 @@ pub(crate) fn to_arrow(data: Box) -> DeltaResult { .into()) } +// TODO (zach): this is listed as unused for acceptance crate +#[allow(unused)] pub(crate) fn test_read( expected: &ArrowEngineData, table: &Table, @@ -36,6 +38,8 @@ pub(crate) fn test_read( Ok(()) } +// TODO (zach): this is listed as unused for acceptance crate +#[allow(unused)] pub(crate) fn read_scan(scan: &Scan, engine: &dyn Engine) -> DeltaResult> { let scan_results = scan.execute(engine)?; scan_results diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index c779db7e3..648dd6abd 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -301,7 +301,7 @@ fn set_value( new_value: serde_json::Value, ) -> Result<(), Box> { let mut path_string = path.replace(".", "/"); - path_string.insert_str(0, "/"); + path_string.insert(0, '/'); let v = value .pointer_mut(&path_string) .ok_or_else(|| format!("key '{path}' not found"))?; @@ -638,9 +638,15 @@ async fn test_append_invalid_schema() -> Result<(), Box> }); let mut write_metadata = futures::future::join_all(tasks).await.into_iter().flatten(); - assert!(write_metadata.all(|res| matches!( - res, - Err(KernelError::Arrow(arrow_schema::ArrowError::SchemaError(_))) - ))); + assert!(write_metadata.all(|res| match res { + Err(KernelError::Arrow(arrow_schema::ArrowError::SchemaError(_))) => true, + Err(KernelError::Backtraced { source, .. }) + if matches!( + &*source, + KernelError::Arrow(arrow_schema::ArrowError::SchemaError(_)) + ) => + true, + _ => false, + })); Ok(()) } From 59166c269cc8d2d82447e4b63605d6f056e1ac01 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Thu, 7 Nov 2024 14:38:30 -0800 Subject: [PATCH 22/23] address feedback --- kernel/tests/write.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index 648dd6abd..0fc2a209f 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -275,7 +275,6 @@ async fn test_invalid_commit_info() -> Result<(), Box> { fn check_action_timestamps<'a>( parsed_commits: impl Iterator, ) -> Result<(), Box> { - // check that timestamps are within 10s of SystemTime::now() let now: i64 = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? .as_millis() @@ -284,10 +283,10 @@ fn check_action_timestamps<'a>( parsed_commits.for_each(|commit| { if let Some(commit_info_ts) = &commit.pointer("/commitInfo/timestamp") { - assert!(now - commit_info_ts.as_i64().unwrap() < 10_000); + assert!((now - commit_info_ts.as_i64().unwrap()).abs() < 10_000); } if let Some(add_ts) = &commit.pointer("/add/modificationTime") { - assert!(now - add_ts.as_i64().unwrap() < 10_000); + assert!((now - add_ts.as_i64().unwrap()).abs() < 10_000); } }); From cab14d6057dbf252c80351c5e58b8c320fb7e936 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Thu, 7 Nov 2024 16:19:25 -0800 Subject: [PATCH 23/23] address feedback, add test for disallowing non-trailing slash in parquet write --- kernel/src/engine/default/parquet.rs | 33 ++++++++++++++++++++++++---- kernel/src/transaction.rs | 22 ++++++++----------- 2 files changed, 38 insertions(+), 17 deletions(-) diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index db41f3698..d4235fa07 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -128,8 +128,14 @@ impl DefaultParquetHandler { writer.close()?; // writer must be closed to write footer let size = buffer.len(); - let name: String = Uuid::new_v4().to_string() + ".parquet"; - // FIXME test with trailing '/' and omitting? + let name: String = format!("{}.parquet", Uuid::new_v4()); + // fail if path does not end with a trailing slash + if !path.path().ends_with('/') { + return Err(Error::generic(format!( + "Path must end with a trailing slash: {}", + path + ))); + } let path = path.join(&name)?; self.store @@ -162,8 +168,7 @@ impl DefaultParquetHandler { data_change: bool, ) -> DeltaResult> { let parquet_metadata = self.write_parquet(path, data).await?; - let write_metadata = parquet_metadata.as_record_batch(&partition_values, data_change)?; - Ok(write_metadata) + parquet_metadata.as_record_batch(&partition_values, data_change) } } @@ -540,4 +545,24 @@ mod tests { assert_eq!(data.len(), 1); assert_eq!(data[0].num_rows(), 3); } + + #[tokio::test] + async fn test_disallow_non_trailing_slash() { + let store = Arc::new(InMemory::new()); + let parquet_handler = + DefaultParquetHandler::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + + let data = Box::new(ArrowEngineData::new( + RecordBatch::try_from_iter(vec![( + "a", + Arc::new(Int64Array::from(vec![1, 2, 3])) as Arc, + )]) + .unwrap(), + )); + + assert!(parquet_handler + .write_parquet(&Url::parse("memory:///data").unwrap(), data) + .await + .is_err()); + } } diff --git a/kernel/src/transaction.rs b/kernel/src/transaction.rs index e3e39d462..db6ba0e44 100644 --- a/kernel/src/transaction.rs +++ b/kernel/src/transaction.rs @@ -20,14 +20,13 @@ const KERNEL_VERSION: &str = env!("CARGO_PKG_VERSION"); const UNKNOWN_OPERATION: &str = "UNKNOWN"; pub(crate) static WRITE_METADATA_SCHEMA: LazyLock = LazyLock::new(|| { - StructType::new(vec![ + Arc::new(StructType::new(vec![ ::get_struct_field("path"), >::get_nullable_container_struct_field("partitionValues"), ::get_struct_field("size"), ::get_struct_field("modificationTime"), ::get_struct_field("dataChange"), - ]) - .into() + ])) }); /// Get the expected schema for [`write_metadata`]. @@ -144,19 +143,16 @@ impl Transaction { fn generate_logical_to_physical(&self) -> Expression { // for now, we just pass through all the columns except partition columns. // note this is _incorrect_ if table config deems we need partition columns. - Expression::struct_from(self.read_snapshot.schema().fields().filter_map(|f| { - if self - .read_snapshot - .metadata() - .partition_columns - .contains(f.name()) - { + let partition_columns = self.read_snapshot.metadata().partition_columns.clone(); + let fields = self.read_snapshot.schema().fields(); + let fields = fields.filter_map(|f| { + if partition_columns.contains(f.name()) { None } else { - let col_name = ColumnName::new([f.name()]); - Some(col_name.into()) + Some(ColumnName::new([f.name()]).into()) } - })) + }); + Expression::struct_from(fields) } /// Get the write context for this transaction. At the moment, this is constant for the whole