diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml
index 8205a2ccf..235df2646 100644
--- a/kernel/Cargo.toml
+++ b/kernel/Cargo.toml
@@ -90,6 +90,8 @@ default-engine = [
     "parquet/object_store",
     "reqwest",
     "tokio",
+    "uuid/v4",
+    "uuid/fast-rng",
 ]
 developer-visibility = []
diff --git a/kernel/src/engine/arrow_utils.rs b/kernel/src/engine/arrow_utils.rs
index d8daba774..dea2cc9fd 100644
--- a/kernel/src/engine/arrow_utils.rs
+++ b/kernel/src/engine/arrow_utils.rs
@@ -665,11 +665,11 @@ fn parse_json_impl(json_strings: &StringArray, schema: ArrowSchemaRef) -> DeltaR
 /// serialize an arrow RecordBatch to a JSON string by appending to a buffer.
 // TODO (zach): this should stream data to the JSON writer and output an iterator.
 pub(crate) fn to_json_bytes(
-    data: impl Iterator<Item = Box<dyn EngineData>> + Send,
+    data: impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send,
 ) -> DeltaResult<Vec<u8>> {
     let mut writer = LineDelimitedWriter::new(Vec::new());
     for chunk in data.into_iter() {
-        let arrow_data = ArrowEngineData::try_from_engine_data(chunk)?;
+        let arrow_data = ArrowEngineData::try_from_engine_data(chunk?)?;
         let record_batch = arrow_data.record_batch();
         writer.write(record_batch)?;
     }
@@ -1436,7 +1436,7 @@ mod tests {
             vec![Arc::new(StringArray::from(vec!["string1", "string2"]))],
         )?;
         let data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(data));
-        let json = to_json_bytes(Box::new(std::iter::once(data)))?;
+        let json = to_json_bytes(Box::new(std::iter::once(Ok(data))))?;
         assert_eq!(
             json,
             "{\"string\":\"string1\"}\n{\"string\":\"string2\"}\n".as_bytes()
diff --git a/kernel/src/engine/default/json.rs b/kernel/src/engine/default/json.rs
index b03b26bc6..1d8aa3058 100644
--- a/kernel/src/engine/default/json.rs
+++ b/kernel/src/engine/default/json.rs
@@ -96,7 +96,7 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
     fn write_json_file(
         &self,
         path: &Url,
-        data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>,
+        data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + '_>,
         _overwrite: bool,
     ) -> DeltaResult<()> {
         let buffer = to_json_bytes(data)?;
diff --git a/kernel/src/engine/default/mod.rs b/kernel/src/engine/default/mod.rs
index 9fa1bdb0c..d89cf29cd 100644
--- a/kernel/src/engine/default/mod.rs
+++ b/kernel/src/engine/default/mod.rs
@@ -6,6 +6,7 @@
 //! a separate thread pool, provided by the [`TaskExecutor`] trait. Read more in
 //! the [executor] module.
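With the signature changes above, the JSON write path consumes `DeltaResult<Box<dyn EngineData>>` items, so a producer can surface a per-chunk failure lazily instead of materializing every batch up front. A minimal sketch of building such an iterator from an Arrow batch (the helper name is illustrative, not part of this diff):

```rust
use std::sync::Arc;

use arrow_array::{Array, RecordBatch, StringArray};
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::{DeltaResult, EngineData};

// Build the kind of fallible chunk iterator the new `write_json_file`/`to_json_bytes`
// signatures expect: each item is a DeltaResult, so one bad chunk aborts the whole write.
fn json_chunks() -> DeltaResult<impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send> {
    let batch = RecordBatch::try_from_iter(vec![(
        "string",
        Arc::new(StringArray::from(vec!["string1", "string2"])) as Arc<dyn Array>,
    )])?;
    let data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(batch));
    Ok(std::iter::once(Ok(data)))
}
```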
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use self::storage::parse_url_opts;
@@ -16,9 +17,13 @@
 use self::executor::TaskExecutor;
 use self::filesystem::ObjectStoreFileSystemClient;
 use self::json::DefaultJsonHandler;
 use self::parquet::DefaultParquetHandler;
+use super::arrow_data::ArrowEngineData;
 use super::arrow_expression::ArrowExpressionHandler;
+use crate::schema::Schema;
+use crate::transaction::WriteContext;
 use crate::{
-    DeltaResult, Engine, ExpressionHandler, FileSystemClient, JsonHandler, ParquetHandler,
+    DeltaResult, Engine, EngineData, ExpressionHandler, FileSystemClient, JsonHandler,
+    ParquetHandler,
 };
 
 pub mod executor;
@@ -108,6 +113,32 @@ impl<E: TaskExecutor> DefaultEngine<E> {
     pub fn get_object_store_for_url(&self, _url: &Url) -> Option<Arc<DynObjectStore>> {
         Some(self.store.clone())
     }
+
+    pub async fn write_parquet(
+        &self,
+        data: &ArrowEngineData,
+        write_context: &WriteContext,
+        partition_values: HashMap<String, String>,
+        data_change: bool,
+    ) -> DeltaResult<Box<dyn EngineData>> {
+        let transform = write_context.logical_to_physical();
+        let input_schema: Schema = data.record_batch().schema().try_into()?;
+        let output_schema = write_context.schema();
+        let logical_to_physical_expr = self.get_expression_handler().get_evaluator(
+            input_schema.into(),
+            transform.clone(),
+            output_schema.clone().into(),
+        );
+        let physical_data = logical_to_physical_expr.evaluate(data)?;
+        self.parquet
+            .write_parquet_file(
+                write_context.target_dir(),
+                physical_data,
+                partition_values,
+                data_change,
+            )
+            .await
+    }
 }
 
 impl<E: TaskExecutor> Engine for DefaultEngine<E> {
diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs
index ffcf4e2e9..d4235fa07 100644
--- a/kernel/src/engine/default/parquet.rs
+++ b/kernel/src/engine/default/parquet.rs
@@ -1,23 +1,30 @@
 //! Default Parquet handler implementation
 
+use std::collections::HashMap;
 use std::ops::Range;
 use std::sync::Arc;
 
+use arrow_array::builder::{MapBuilder, MapFieldNames, StringBuilder};
+use arrow_array::{BooleanArray, Int64Array, RecordBatch, StringArray};
 use futures::StreamExt;
 use object_store::path::Path;
 use object_store::DynObjectStore;
 use parquet::arrow::arrow_reader::{
     ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
 };
+use parquet::arrow::arrow_writer::ArrowWriter;
 use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
+use uuid::Uuid;
 
 use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
+use crate::engine::arrow_data::ArrowEngineData;
 use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
 use crate::engine::default::executor::TaskExecutor;
 use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
 use crate::schema::SchemaRef;
 use crate::{
-    DeltaResult, Error, ExpressionRef, FileDataReadResultIterator, FileMeta, ParquetHandler,
+    DeltaResult, EngineData, Error, ExpressionRef, FileDataReadResultIterator, FileMeta,
+    ParquetHandler,
 };
 
 #[derive(Debug)]
@@ -27,6 +34,64 @@ pub struct DefaultParquetHandler<E: TaskExecutor> {
     readahead: usize,
 }
 
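The new `DefaultEngine::write_parquet` above evaluates the write context's logical-to-physical transform and delegates to the parquet handler, handing back write metadata that the transaction can stage. A hedged usage sketch; `engine`, `txn`, and `batch` are assumed to exist, the helper itself is illustrative, and the module paths simply follow the crate layout shown in this diff:

```rust
use std::collections::HashMap;

use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::transaction::Transaction;
use delta_kernel::DeltaResult;

// Write one unpartitioned batch and stage its write metadata on the transaction.
async fn append_one(
    engine: &DefaultEngine<TokioBackgroundExecutor>,
    txn: &mut Transaction,
    batch: &ArrowEngineData,
) -> DeltaResult<()> {
    let write_context = txn.get_write_context();
    let write_metadata = engine
        .write_parquet(batch, &write_context, HashMap::new(), /*data_change*/ true)
        .await?;
    txn.add_write_metadata(write_metadata);
    Ok(())
}
```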
+/// Metadata of a data file (typically a parquet file). Currently this includes just the file
+/// metadata, but it will expand to include file statistics and other metadata in the future.
+#[derive(Debug)]
+pub struct DataFileMetadata {
+    file_meta: FileMeta,
+}
+
+impl DataFileMetadata {
+    pub fn new(file_meta: FileMeta) -> Self {
+        Self { file_meta }
+    }
+
+    // convert DataFileMetadata into a record batch which matches the 'write_metadata' schema
+    fn as_record_batch(
+        &self,
+        partition_values: &HashMap<String, String>,
+        data_change: bool,
+    ) -> DeltaResult<Box<dyn EngineData>> {
+        let DataFileMetadata {
+            file_meta:
+                FileMeta {
+                    location,
+                    last_modified,
+                    size,
+                },
+        } = self;
+        let write_metadata_schema = crate::transaction::get_write_metadata_schema();
+
+        // create the record batch of the write metadata
+        let path = Arc::new(StringArray::from(vec![location.to_string()]));
+        let key_builder = StringBuilder::new();
+        let val_builder = StringBuilder::new();
+        let names = MapFieldNames {
+            entry: "key_value".to_string(),
+            key: "key".to_string(),
+            value: "value".to_string(),
+        };
+        let mut builder = MapBuilder::new(Some(names), key_builder, val_builder);
+        for (k, v) in partition_values {
+            builder.keys().append_value(k);
+            builder.values().append_value(v);
+        }
+        builder.append(true).unwrap();
+        let partitions = Arc::new(builder.finish());
+        // this means the max size we can write is i64::MAX (~8EB)
+        let size: i64 = (*size)
+            .try_into()
+            .map_err(|_| Error::generic("Failed to convert parquet metadata 'size' to i64"))?;
+        let size = Arc::new(Int64Array::from(vec![size]));
+        let data_change = Arc::new(BooleanArray::from(vec![data_change]));
+        let modification_time = Arc::new(Int64Array::from(vec![*last_modified]));
+        Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new(
+            Arc::new(write_metadata_schema.as_ref().try_into()?),
+            vec![path, partitions, size, modification_time, data_change],
+        )?)))
+    }
+}
+
 impl<E: TaskExecutor> DefaultParquetHandler<E> {
     pub fn new(store: Arc<DynObjectStore>, task_executor: Arc<E>) -> Self {
         Self {
@@ -43,6 +108,68 @@ impl<E: TaskExecutor> DefaultParquetHandler<E> {
         self.readahead = readahead;
         self
     }
+
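The `partitionValues` column built in `as_record_batch` above is a single-row Arrow map whose entry/key/value field names must be `key_value`/`key`/`value` to line up with the kernel's write-metadata schema. A standalone illustration of that builder pattern (not part of this diff):

```rust
use arrow_array::builder::{MapBuilder, MapFieldNames, StringBuilder};
use arrow_array::MapArray;

// One map entry per partition column, in a single-row MapArray. The entry/key/value names
// have to match the layout the kernel's schema conversion expects.
fn partition_values_array() -> MapArray {
    let names = MapFieldNames {
        entry: "key_value".to_string(),
        key: "key".to_string(),
        value: "value".to_string(),
    };
    let mut builder = MapBuilder::new(Some(names), StringBuilder::new(), StringBuilder::new());
    builder.keys().append_value("partition1");
    builder.values().append_value("a");
    builder.append(true).unwrap(); // closes the map for this row
    builder.finish()
}
```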
+    // Write `data` to `path`/<uuid>.parquet as parquet using ArrowWriter and return the parquet
+    // metadata (where <uuid> is a generated UUIDv4).
+    //
+    // Note: after encoding the data as parquet, this issues a PUT followed by a HEAD to storage in
+    // order to obtain metadata about the object just written.
+    async fn write_parquet(
+        &self,
+        path: &url::Url,
+        data: Box<dyn EngineData>,
+    ) -> DeltaResult<DataFileMetadata> {
+        let batch: Box<_> = ArrowEngineData::try_from_engine_data(data)?;
+        let record_batch = batch.record_batch();
+
+        let mut buffer = vec![];
+        let mut writer = ArrowWriter::try_new(&mut buffer, record_batch.schema(), None)?;
+        writer.write(record_batch)?;
+        writer.close()?; // writer must be closed to write footer
+
+        let size = buffer.len();
+        let name: String = format!("{}.parquet", Uuid::new_v4());
+        // fail if path does not end with a trailing slash
+        if !path.path().ends_with('/') {
+            return Err(Error::generic(format!(
+                "Path must end with a trailing slash: {}",
+                path
+            )));
+        }
+        let path = path.join(&name)?;
+
+        self.store
+            .put(&Path::from(path.path()), buffer.into())
+            .await?;
+
+        let metadata = self.store.head(&Path::from(path.path())).await?;
+        let modification_time = metadata.last_modified.timestamp_millis();
+        if size != metadata.size {
+            return Err(Error::generic(format!(
+                "Size mismatch after writing parquet file: expected {}, got {}",
+                size, metadata.size
+            )));
+        }
+
+        let file_meta = FileMeta::new(path, modification_time, size);
+        Ok(DataFileMetadata::new(file_meta))
+    }
+
+    /// Write `data` to `path`/<uuid>.parquet as parquet using ArrowWriter and return the parquet
+    /// metadata as an EngineData batch which matches the [write metadata] schema (where <uuid> is
+    /// a generated UUIDv4).
+    ///
+    /// [write metadata]: crate::transaction::get_write_metadata_schema
+    pub async fn write_parquet_file(
+        &self,
+        path: &url::Url,
+        data: Box<dyn EngineData>,
+        partition_values: HashMap<String, String>,
+        data_change: bool,
+    ) -> DeltaResult<Box<dyn EngineData>> {
+        let parquet_metadata = self.write_parquet(path, data).await?;
+        parquet_metadata.as_record_batch(&partition_values, data_change)
+    }
 }
 
 impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
@@ -242,9 +369,12 @@ impl FileOpener for PresignedUrlOpener {
 #[cfg(test)]
 mod tests {
     use std::path::PathBuf;
+    use std::time::{SystemTime, UNIX_EPOCH};
 
+    use arrow_array::array::Array;
     use arrow_array::RecordBatch;
-    use object_store::{local::LocalFileSystem, ObjectStore};
+    use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore};
+    use url::Url;
 
     use crate::engine::arrow_data::ArrowEngineData;
     use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
@@ -297,4 +427,142 @@ mod tests {
         assert_eq!(data.len(), 1);
         assert_eq!(data[0].num_rows(), 10);
     }
+
+    #[test]
+    fn test_as_record_batch() {
+        let location = Url::parse("file:///test_url").unwrap();
+        let size = 1_000_000;
+        let last_modified = 10000000000;
+        let file_metadata = FileMeta::new(location.clone(), last_modified, size as usize);
+        let data_file_metadata = DataFileMetadata::new(file_metadata);
+        let partition_values = HashMap::from([("partition1".to_string(), "a".to_string())]);
+        let data_change = true;
+        let actual = data_file_metadata
+            .as_record_batch(&partition_values, data_change)
+            .unwrap();
+        let actual = ArrowEngineData::try_from_engine_data(actual).unwrap();
+
+        let schema = Arc::new(
+            crate::transaction::get_write_metadata_schema()
+                .as_ref()
+                .try_into()
+                .unwrap(),
+        );
+        let key_builder = StringBuilder::new();
+        let val_builder = StringBuilder::new();
+        let mut partition_values_builder = MapBuilder::new(
+            Some(MapFieldNames {
+                entry: "key_value".to_string(),
+                key: "key".to_string(),
+                value: "value".to_string(),
+            }),
+            key_builder,
+            val_builder,
+        );
+        partition_values_builder.keys().append_value("partition1");
+        partition_values_builder.values().append_value("a");
+
partition_values_builder.append(true).unwrap(); + let partition_values = partition_values_builder.finish(); + let expected = RecordBatch::try_new( + schema, + vec![ + Arc::new(StringArray::from(vec![location.to_string()])), + Arc::new(partition_values), + Arc::new(Int64Array::from(vec![size])), + Arc::new(Int64Array::from(vec![last_modified])), + Arc::new(BooleanArray::from(vec![data_change])), + ], + ) + .unwrap(); + + assert_eq!(actual.record_batch(), &expected); + } + + #[tokio::test] + async fn test_write_parquet() { + let store = Arc::new(InMemory::new()); + let parquet_handler = + DefaultParquetHandler::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + + let data = Box::new(ArrowEngineData::new( + RecordBatch::try_from_iter(vec![( + "a", + Arc::new(Int64Array::from(vec![1, 2, 3])) as Arc, + )]) + .unwrap(), + )); + + let write_metadata = parquet_handler + .write_parquet(&Url::parse("memory:///data/").unwrap(), data) + .await + .unwrap(); + + let DataFileMetadata { + file_meta: + ref parquet_file @ FileMeta { + ref location, + last_modified, + size, + }, + } = write_metadata; + let expected_location = Url::parse("memory:///data/").unwrap(); + let expected_size = 497; + + // check that last_modified is within 10s of now + let now: i64 = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() + .try_into() + .unwrap(); + + let filename = location.path().split('/').last().unwrap(); + assert_eq!(&expected_location.join(filename).unwrap(), location); + assert_eq!(expected_size, size); + assert!(now - last_modified < 10_000); + + // check we can read back + let path = Path::from(location.path()); + let meta = store.head(&path).await.unwrap(); + let reader = ParquetObjectReader::new(store.clone(), meta.clone()); + let physical_schema = ParquetRecordBatchStreamBuilder::new(reader) + .await + .unwrap() + .schema() + .clone(); + + let data: Vec = parquet_handler + .read_parquet_files( + &[parquet_file.clone()], + Arc::new(physical_schema.try_into().unwrap()), + None, + ) + .unwrap() + .map(into_record_batch) + .try_collect() + .unwrap(); + + assert_eq!(data.len(), 1); + assert_eq!(data[0].num_rows(), 3); + } + + #[tokio::test] + async fn test_disallow_non_trailing_slash() { + let store = Arc::new(InMemory::new()); + let parquet_handler = + DefaultParquetHandler::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + + let data = Box::new(ArrowEngineData::new( + RecordBatch::try_from_iter(vec![( + "a", + Arc::new(Int64Array::from(vec![1, 2, 3])) as Arc, + )]) + .unwrap(), + )); + + assert!(parquet_handler + .write_parquet(&Url::parse("memory:///data").unwrap(), data) + .await + .is_err()); + } } diff --git a/kernel/src/engine/sync/json.rs b/kernel/src/engine/sync/json.rs index 016fb2658..3d33b1025 100644 --- a/kernel/src/engine/sync/json.rs +++ b/kernel/src/engine/sync/json.rs @@ -52,7 +52,7 @@ impl JsonHandler for SyncJsonHandler { fn write_json_file( &self, path: &Url, - data: Box> + Send>, + data: Box>> + Send + '_>, _overwrite: bool, ) -> DeltaResult<()> { let path = path @@ -120,10 +120,10 @@ mod tests { let url = Url::from_file_path(path.clone()).unwrap(); handler - .write_json_file(&url, Box::new(std::iter::once(data)), false) + .write_json_file(&url, Box::new(std::iter::once(Ok(data))), false) .expect("write json file"); assert!(matches!( - handler.write_json_file(&url, Box::new(std::iter::once(empty)), false), + handler.write_json_file(&url, Box::new(std::iter::once(Ok(empty))), false), Err(Error::FileAlreadyExists(_)) )); diff --git 
a/kernel/src/lib.rs b/kernel/src/lib.rs
index 2f686a3ad..40fa360f5 100644
--- a/kernel/src/lib.rs
+++ b/kernel/src/lib.rs
@@ -107,7 +107,7 @@ pub type FileDataReadResultIterator =
 pub struct FileMeta {
     /// The fully qualified path to the object
     pub location: Url,
-    /// The last modified time
+    /// The last modified time as milliseconds since the Unix epoch
    pub last_modified: i64,
     /// The size in bytes of the object
     pub size: usize,
@@ -125,6 +125,17 @@ impl PartialOrd for FileMeta {
     }
 }
 
+impl FileMeta {
+    /// Create a new instance of `FileMeta`
+    pub fn new(location: Url, last_modified: i64, size: usize) -> Self {
+        Self {
+            location,
+            last_modified,
+            size,
+        }
+    }
+}
+
 /// Trait for implementing an Expression evaluator.
 ///
 /// It contains one Expression which can be evaluated on multiple ColumnarBatches.
@@ -234,7 +245,7 @@ pub trait JsonHandler: Send + Sync {
     fn write_json_file(
         &self,
         path: &Url,
-        data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>,
+        data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + '_>,
         overwrite: bool,
     ) -> DeltaResult<()>;
 }
diff --git a/kernel/src/transaction.rs b/kernel/src/transaction.rs
index 81b0f31f8..db6ba0e44 100644
--- a/kernel/src/transaction.rs
+++ b/kernel/src/transaction.rs
@@ -1,19 +1,41 @@
+use std::collections::HashMap;
 use std::iter;
-use std::sync::Arc;
+use std::sync::{Arc, LazyLock};
 use std::time::{SystemTime, UNIX_EPOCH};
 
-use crate::actions::get_log_commit_info_schema;
+use crate::actions::schemas::{GetNullableContainerStructField, GetStructField};
 use crate::actions::COMMIT_INFO_NAME;
+use crate::actions::{get_log_add_schema, get_log_commit_info_schema};
 use crate::error::Error;
-use crate::expressions::{column_expr, Scalar, StructData};
+use crate::expressions::{column_expr, ColumnName, Scalar, StructData};
 use crate::path::ParsedLogPath;
-use crate::schema::{StructField, StructType};
+use crate::schema::{SchemaRef, StructField, StructType};
 use crate::snapshot::Snapshot;
 use crate::{DataType, DeltaResult, Engine, EngineData, Expression, Version};
+use itertools::chain;
+use url::Url;
+
 const KERNEL_VERSION: &str = env!("CARGO_PKG_VERSION");
 const UNKNOWN_OPERATION: &str = "UNKNOWN";
 
+pub(crate) static WRITE_METADATA_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
+    Arc::new(StructType::new(vec![
+        <String>::get_struct_field("path"),
+        <HashMap<String, String>>::get_nullable_container_struct_field("partitionValues"),
+        <i64>::get_struct_field("size"),
+        <i64>::get_struct_field("modificationTime"),
+        <bool>::get_struct_field("dataChange"),
+    ]))
+});
+
+/// Get the expected schema for [`write_metadata`].
+///
+/// [`write_metadata`]: crate::transaction::Transaction::write_metadata
+pub fn get_write_metadata_schema() -> &'static SchemaRef {
+    &WRITE_METADATA_SCHEMA
+}
+
 /// A transaction represents an in-progress write to a table. After creating a transaction, changes
 /// to the table may be staged via the transaction methods before calling `commit` to commit the
 /// changes to the table.
@@ -32,6 +54,7 @@ pub struct Transaction {
     read_snapshot: Arc<Snapshot>,
     operation: Option<String>,
     commit_info: Option<Arc<dyn EngineData>>,
+    write_metadata: Vec<Box<dyn EngineData>>,
 }
 
 impl std::fmt::Debug for Transaction {
@@ -56,6 +79,7 @@ impl Transaction {
             read_snapshot: snapshot.into(),
             operation: None,
             commit_info: None,
+            write_metadata: vec![],
         }
     }
 
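For reference, `WRITE_METADATA_SCHEMA` above expands to the explicit schema below; this mirrors the `test_write_metadata_schema` test added later in this diff, with nothing nullable except the values of the `partitionValues` map:

```rust
use delta_kernel::schema::{DataType, MapType, StructField, StructType};

// Explicit spelling of WRITE_METADATA_SCHEMA: one row per written data file.
fn write_metadata_schema_expanded() -> StructType {
    StructType::new(vec![
        StructField::new("path", DataType::STRING, false),
        StructField::new(
            "partitionValues",
            MapType::new(DataType::STRING, DataType::STRING, true),
            false,
        ),
        StructField::new("size", DataType::LONG, false),
        StructField::new("modificationTime", DataType::LONG, false),
        StructField::new("dataChange", DataType::BOOLEAN, false),
    ])
}
```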
     /// will include the failed transaction in case of a conflict so the user can retry.
     pub fn commit(self, engine: &dyn Engine) -> DeltaResult<CommitResult> {
         // step one: construct the iterator of actions we want to commit
-        // note: only support commit_info right now (and it's required)
         let engine_commit_info = self
             .commit_info
             .as_ref()
             .ok_or_else(|| Error::MissingCommitInfo)?;
-        let actions = Box::new(iter::once(generate_commit_info(
+        let commit_info = generate_commit_info(
             engine,
             self.operation.as_deref(),
             engine_commit_info.as_ref(),
-        )?));
+        );
+        let adds = generate_adds(engine, self.write_metadata.iter().map(|a| a.as_ref()));
+        let actions = chain(iter::once(commit_info), adds);
 
         // step two: set new commit version (current_version + 1) and path to write
         let commit_version = self.read_snapshot.version() + 1;
@@ -112,6 +137,104 @@
         self.commit_info = Some(commit_info.into());
         self
     }
+
+    // Generate the logical-to-physical transform expression which must be evaluated on every data
+    // chunk before writing. At the moment, this is a transaction-wide expression.
+    fn generate_logical_to_physical(&self) -> Expression {
+        // for now, we just pass through all the columns except partition columns.
+        // note this is _incorrect_ if table config deems we need partition columns.
+        let partition_columns = self.read_snapshot.metadata().partition_columns.clone();
+        let fields = self.read_snapshot.schema().fields();
+        let fields = fields.filter_map(|f| {
+            if partition_columns.contains(f.name()) {
+                None
+            } else {
+                Some(ColumnName::new([f.name()]).into())
+            }
+        });
+        Expression::struct_from(fields)
+    }
+
+    /// Get the write context for this transaction. At the moment, this is constant for the whole
+    /// transaction.
+    // Note: after we introduce metadata updates (modify table schema, etc.), we need to make sure
+    // that engines cannot call this method after a metadata change, since the write context could
+    // have invalid metadata.
+    pub fn get_write_context(&self) -> WriteContext {
+        let target_dir = self.read_snapshot.table_root();
+        let snapshot_schema = self.read_snapshot.schema();
+        let logical_to_physical = self.generate_logical_to_physical();
+        WriteContext::new(
+            target_dir.clone(),
+            Arc::new(snapshot_schema.clone()),
+            logical_to_physical,
+        )
+    }
+
+    /// Add write metadata about files to include in the transaction. This API can be called
+    /// multiple times to add multiple batches.
+    ///
+    /// The expected schema for `write_metadata` is given by [`get_write_metadata_schema`].
+    pub fn add_write_metadata(&mut self, write_metadata: Box<dyn EngineData>) {
+        self.write_metadata.push(write_metadata);
+    }
+}
+
+// convert write_metadata into add actions using an expression to transform the data in a single
+// pass
+fn generate_adds<'a>(
+    engine: &dyn Engine,
+    write_metadata: impl Iterator<Item = &'a dyn EngineData> + Send + 'a,
+) -> impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + 'a {
+    let expression_handler = engine.get_expression_handler();
+    let write_metadata_schema = get_write_metadata_schema();
+    let log_schema = get_log_add_schema();
+
+    write_metadata.map(move |write_metadata_batch| {
+        let adds_expr = Expression::struct_from([Expression::struct_from(
+            write_metadata_schema
+                .fields()
+                .map(|f| ColumnName::new([f.name()]).into()),
+        )]);
+        let adds_evaluator = expression_handler.get_evaluator(
+            write_metadata_schema.clone(),
+            adds_expr,
+            log_schema.clone().into(),
+        );
+        adds_evaluator.evaluate(write_metadata_batch)
+    })
+}
+
+/// WriteContext is data derived from a [`Transaction`] that can be provided to writers in order to
+/// write table data.
+/// +/// [`Transaction`]: struct.Transaction.html +pub struct WriteContext { + target_dir: Url, + schema: SchemaRef, + logical_to_physical: Expression, +} + +impl WriteContext { + fn new(target_dir: Url, schema: SchemaRef, logical_to_physical: Expression) -> Self { + WriteContext { + target_dir, + schema, + logical_to_physical, + } + } + + pub fn target_dir(&self) -> &Url { + &self.target_dir + } + + pub fn schema(&self) -> &SchemaRef { + &self.schema + } + + pub fn logical_to_physical(&self) -> &Expression { + &self.logical_to_physical + } } /// Result after committing a transaction. If 'committed', the version is the new version written @@ -208,6 +331,7 @@ mod tests { use crate::engine::arrow_data::ArrowEngineData; use crate::engine::arrow_expression::ArrowExpressionHandler; + use crate::schema::MapType; use crate::{ExpressionHandler, FileSystemClient, JsonHandler, ParquetHandler}; use arrow::json::writer::LineDelimitedWriter; @@ -541,4 +665,21 @@ mod tests { } Ok(()) } + + #[test] + fn test_write_metadata_schema() { + let schema = get_write_metadata_schema(); + let expected = StructType::new(vec![ + StructField::new("path", DataType::STRING, false), + StructField::new( + "partitionValues", + MapType::new(DataType::STRING, DataType::STRING, true), + false, + ), + StructField::new("size", DataType::LONG, false), + StructField::new("modificationTime", DataType::LONG, false), + StructField::new("dataChange", DataType::BOOLEAN, false), + ]); + assert_eq!(*schema, expected.into()); + } } diff --git a/kernel/tests/common/mod.rs b/kernel/tests/common/mod.rs index c43a7df5e..c219efd61 100644 --- a/kernel/tests/common/mod.rs +++ b/kernel/tests/common/mod.rs @@ -1,7 +1,11 @@ -use crate::ArrowEngineData; +use arrow::compute::filter_record_batch; use arrow::record_batch::RecordBatch; -use delta_kernel::DeltaResult; -use delta_kernel::EngineData; +use arrow::util::pretty::pretty_format_batches; +use itertools::Itertools; + +use crate::ArrowEngineData; +use delta_kernel::scan::Scan; +use delta_kernel::{DeltaResult, Engine, EngineData, Table}; pub(crate) fn to_arrow(data: Box) -> DeltaResult { Ok(data @@ -10,3 +14,45 @@ pub(crate) fn to_arrow(data: Box) -> DeltaResult { .map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))? .into()) } + +// TODO (zach): this is listed as unused for acceptance crate +#[allow(unused)] +pub(crate) fn test_read( + expected: &ArrowEngineData, + table: &Table, + engine: &impl Engine, +) -> Result<(), Box> { + let snapshot = table.snapshot(engine, None)?; + let scan = snapshot.into_scan_builder().build()?; + let batches = read_scan(&scan, engine)?; + let formatted = pretty_format_batches(&batches).unwrap().to_string(); + + let expected = pretty_format_batches(&[expected.record_batch().clone()]) + .unwrap() + .to_string(); + + println!("actual:\n{formatted}"); + println!("expected:\n{expected}"); + assert_eq!(formatted, expected); + + Ok(()) +} + +// TODO (zach): this is listed as unused for acceptance crate +#[allow(unused)] +pub(crate) fn read_scan(scan: &Scan, engine: &dyn Engine) -> DeltaResult> { + let scan_results = scan.execute(engine)?; + scan_results + .map(|scan_result| -> DeltaResult<_> { + let scan_result = scan_result?; + let mask = scan_result.full_mask(); + let data = scan_result.raw_data?; + let record_batch = to_arrow(data)?; + if let Some(mask) = mask { + Ok(filter_record_batch(&record_batch, &mask.into())?) 
+ } else { + Ok(record_batch) + } + }) + .try_collect() +} diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index 4d2c977e0..259b7c457 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -17,15 +17,14 @@ use delta_kernel::expressions::{column_expr, BinaryOperator, Expression}; use delta_kernel::scan::state::{visit_scan_files, DvInfo, Stats}; use delta_kernel::scan::{transform_to_logical, Scan}; use delta_kernel::schema::Schema; -use delta_kernel::{DeltaResult, Engine, EngineData, FileMeta, Table}; -use itertools::Itertools; +use delta_kernel::{Engine, EngineData, FileMeta, Table}; use object_store::{memory::InMemory, path::Path, ObjectStore}; use parquet::arrow::arrow_writer::ArrowWriter; use parquet::file::properties::WriterProperties; use url::Url; mod common; -use common::to_arrow; +use common::{read_scan, to_arrow}; const PARQUET_FILE1: &str = "part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c000.snappy.parquet"; const PARQUET_FILE2: &str = "part-00001-c506e79a-0bf8-4e2b-a42b-9731b2e490ae-c000.snappy.parquet"; @@ -393,20 +392,7 @@ fn read_with_execute( expected: &[String], ) -> Result<(), Box> { let result_schema: ArrowSchemaRef = Arc::new(scan.schema().as_ref().try_into()?); - let scan_results = scan.execute(engine)?; - let batches: Vec = scan_results - .map(|scan_result| -> DeltaResult<_> { - let scan_result = scan_result?; - let mask = scan_result.full_mask(); - let data = scan_result.raw_data?; - let record_batch = to_arrow(data)?; - if let Some(mask) = mask { - Ok(filter_record_batch(&record_batch, &mask.into())?) - } else { - Ok(record_batch) - } - }) - .try_collect()?; + let batches = read_scan(scan, engine)?; if expected.is_empty() { assert_eq!(batches.len(), 0); diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index 212b06cae..0fc2a209f 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -1,12 +1,16 @@ +use std::collections::HashMap; use std::sync::Arc; -use arrow::array::StringArray; +use arrow::array::{Int32Array, StringArray}; use arrow::record_batch::RecordBatch; use arrow_schema::Schema as ArrowSchema; use arrow_schema::{DataType as ArrowDataType, Field}; +use itertools::Itertools; +use object_store::local::LocalFileSystem; use object_store::memory::InMemory; use object_store::path::Path; use object_store::ObjectStore; +use serde_json::Deserializer; use serde_json::{json, to_vec}; use url::Url; @@ -14,28 +18,37 @@ use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType}; -use delta_kernel::{Error as KernelError, Table}; +use delta_kernel::Error as KernelError; +use delta_kernel::{DeltaResult, Table}; -// setup default engine with in-memory object store. +mod common; +use common::test_read; + +// setup default engine with in-memory (=true) or local fs (=false) object store. 
fn setup( table_name: &str, + in_memory: bool, ) -> ( Arc, DefaultEngine, Url, ) { - let table_root_path = Path::from(format!("/{table_name}")); - let url = Url::parse(&format!("memory:///{}/", table_root_path)).unwrap(); - let storage = Arc::new(InMemory::new()); - ( - storage.clone(), - DefaultEngine::new( - storage, - table_root_path, - Arc::new(TokioBackgroundExecutor::new()), - ), - url, - ) + let (storage, base_path, base_url): (Arc, &str, &str) = if in_memory { + (Arc::new(InMemory::new()), "/", "memory:///") + } else { + ( + Arc::new(LocalFileSystem::new()), + "./kernel_write_tests/", + "file://", + ) + }; + + let table_root_path = Path::from(format!("{base_path}{table_name}")); + let url = Url::parse(&format!("{base_url}{table_root_path}/")).unwrap(); + let executor = Arc::new(TokioBackgroundExecutor::new()); + let engine = DefaultEngine::new(Arc::clone(&storage), table_root_path, executor); + + (storage, engine, url) } // we provide this table creation function since we only do appends to existing tables for now. @@ -85,21 +98,8 @@ async fn create_table( Ok(Table::new(table_path)) } -#[tokio::test] -async fn test_commit_info() -> Result<(), Box> { - // setup tracing - let _ = tracing_subscriber::fmt::try_init(); - // setup in-memory object store and default engine - let (store, engine, table_location) = setup("test_table"); - - // create a simple table: one int column named 'number' - let schema = Arc::new(StructType::new(vec![StructField::new( - "number", - DataType::INTEGER, - true, - )])); - let table = create_table(store.clone(), table_location, schema, &[]).await?; - +// create commit info in arrow of the form {engineInfo: "default engine"} +fn new_commit_info() -> DeltaResult> { // create commit info of the form {engineCommitInfo: Map { "engineInfo": "default engine" } } let commit_info_schema = Arc::new(ArrowSchema::new(vec![Field::new( "engineCommitInfo", @@ -136,11 +136,30 @@ async fn test_commit_info() -> Result<(), Box> { let commit_info_batch = RecordBatch::try_new(commit_info_schema.clone(), vec![Arc::new(array)])?; + Ok(Box::new(ArrowEngineData::new(commit_info_batch))) +} + +#[tokio::test] +async fn test_commit_info() -> Result<(), Box> { + // setup tracing + let _ = tracing_subscriber::fmt::try_init(); + // setup in-memory object store and default engine + let (store, engine, table_location) = setup("test_table", true); + + // create a simple table: one int column named 'number' + let schema = Arc::new(StructType::new(vec![StructField::new( + "number", + DataType::INTEGER, + true, + )])); + let table = create_table(store.clone(), table_location, schema, &[]).await?; + + let commit_info = new_commit_info()?; // create a transaction let txn = table .new_transaction(&engine)? - .with_commit_info(Box::new(ArrowEngineData::new(commit_info_batch))); + .with_commit_info(commit_info); // commit! 
txn.commit(&engine)?; @@ -179,7 +198,7 @@ async fn test_empty_commit() -> Result<(), Box> { // setup tracing let _ = tracing_subscriber::fmt::try_init(); // setup in-memory object store and default engine - let (store, engine, table_location) = setup("test_table"); + let (store, engine, table_location) = setup("test_table", true); // create a simple table: one int column named 'number' let schema = Arc::new(StructType::new(vec![StructField::new( @@ -202,7 +221,7 @@ async fn test_invalid_commit_info() -> Result<(), Box> { // setup tracing let _ = tracing_subscriber::fmt::try_init(); // setup in-memory object store and default engine - let (store, engine, table_location) = setup("test_table"); + let (store, engine, table_location) = setup("test_table", true); // create a simple table: one int column named 'number' let schema = Arc::new(StructType::new(vec![StructField::new( @@ -251,3 +270,382 @@ async fn test_invalid_commit_info() -> Result<(), Box> { )); Ok(()) } + +// check that the timestamps in commit_info and add actions are within 10s of SystemTime::now() +fn check_action_timestamps<'a>( + parsed_commits: impl Iterator, +) -> Result<(), Box> { + let now: i64 = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? + .as_millis() + .try_into() + .unwrap(); + + parsed_commits.for_each(|commit| { + if let Some(commit_info_ts) = &commit.pointer("/commitInfo/timestamp") { + assert!((now - commit_info_ts.as_i64().unwrap()).abs() < 10_000); + } + if let Some(add_ts) = &commit.pointer("/add/modificationTime") { + assert!((now - add_ts.as_i64().unwrap()).abs() < 10_000); + } + }); + + Ok(()) +} + +// update `value` at (.-separated) `path` to `new_value` +fn set_value( + value: &mut serde_json::Value, + path: &str, + new_value: serde_json::Value, +) -> Result<(), Box> { + let mut path_string = path.replace(".", "/"); + path_string.insert(0, '/'); + let v = value + .pointer_mut(&path_string) + .ok_or_else(|| format!("key '{path}' not found"))?; + *v = new_value; + Ok(()) +} + +#[tokio::test] +async fn test_append() -> Result<(), Box> { + // setup tracing + let _ = tracing_subscriber::fmt::try_init(); + // setup in-memory object store and default engine + let (store, engine, table_location) = setup("test_table", true); + + // create a simple table: one int column named 'number' + let schema = Arc::new(StructType::new(vec![StructField::new( + "number", + DataType::INTEGER, + true, + )])); + let table = create_table(store.clone(), table_location, schema.clone(), &[]).await?; + + let commit_info = new_commit_info()?; + + let mut txn = table + .new_transaction(&engine)? 
+ .with_commit_info(commit_info); + + // create two new arrow record batches to append + let append_data = [[1, 2, 3], [4, 5, 6]].map(|data| -> DeltaResult<_> { + let data = RecordBatch::try_new( + Arc::new(schema.as_ref().try_into()?), + vec![Arc::new(arrow::array::Int32Array::from(data.to_vec()))], + )?; + Ok(Box::new(ArrowEngineData::new(data))) + }); + + // write data out by spawning async tasks to simulate executors + let engine = Arc::new(engine); + let write_context = Arc::new(txn.get_write_context()); + let tasks = append_data.into_iter().map(|data| { + // arc clones + let engine = engine.clone(); + let write_context = write_context.clone(); + tokio::task::spawn(async move { + engine + .write_parquet( + data.as_ref().unwrap(), + write_context.as_ref(), + HashMap::new(), + true, + ) + .await + }) + }); + + let write_metadata = futures::future::join_all(tasks).await.into_iter().flatten(); + for meta in write_metadata { + txn.add_write_metadata(meta?); + } + + // commit! + txn.commit(engine.as_ref())?; + + let commit1 = store + .get(&Path::from( + "/test_table/_delta_log/00000000000000000001.json", + )) + .await?; + + let mut parsed_commits: Vec<_> = Deserializer::from_slice(&commit1.bytes().await?) + .into_iter::() + .try_collect()?; + + // check that the timestamps in commit_info and add actions are within 10s of SystemTime::now() + // before we clear them for comparison + check_action_timestamps(parsed_commits.iter())?; + + // set timestamps to 0 and paths to known string values for comparison + // (otherwise timestamps are non-deterministic and paths are random UUIDs) + set_value(&mut parsed_commits[0], "commitInfo.timestamp", json!(0))?; + set_value(&mut parsed_commits[1], "add.modificationTime", json!(0))?; + set_value(&mut parsed_commits[1], "add.path", json!("first.parquet"))?; + set_value(&mut parsed_commits[2], "add.modificationTime", json!(0))?; + set_value(&mut parsed_commits[2], "add.path", json!("second.parquet"))?; + + let expected_commit = vec![ + json!({ + "commitInfo": { + "timestamp": 0, + "operation": "UNKNOWN", + "kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")), + "operationParameters": {}, + "engineCommitInfo": { + "engineInfo": "default engine" + } + } + }), + json!({ + "add": { + "path": "first.parquet", + "partitionValues": {}, + "size": 483, + "modificationTime": 0, + "dataChange": true + } + }), + json!({ + "add": { + "path": "second.parquet", + "partitionValues": {}, + "size": 483, + "modificationTime": 0, + "dataChange": true + } + }), + ]; + + assert_eq!(parsed_commits, expected_commit); + + test_read( + &ArrowEngineData::new(RecordBatch::try_new( + Arc::new(schema.as_ref().try_into()?), + vec![Arc::new(arrow::array::Int32Array::from(vec![ + 1, 2, 3, 4, 5, 6, + ]))], + )?), + &table, + engine.as_ref(), + )?; + Ok(()) +} + +#[tokio::test] +async fn test_append_partitioned() -> Result<(), Box> { + // setup tracing + let _ = tracing_subscriber::fmt::try_init(); + // setup in-memory object store and default engine + let (store, engine, table_location) = setup("test_table", true); + let partition_col = "partition"; + + // create a simple partitioned table: one int column named 'number', partitioned by string + // column named 'partition' + let table_schema = Arc::new(StructType::new(vec![ + StructField::new("number", DataType::INTEGER, true), + StructField::new("partition", DataType::STRING, true), + ])); + let data_schema = Arc::new(StructType::new(vec![StructField::new( + "number", + DataType::INTEGER, + true, + )])); + let table = create_table( + 
store.clone(), + table_location, + table_schema.clone(), + &[partition_col], + ) + .await?; + + let commit_info = new_commit_info()?; + + let mut txn = table + .new_transaction(&engine)? + .with_commit_info(commit_info); + + // create two new arrow record batches to append + let append_data = [[1, 2, 3], [4, 5, 6]].map(|data| -> DeltaResult<_> { + let data = RecordBatch::try_new( + Arc::new(data_schema.as_ref().try_into()?), + vec![Arc::new(arrow::array::Int32Array::from(data.to_vec()))], + )?; + Ok(Box::new(ArrowEngineData::new(data))) + }); + let partition_vals = vec!["a", "b"]; + + // write data out by spawning async tasks to simulate executors + let engine = Arc::new(engine); + let write_context = Arc::new(txn.get_write_context()); + let tasks = append_data + .into_iter() + .zip(partition_vals) + .map(|(data, partition_val)| { + // arc clones + let engine = engine.clone(); + let write_context = write_context.clone(); + tokio::task::spawn(async move { + engine + .write_parquet( + data.as_ref().unwrap(), + write_context.as_ref(), + HashMap::from([(partition_col.to_string(), partition_val.to_string())]), + true, + ) + .await + }) + }); + + let write_metadata = futures::future::join_all(tasks).await.into_iter().flatten(); + for meta in write_metadata { + txn.add_write_metadata(meta?); + } + + // commit! + txn.commit(engine.as_ref())?; + + let commit1 = store + .get(&Path::from( + "/test_table/_delta_log/00000000000000000001.json", + )) + .await?; + + let mut parsed_commits: Vec<_> = Deserializer::from_slice(&commit1.bytes().await?) + .into_iter::() + .try_collect()?; + + // check that the timestamps in commit_info and add actions are within 10s of SystemTime::now() + // before we clear them for comparison + check_action_timestamps(parsed_commits.iter())?; + + // set timestamps to 0 and paths to known string values for comparison + // (otherwise timestamps are non-deterministic and paths are random UUIDs) + set_value(&mut parsed_commits[0], "commitInfo.timestamp", json!(0))?; + set_value(&mut parsed_commits[1], "add.modificationTime", json!(0))?; + set_value(&mut parsed_commits[1], "add.path", json!("first.parquet"))?; + set_value(&mut parsed_commits[2], "add.modificationTime", json!(0))?; + set_value(&mut parsed_commits[2], "add.path", json!("second.parquet"))?; + + let expected_commit = vec![ + json!({ + "commitInfo": { + "timestamp": 0, + "operation": "UNKNOWN", + "kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")), + "operationParameters": {}, + "engineCommitInfo": { + "engineInfo": "default engine" + } + } + }), + json!({ + "add": { + "path": "first.parquet", + "partitionValues": { + "partition": "a" + }, + "size": 483, + "modificationTime": 0, + "dataChange": true + } + }), + json!({ + "add": { + "path": "second.parquet", + "partitionValues": { + "partition": "b" + }, + "size": 483, + "modificationTime": 0, + "dataChange": true + } + }), + ]; + + assert_eq!(parsed_commits, expected_commit); + + test_read( + &ArrowEngineData::new(RecordBatch::try_new( + Arc::new(table_schema.as_ref().try_into()?), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])), + Arc::new(StringArray::from(vec!["a", "a", "a", "b", "b", "b"])), + ], + )?), + &table, + engine.as_ref(), + )?; + Ok(()) +} + +#[tokio::test] +async fn test_append_invalid_schema() -> Result<(), Box> { + // setup tracing + let _ = tracing_subscriber::fmt::try_init(); + // setup in-memory object store and default engine + let (store, engine, table_location) = setup("test_table", true); + + // create a simple table: one int 
column named 'number' + let table_schema = Arc::new(StructType::new(vec![StructField::new( + "number", + DataType::INTEGER, + true, + )])); + // incompatible data schema: one string column named 'string' + let data_schema = Arc::new(StructType::new(vec![StructField::new( + "string", + DataType::STRING, + true, + )])); + let table = create_table(store.clone(), table_location, table_schema.clone(), &[]).await?; + + let commit_info = new_commit_info()?; + + let txn = table + .new_transaction(&engine)? + .with_commit_info(commit_info); + + // create two new arrow record batches to append + let append_data = [["a", "b"], ["c", "d"]].map(|data| -> DeltaResult<_> { + let data = RecordBatch::try_new( + Arc::new(data_schema.as_ref().try_into()?), + vec![Arc::new(arrow::array::StringArray::from(data.to_vec()))], + )?; + Ok(Box::new(ArrowEngineData::new(data))) + }); + + // write data out by spawning async tasks to simulate executors + let engine = Arc::new(engine); + let write_context = Arc::new(txn.get_write_context()); + let tasks = append_data.into_iter().map(|data| { + // arc clones + let engine = engine.clone(); + let write_context = write_context.clone(); + tokio::task::spawn(async move { + engine + .write_parquet( + data.as_ref().unwrap(), + write_context.as_ref(), + HashMap::new(), + true, + ) + .await + }) + }); + + let mut write_metadata = futures::future::join_all(tasks).await.into_iter().flatten(); + assert!(write_metadata.all(|res| match res { + Err(KernelError::Arrow(arrow_schema::ArrowError::SchemaError(_))) => true, + Err(KernelError::Backtraced { source, .. }) + if matches!( + &*source, + KernelError::Arrow(arrow_schema::ArrowError::SchemaError(_)) + ) => + true, + _ => false, + })); + Ok(()) +}
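The schema-mismatch assertion above has to look through `Error::Backtraced`, which wraps the underlying error when a backtrace is captured. A small helper in the same spirit makes that intent explicit; this is a sketch based on the match arms used in the test, not part of the kernel API:

```rust
use delta_kernel::Error as KernelError;

// Peel off the Backtraced wrapper (if any) so callers can match on the underlying variant.
// Field layout follows the `Backtraced { source, .. }` pattern used in the test above.
fn unwrap_backtraced(err: &KernelError) -> &KernelError {
    match err {
        KernelError::Backtraced { source, .. } => source.as_ref(),
        other => other,
    }
}

// Usage mirroring test_append_invalid_schema.
fn is_schema_error(err: &KernelError) -> bool {
    matches!(
        unwrap_backtraced(err),
        KernelError::Arrow(arrow_schema::ArrowError::SchemaError(_))
    )
}
```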