diff --git a/Cargo.lock b/Cargo.lock index 5afd1e1c..fee6d39c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6648,6 +6648,7 @@ dependencies = [ "deltalake", "futures", "hex", + "iceberg", "iceberg-datafusion", "indexmap 2.6.0", "itertools 0.13.0", diff --git a/Cargo.toml b/Cargo.toml index 122195aa..562fb4c0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,6 +101,7 @@ deltalake = { git = "https://github.com/splitgraph/delta-rs", branch = "fix-deci futures = "0.3" hex = ">=0.4.0" +iceberg = { git = "https://github.com/splitgraph/iceberg-rust", rev = "eeca14d13b23f2a92e57f503c141a860a3407226" } iceberg-datafusion = { git = "https://github.com/splitgraph/iceberg-rust", rev = "eeca14d13b23f2a92e57f503c141a860a3407226" } indexmap = "2.6.0" itertools = { workspace = true } diff --git a/clade/proto/schema.proto b/clade/proto/schema.proto index 5d726731..6b455720 100644 --- a/clade/proto/schema.proto +++ b/clade/proto/schema.proto @@ -13,6 +13,13 @@ message TableObject { string path = 2; // Storage location identifier optional string store = 4; + // Table format + TableFormat format = 5; +} + +enum TableFormat { + DELTA = 0; + ICEBERG = 1; } // A single root storage location, hosting many individual tables diff --git a/src/catalog/metastore.rs b/src/catalog/metastore.rs index 43b4cd01..0f0f14dd 100644 --- a/src/catalog/metastore.rs +++ b/src/catalog/metastore.rs @@ -2,7 +2,7 @@ use crate::catalog::external::ExternalStore; use crate::catalog::repository::RepositoryStore; use crate::catalog::{ CatalogError, CatalogResult, CatalogStore, CreateFunctionError, FunctionStore, - SchemaStore, TableStore, + SchemaStore, TableStore, DEFAULT_SCHEMA, }; use crate::object_store::factory::ObjectStoreFactory; @@ -13,21 +13,25 @@ use crate::wasm_udf::data_types::{ CreateFunctionDataType, CreateFunctionDetails, CreateFunctionLanguage, CreateFunctionVolatility, }; -use clade::schema::{SchemaObject, TableObject}; +use clade::schema::{SchemaObject, TableFormat, TableObject}; use dashmap::DashMap; use datafusion::catalog_common::memory::MemorySchemaProvider; use datafusion::datasource::TableProvider; +use super::empty::EmptyStore; use crate::catalog::memory::MemoryStore; +use crate::object_store::utils::object_store_opts_to_file_io_props; use deltalake::DeltaTable; use futures::{stream, StreamExt, TryStreamExt}; +use iceberg::io::FileIO; +use iceberg::table::StaticTable; +use iceberg::TableIdent; +use iceberg_datafusion::IcebergTableProvider; use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; use url::Url; -use super::empty::EmptyStore; - // Root URL for a storage location alongside client connection options type LocationAndOptions = (String, HashMap); @@ -166,35 +170,82 @@ impl Metastore { // delta tables present in the database. The real fix for this is to make DF use `TableSource` // for the information schema, and then implement `TableSource` for `DeltaTable` in delta-rs. - let table_log_store = match table.store { - // Use the provided customized location - Some(name) => { - let (location, this_store_options) = store_options - .get(&name) - .ok_or(CatalogError::Generic { - reason: format!("Object store with name {name} not found"), - })? - .clone(); - - self.object_stores - .get_log_store_for_table( - Url::parse(&location)?, - this_store_options, - table.path, - ) - .await? + match TableFormat::try_from(table.format).map_err(|e| CatalogError::Generic { + reason: format!("Unrecognized table format id {}: {e}", table.format), + })? { + TableFormat::Delta => { + let table_log_store = match table.store { + // Use the provided customized location + Some(name) => { + let (location, this_store_options) = store_options + .get(&name) + .ok_or(CatalogError::Generic { + reason: format!( + "Object store with name {name} not found" + ), + })? + .clone(); + + self.object_stores + .get_log_store_for_table( + Url::parse(&location)?, + this_store_options, + table.path, + ) + .await? + } + // Use the configured, default, object store + None => self + .object_stores + .get_default_log_store(&table.path) + .ok_or(CatalogError::NoTableStoreInInlineMetastore { + name: table.name.clone(), + })?, + }; + + let delta_table = DeltaTable::new(table_log_store, Default::default()); + Ok((Arc::from(table.name), Arc::new(delta_table) as _)) } - // Use the configured, default, object store - None => self - .object_stores - .get_default_log_store(&table.path) - .ok_or(CatalogError::NoTableStoreInInlineMetastore { - name: table.name.clone(), - })?, - }; - - let delta_table = DeltaTable::new(table_log_store, Default::default()); - Ok((Arc::from(table.name), Arc::new(delta_table) as _)) + TableFormat::Iceberg => { + let (location, file_io) = match table.store { + Some(name) => { + let (location, this_store_options) = store_options + .get(&name) + .ok_or(CatalogError::Generic { + reason: format!( + "Object store with name {name} not found" + ), + })? + .clone(); + + let file_io_props = object_store_opts_to_file_io_props(&this_store_options); + let file_io = FileIO::from_path(&location)?.with_props(file_io_props).build()?; + (location, file_io) + } + None => return Err(CatalogError::Generic { + reason: "Iceberg tables must pass FileIO props as object store options".to_string(), + }), + }; + + // Create the full path to table metadata by combining the object store location and + // relative table metadata path + let absolute_path = format!( + "{}/{}", + location.trim_end_matches("/"), + table.path.trim_start_matches("/") + ); + let iceberg_table = StaticTable::from_metadata_file( + &absolute_path, + TableIdent::from_strs(vec![DEFAULT_SCHEMA, &table.name])?, + file_io, + ) + .await? + .into_table(); + let table_provider = + IcebergTableProvider::try_new_from_table(iceberg_table).await?; + Ok((Arc::from(table.name), Arc::new(table_provider) as _)) + } + } } pub async fn build_functions( diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs index 0e8f0707..b1b22e1d 100644 --- a/src/catalog/mod.rs +++ b/src/catalog/mod.rs @@ -91,6 +91,9 @@ pub enum CatalogError { #[error("No inline metastore passed in")] NoInlineMetastore, + + #[error("Failed constructing an Iceberg table: {0}")] + IcebergError(#[from] iceberg::Error), } /// Implement a global converter into a DataFusionError from the catalog error type. diff --git a/src/catalog/repository.rs b/src/catalog/repository.rs index e3960048..a70cfb10 100644 --- a/src/catalog/repository.rs +++ b/src/catalog/repository.rs @@ -5,7 +5,7 @@ use async_trait::async_trait; use itertools::Itertools; use uuid::Uuid; -use clade::schema::{ListSchemaResponse, SchemaObject, TableObject}; +use clade::schema::{ListSchemaResponse, SchemaObject, TableFormat, TableObject}; use crate::catalog::{ CatalogError, CatalogResult, CatalogStore, FunctionStore, SchemaStore, TableStore, @@ -133,6 +133,7 @@ impl SchemaStore for RepositoryStore { name: name.clone(), path: uuid.to_string(), store: None, + format: TableFormat::Delta.into(), }) } else { None diff --git a/src/object_store/utils.rs b/src/object_store/utils.rs index c7b932a7..ab232610 100644 --- a/src/object_store/utils.rs +++ b/src/object_store/utils.rs @@ -1,6 +1,10 @@ use futures::TryFutureExt; +use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; +use object_store::aws::AmazonS3ConfigKey; use object_store::Error; +use std::collections::HashMap; use std::path::Path as StdPath; +use std::str::FromStr; use tokio::fs::{copy, create_dir_all, remove_file, rename}; use tracing::debug; @@ -44,3 +48,33 @@ pub async fn fast_upload(from: &StdPath, to: String) -> object_store::Result<(), Ok(()) } } + +// Go through all known keys for object store and convert them to corresponding file_io ones. +// +// For now only converts S3 keys. +// TODO: At some point this should be redundant, since there is an OpenDAL adapter for object_store, +// https://github.com/apache/iceberg-rust/issues/172 +pub fn object_store_opts_to_file_io_props( + opts: &HashMap, +) -> HashMap { + let mut props = HashMap::new(); + + for (key, val) in opts.iter() { + let key = match AmazonS3ConfigKey::from_str(key) { + Ok(AmazonS3ConfigKey::AccessKeyId) => S3_ACCESS_KEY_ID, + Ok(AmazonS3ConfigKey::SecretAccessKey) => S3_SECRET_ACCESS_KEY, + Ok(AmazonS3ConfigKey::Region) => S3_REGION, + Ok(AmazonS3ConfigKey::Endpoint) => S3_ENDPOINT, + _ => key, // for now just propagate any non-matched keys + }; + + props.insert(key.to_string(), val.clone()); + } + + // FileIO requires the region prop even when the S3 store doesn't (e.g. MinIO) + props + .entry(S3_REGION.to_string()) + .or_insert("dummy-region".to_string()); + + props +} diff --git a/src/provider.rs b/src/provider.rs index ea26d43f..c75d512d 100644 --- a/src/provider.rs +++ b/src/provider.rs @@ -85,7 +85,7 @@ impl SchemaProvider for SeafowlSchema { let mut delta_table = match self.tables.get(name) { None => return Ok(None), Some(table) => match table.as_any().downcast_ref::() { - // This shouldn't happen since we store only DeltaTable's in the map + // Different table format, e.g. Iceberg None => return Ok(Some(table.clone())), Some(delta_table) => { if delta_table.version() != -1 { diff --git a/tests/clade/query.rs b/tests/clade/query.rs index 017b4699..6ab7e2d4 100644 --- a/tests/clade/query.rs +++ b/tests/clade/query.rs @@ -18,23 +18,20 @@ async fn test_basic_select(#[case] table: &str, #[case] object_store: bool) -> ( let _r = context.metastore.schemas.list(DEFAULT_DB).await; let plan = context - .plan_query(&format!("SELECT * FROM {table} ORDER BY value")) + .plan_query(&format!("SELECT * FROM {table} ORDER BY key")) .await .unwrap(); let results = context.collect(plan).await.unwrap(); let expected = [ - "+-------+------+-------+-----+", - "| value | year | month | day |", - "+-------+------+-------+-----+", - "| 1 | 2020 | 1 | 1 |", - "| 2 | 2020 | 2 | 3 |", - "| 3 | 2020 | 2 | 5 |", - "| 4 | 2021 | 4 | 5 |", - "| 5 | 2021 | 12 | 4 |", - "| 6 | 2021 | 12 | 20 |", - "| 7 | 2021 | 12 | 20 |", - "+-------+------+-------+-----+", + "+-----+-------+", + "| key | value |", + "+-----+-------+", + "| 1 | one |", + "| 2 | two |", + "| 3 | three |", + "| 4 | four |", + "+-----+-------+", ]; assert_batches_eq!(expected, &results); } diff --git a/tests/data/delta-0.8.0-partitioned/_delta_log/00000000000000000000.json b/tests/data/delta-0.8.0-partitioned/_delta_log/00000000000000000000.json deleted file mode 100644 index 2fa0736f..00000000 --- a/tests/data/delta-0.8.0-partitioned/_delta_log/00000000000000000000.json +++ /dev/null @@ -1,9 +0,0 @@ -{"commitInfo":{"timestamp":1615555646188,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[\"year\",\"month\",\"day\"]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"6","numOutputBytes":"2477","numOutputRows":"7"}}} -{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} -{"metaData":{"id":"fe5a3c11-30d4-4dd7-b115-a1c121e66a4e","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"year\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"month\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"day\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["year","month","day"],"configuration":{},"createdTime":1615555644515}} -{"add":{"path":"year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet","partitionValues":{"year":"2020","month":"1","day":"1"},"size":414,"modificationTime":1615555646000,"dataChange":true}} -{"add":{"path":"year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet","partitionValues":{"year":"2020","month":"2","day":"3"},"size":414,"modificationTime":1615555646000,"dataChange":true}} -{"add":{"path":"year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet","partitionValues":{"year":"2020","month":"2","day":"5"},"size":414,"modificationTime":1615555646000,"dataChange":true}} -{"add":{"path":"year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet","partitionValues":{"year":"2021","month":"12","day":"20"},"size":407,"modificationTime":1615555646000,"dataChange":true}} -{"add":{"path":"year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet","partitionValues":{"year":"2021","month":"12","day":"4"},"size":414,"modificationTime":1615555646000,"dataChange":true}} -{"add":{"path":"year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet","partitionValues":{"year":"2021","month":"4","day":"5"},"size":414,"modificationTime":1615555646000,"dataChange":true}} diff --git a/tests/data/delta-0.8.0-partitioned/year=2020/month=1/day=1/.part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet.crc b/tests/data/delta-0.8.0-partitioned/year=2020/month=1/day=1/.part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet.crc deleted file mode 100644 index 65441ce9..00000000 Binary files a/tests/data/delta-0.8.0-partitioned/year=2020/month=1/day=1/.part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet.crc and /dev/null differ diff --git a/tests/data/delta-0.8.0-partitioned/year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet b/tests/data/delta-0.8.0-partitioned/year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet deleted file mode 100644 index 0aea93eb..00000000 Binary files a/tests/data/delta-0.8.0-partitioned/year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet and /dev/null differ diff --git a/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=3/.part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet.crc b/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=3/.part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet.crc deleted file mode 100644 index e5631399..00000000 Binary files a/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=3/.part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet.crc and /dev/null differ diff --git a/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet b/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet deleted file mode 100644 index 43bee4ed..00000000 Binary files a/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet and /dev/null differ diff --git a/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=5/.part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet.crc b/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=5/.part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet.crc deleted file mode 100644 index 531830fd..00000000 Binary files a/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=5/.part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet.crc and /dev/null differ diff --git a/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet b/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet deleted file mode 100644 index b05044de..00000000 Binary files a/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet and /dev/null differ diff --git a/tests/data/delta-0.8.0-partitioned/year=2021/month=12/day=20/.part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet.crc b/tests/data/delta-0.8.0-partitioned/year=2021/month=12/day=20/.part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet.crc deleted file mode 100644 index aba8a099..00000000 Binary files a/tests/data/delta-0.8.0-partitioned/year=2021/month=12/day=20/.part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet.crc and /dev/null differ diff --git a/tests/data/delta-0.8.0-partitioned/year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet b/tests/data/delta-0.8.0-partitioned/year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet deleted file mode 100644 index 471e2d72..00000000 Binary files a/tests/data/delta-0.8.0-partitioned/year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet and /dev/null differ diff --git a/tests/data/delta-0.8.0-partitioned/year=2021/month=12/day=4/.part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet.crc b/tests/data/delta-0.8.0-partitioned/year=2021/month=12/day=4/.part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet.crc deleted file mode 100644 index 6e673394..00000000 Binary files a/tests/data/delta-0.8.0-partitioned/year=2021/month=12/day=4/.part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet.crc and /dev/null differ diff --git a/tests/data/delta-0.8.0-partitioned/year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet b/tests/data/delta-0.8.0-partitioned/year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet deleted file mode 100644 index 24889e45..00000000 Binary files a/tests/data/delta-0.8.0-partitioned/year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet and /dev/null differ diff --git a/tests/data/delta-0.8.0-partitioned/year=2021/month=4/day=5/.part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet.crc b/tests/data/delta-0.8.0-partitioned/year=2021/month=4/day=5/.part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet.crc deleted file mode 100644 index 8788a17c..00000000 Binary files a/tests/data/delta-0.8.0-partitioned/year=2021/month=4/day=5/.part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet.crc and /dev/null differ diff --git a/tests/data/delta-0.8.0-partitioned/year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet b/tests/data/delta-0.8.0-partitioned/year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet deleted file mode 100644 index eee452ca..00000000 Binary files a/tests/data/delta-0.8.0-partitioned/year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet and /dev/null differ diff --git a/tests/data/delta/_delta_log/00000000000000000000.json b/tests/data/delta/_delta_log/00000000000000000000.json new file mode 100644 index 00000000..451b4a7f --- /dev/null +++ b/tests/data/delta/_delta_log/00000000000000000000.json @@ -0,0 +1,3 @@ +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} +{"metaData":{"id":"b4d25be5-de67-4fca-8f6a-9aa18eeb141e","name":"test","description":"Created by Seafowl 0.5.8","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"key\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1732876066617,"configuration":{}}} +{"commitInfo":{"timestamp":1732876066618,"operation":"CREATE TABLE","operationParameters":{"protocol":"{\"minReaderVersion\":1,\"minWriterVersion\":2}","mode":"ErrorIfExists","location":"file:///Users/gruuya/Splitgraph/seafowl-data/2f6abe4f-c07e-43fa-a98d-6200a34380dd","metadata":"{\"configuration\":{},\"createdTime\":1732876066617,\"description\":\"Created by Seafowl 0.5.8\",\"format\":{\"options\":{},\"provider\":\"parquet\"},\"id\":\"b4d25be5-de67-4fca-8f6a-9aa18eeb141e\",\"name\":\"test\",\"partitionColumns\":[],\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"key\\\",\\\"type\\\":\\\"integer\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"value\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\"}"},"clientVersion":"delta-rs.0.22.0"}} \ No newline at end of file diff --git a/tests/data/delta/_delta_log/00000000000000000001.json b/tests/data/delta/_delta_log/00000000000000000001.json new file mode 100644 index 00000000..3b732880 --- /dev/null +++ b/tests/data/delta/_delta_log/00000000000000000001.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00000-3d8342be-6309-42be-8450-54a5310148ac-c000.snappy.parquet","partitionValues":{},"size":768,"modificationTime":1732876066644,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"key\":1,\"value\":\"one\"},\"maxValues\":{\"key\":2,\"value\":\"two\"},\"nullCount\":{\"value\":0,\"key\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1732876066645,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.22.0"}} \ No newline at end of file diff --git a/tests/data/delta/_delta_log/00000000000000000002.json b/tests/data/delta/_delta_log/00000000000000000002.json new file mode 100644 index 00000000..f78fbf54 --- /dev/null +++ b/tests/data/delta/_delta_log/00000000000000000002.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00000-05b2b8d1-53c2-4bf1-91d6-020a60069f85-c000.snappy.parquet","partitionValues":{},"size":780,"modificationTime":1732876070309,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"key\":3,\"value\":\"four\"},\"maxValues\":{\"key\":4,\"value\":\"three\"},\"nullCount\":{\"key\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1732876070311,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.22.0"}} \ No newline at end of file diff --git a/tests/data/delta/part-00000-05b2b8d1-53c2-4bf1-91d6-020a60069f85-c000.snappy.parquet b/tests/data/delta/part-00000-05b2b8d1-53c2-4bf1-91d6-020a60069f85-c000.snappy.parquet new file mode 100644 index 00000000..52de9990 Binary files /dev/null and b/tests/data/delta/part-00000-05b2b8d1-53c2-4bf1-91d6-020a60069f85-c000.snappy.parquet differ diff --git a/tests/data/delta/part-00000-3d8342be-6309-42be-8450-54a5310148ac-c000.snappy.parquet b/tests/data/delta/part-00000-3d8342be-6309-42be-8450-54a5310148ac-c000.snappy.parquet new file mode 100644 index 00000000..3728351b Binary files /dev/null and b/tests/data/delta/part-00000-3d8342be-6309-42be-8450-54a5310148ac-c000.snappy.parquet differ diff --git a/tests/fixtures.rs b/tests/fixtures.rs index 9ce5aa7d..013d7eb3 100644 --- a/tests/fixtures.rs +++ b/tests/fixtures.rs @@ -1,4 +1,6 @@ -use clade::schema::{ListSchemaResponse, SchemaObject, StorageLocation, TableObject}; +use clade::schema::{ + ListSchemaResponse, SchemaObject, StorageLocation, TableFormat, TableObject, +}; use object_store::aws::AmazonS3ConfigKey; use object_store::gcp::GoogleConfigKey; use object_store::ClientConfigKey; @@ -25,15 +27,17 @@ pub fn fake_gcs_creds() -> String { pub fn schemas(include_file_without_store: bool) -> ListSchemaResponse { let mut local_schema_tables = vec![TableObject { name: "file_with_store".to_string(), - path: "delta-0.8.0-partitioned".to_string(), + path: "delta".to_string(), store: Some("local_fs".to_string()), + format: TableFormat::Delta.into(), }]; if include_file_without_store { local_schema_tables.push(TableObject { name: "file".to_string(), - path: "delta-0.8.0-partitioned".to_string(), + path: "delta".to_string(), store: None, + format: TableFormat::Delta.into(), }) } @@ -48,13 +52,21 @@ pub fn schemas(include_file_without_store: bool) -> ListSchemaResponse { tables: vec![ TableObject { name: "minio".to_string(), - path: "test-data/delta-0.8.0-partitioned".to_string(), + path: "test-data/delta".to_string(), store: Some("minio".to_string()), + format: TableFormat::Delta.into(), }, TableObject { name: "minio_prefix".to_string(), - path: "delta-0.8.0-partitioned".to_string(), + path: "delta".to_string(), store: Some("minio-prefix".to_string()), + format: TableFormat::Delta.into(), + }, + TableObject { + name: "iceberg".to_string(), + path: "iceberg/default.db/iceberg_table/metadata/00001-f394d7ec-944b-432d-a44f-78b5ec95aae2.metadata.json".to_string(), + store: Some("minio-prefix".to_string()), + format: TableFormat::Iceberg.into(), }, ], }, @@ -62,8 +74,9 @@ pub fn schemas(include_file_without_store: bool) -> ListSchemaResponse { name: "gcs".to_string(), tables: vec![TableObject { name: "fake".to_string(), - path: "delta-0.8.0-partitioned".to_string(), + path: "delta".to_string(), store: Some("fake-gcs".to_string()), + format: TableFormat::Delta.into(), }], }, ], diff --git a/tests/flight/inline_metastore.rs b/tests/flight/inline_metastore.rs index fe0821d9..f6c69733 100644 --- a/tests/flight/inline_metastore.rs +++ b/tests/flight/inline_metastore.rs @@ -22,6 +22,7 @@ use crate::flight::*; #[case("local.file_with_store", TestServerType::InlineOnly, false)] #[case("s3.minio", TestServerType::InlineOnly, false)] #[case("s3.minio_prefix", TestServerType::InlineOnly, false)] +#[case("s3.iceberg", TestServerType::InlineOnly, false)] #[case("gcs.fake", TestServerType::InlineOnly, false)] #[tokio::test] async fn test_inline_query( @@ -33,24 +34,21 @@ async fn test_inline_query( let batches = get_flight_batches_inlined( &mut client, - format!("SELECT * FROM {table} ORDER BY value"), + format!("SELECT * FROM {table} ORDER BY key"), schemas(include_file_without_store), ) .await .unwrap(); let expected = [ - "+-------+------+-------+-----+", - "| value | year | month | day |", - "+-------+------+-------+-----+", - "| 1 | 2020 | 1 | 1 |", - "| 2 | 2020 | 2 | 3 |", - "| 3 | 2020 | 2 | 5 |", - "| 4 | 2021 | 4 | 5 |", - "| 5 | 2021 | 12 | 4 |", - "| 6 | 2021 | 12 | 20 |", - "| 7 | 2021 | 12 | 20 |", - "+-------+------+-------+-----+", + "+-----+-------+", + "| key | value |", + "+-----+-------+", + "| 1 | one |", + "| 2 | two |", + "| 3 | three |", + "| 4 | four |", + "+-----+-------+", ]; assert_batches_eq!(expected, &batches); } diff --git a/tests/statements/query.rs b/tests/statements/query.rs index 67c3ac77..ecadd473 100644 --- a/tests/statements/query.rs +++ b/tests/statements/query.rs @@ -342,30 +342,27 @@ async fn test_delta_tables() { .plan_query( "CREATE EXTERNAL TABLE test_delta \ STORED AS DELTATABLE \ - LOCATION 'tests/data/delta-0.8.0-partitioned'", + LOCATION 'tests/data/delta'", ) .await .unwrap(); // The order gets randomized so we need to enforce it let plan = context - .plan_query("SELECT * FROM staging.test_delta ORDER BY value") + .plan_query("SELECT * FROM staging.test_delta ORDER BY key") .await .unwrap(); let results = context.collect(plan).await.unwrap(); let expected = [ - "+-------+------+-------+-----+", - "| value | year | month | day |", - "+-------+------+-------+-----+", - "| 1 | 2020 | 1 | 1 |", - "| 2 | 2020 | 2 | 3 |", - "| 3 | 2020 | 2 | 5 |", - "| 4 | 2021 | 4 | 5 |", - "| 5 | 2021 | 12 | 4 |", - "| 6 | 2021 | 12 | 20 |", - "| 7 | 2021 | 12 | 20 |", - "+-------+------+-------+-----+", + "+-----+-------+", + "| key | value |", + "+-----+-------+", + "| 1 | one |", + "| 2 | two |", + "| 3 | three |", + "| 4 | four |", + "+-----+-------+", ]; assert_batches_eq!(expected, &results); }