From 305c74a11b4b9b54e439a835db80a230ccd60528 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Wed, 27 Nov 2024 09:52:57 +0100 Subject: [PATCH 1/3] Add table format parameter to clade, and wire-up to a placeholder Iceberg table instantiation --- clade/proto/schema.proto | 7 ++++ src/catalog/metastore.rs | 69 +++++++++++++++++++++++---------------- src/catalog/repository.rs | 3 +- src/provider.rs | 2 +- tests/fixtures.rs | 9 ++++- 5 files changed, 58 insertions(+), 32 deletions(-) diff --git a/clade/proto/schema.proto b/clade/proto/schema.proto index 5d726731..6b455720 100644 --- a/clade/proto/schema.proto +++ b/clade/proto/schema.proto @@ -13,6 +13,13 @@ message TableObject { string path = 2; // Storage location identifier optional string store = 4; + // Table format + TableFormat format = 5; +} + +enum TableFormat { + DELTA = 0; + ICEBERG = 1; } // A single root storage location, hosting many individual tables diff --git a/src/catalog/metastore.rs b/src/catalog/metastore.rs index 43b4cd01..7fd75b99 100644 --- a/src/catalog/metastore.rs +++ b/src/catalog/metastore.rs @@ -13,7 +13,7 @@ use crate::wasm_udf::data_types::{ CreateFunctionDataType, CreateFunctionDetails, CreateFunctionLanguage, CreateFunctionVolatility, }; -use clade::schema::{SchemaObject, TableObject}; +use clade::schema::{SchemaObject, TableFormat, TableObject}; use dashmap::DashMap; use datafusion::catalog_common::memory::MemorySchemaProvider; use datafusion::datasource::TableProvider; @@ -166,35 +166,46 @@ impl Metastore { // delta tables present in the database. The real fix for this is to make DF use `TableSource` // for the information schema, and then implement `TableSource` for `DeltaTable` in delta-rs. - let table_log_store = match table.store { - // Use the provided customized location - Some(name) => { - let (location, this_store_options) = store_options - .get(&name) - .ok_or(CatalogError::Generic { - reason: format!("Object store with name {name} not found"), - })? - .clone(); - - self.object_stores - .get_log_store_for_table( - Url::parse(&location)?, - this_store_options, - table.path, - ) - .await? + match TableFormat::try_from(table.format).map_err(|e| CatalogError::Generic { + reason: format!("Unrecognized table format id {}: {e}", table.format), + })? { + TableFormat::Delta => { + let table_log_store = match table.store { + // Use the provided customized location + Some(name) => { + let (location, this_store_options) = store_options + .get(&name) + .ok_or(CatalogError::Generic { + reason: format!( + "Object store with name {name} not found" + ), + })? + .clone(); + + self.object_stores + .get_log_store_for_table( + Url::parse(&location)?, + this_store_options, + table.path, + ) + .await? + } + // Use the configured, default, object store + None => self + .object_stores + .get_default_log_store(&table.path) + .ok_or(CatalogError::NoTableStoreInInlineMetastore { + name: table.name.clone(), + })?, + }; + + let delta_table = DeltaTable::new(table_log_store, Default::default()); + Ok((Arc::from(table.name), Arc::new(delta_table) as _)) } - // Use the configured, default, object store - None => self - .object_stores - .get_default_log_store(&table.path) - .ok_or(CatalogError::NoTableStoreInInlineMetastore { - name: table.name.clone(), - })?, - }; - - let delta_table = DeltaTable::new(table_log_store, Default::default()); - Ok((Arc::from(table.name), Arc::new(delta_table) as _)) + TableFormat::Iceberg => { + unimplemented!("Iceberg tables are not supported yet"); + } + } } pub async fn build_functions( diff --git a/src/catalog/repository.rs b/src/catalog/repository.rs index e3960048..a70cfb10 100644 --- a/src/catalog/repository.rs +++ b/src/catalog/repository.rs @@ -5,7 +5,7 @@ use async_trait::async_trait; use itertools::Itertools; use uuid::Uuid; -use clade::schema::{ListSchemaResponse, SchemaObject, TableObject}; +use clade::schema::{ListSchemaResponse, SchemaObject, TableFormat, TableObject}; use crate::catalog::{ CatalogError, CatalogResult, CatalogStore, FunctionStore, SchemaStore, TableStore, @@ -133,6 +133,7 @@ impl SchemaStore for RepositoryStore { name: name.clone(), path: uuid.to_string(), store: None, + format: TableFormat::Delta.into(), }) } else { None diff --git a/src/provider.rs b/src/provider.rs index ea26d43f..c75d512d 100644 --- a/src/provider.rs +++ b/src/provider.rs @@ -85,7 +85,7 @@ impl SchemaProvider for SeafowlSchema { let mut delta_table = match self.tables.get(name) { None => return Ok(None), Some(table) => match table.as_any().downcast_ref::() { - // This shouldn't happen since we store only DeltaTable's in the map + // Different table format, e.g. Iceberg None => return Ok(Some(table.clone())), Some(delta_table) => { if delta_table.version() != -1 { diff --git a/tests/fixtures.rs b/tests/fixtures.rs index 9ce5aa7d..4215782f 100644 --- a/tests/fixtures.rs +++ b/tests/fixtures.rs @@ -1,4 +1,6 @@ -use clade::schema::{ListSchemaResponse, SchemaObject, StorageLocation, TableObject}; +use clade::schema::{ + ListSchemaResponse, SchemaObject, StorageLocation, TableFormat, TableObject, +}; use object_store::aws::AmazonS3ConfigKey; use object_store::gcp::GoogleConfigKey; use object_store::ClientConfigKey; @@ -27,6 +29,7 @@ pub fn schemas(include_file_without_store: bool) -> ListSchemaResponse { name: "file_with_store".to_string(), path: "delta-0.8.0-partitioned".to_string(), store: Some("local_fs".to_string()), + format: TableFormat::Delta.into(), }]; if include_file_without_store { @@ -34,6 +37,7 @@ pub fn schemas(include_file_without_store: bool) -> ListSchemaResponse { name: "file".to_string(), path: "delta-0.8.0-partitioned".to_string(), store: None, + format: TableFormat::Delta.into(), }) } @@ -50,11 +54,13 @@ pub fn schemas(include_file_without_store: bool) -> ListSchemaResponse { name: "minio".to_string(), path: "test-data/delta-0.8.0-partitioned".to_string(), store: Some("minio".to_string()), + format: TableFormat::Delta.into(), }, TableObject { name: "minio_prefix".to_string(), path: "delta-0.8.0-partitioned".to_string(), store: Some("minio-prefix".to_string()), + format: TableFormat::Delta.into(), }, ], }, @@ -64,6 +70,7 @@ pub fn schemas(include_file_without_store: bool) -> ListSchemaResponse { name: "fake".to_string(), path: "delta-0.8.0-partitioned".to_string(), store: Some("fake-gcs".to_string()), + format: TableFormat::Delta.into(), }], }, ], From edabccade63d0b00d6c38d8e21e564a3846a72d8 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Fri, 29 Nov 2024 11:36:24 +0100 Subject: [PATCH 2/3] Add construction of Iceberg tables in the metastore Also add an Iceberg table to the inline integration test. --- Cargo.lock | 1 + Cargo.toml | 1 + src/catalog/metastore.rs | 48 ++++++++++++++++-- src/catalog/mod.rs | 3 ++ src/object_store/utils.rs | 34 +++++++++++++ tests/clade/query.rs | 21 ++++---- .../_delta_log/00000000000000000000.json | 9 ---- ...-ad78-fd13c2027c7e.c000.snappy.parquet.crc | Bin 12 -> 0 bytes ...4a39-ad78-fd13c2027c7e.c000.snappy.parquet | Bin 414 -> 0 bytes ...-a060-f67ccc63ced9.c000.snappy.parquet.crc | Bin 12 -> 0 bytes ...42cd-a060-f67ccc63ced9.c000.snappy.parquet | Bin 414 -> 0 bytes ...-8ea3-3990b2f027b5.c000.snappy.parquet.crc | Bin 12 -> 0 bytes ...4add-8ea3-3990b2f027b5.c000.snappy.parquet | Bin 414 -> 0 bytes ...-baa0-1c8a2bb98104.c000.snappy.parquet.crc | Bin 12 -> 0 bytes ...4184-baa0-1c8a2bb98104.c000.snappy.parquet | Bin 407 -> 0 bytes ...-b19e-1f92af3fbb25.c000.snappy.parquet.crc | Bin 12 -> 0 bytes ...4d52-b19e-1f92af3fbb25.c000.snappy.parquet | Bin 414 -> 0 bytes ...-a6fc-22b7bc92bebb.c000.snappy.parquet.crc | Bin 12 -> 0 bytes ...4032-a6fc-22b7bc92bebb.c000.snappy.parquet | Bin 414 -> 0 bytes .../_delta_log/00000000000000000000.json | 3 ++ .../_delta_log/00000000000000000001.json | 2 + .../_delta_log/00000000000000000002.json | 2 + ...4bf1-91d6-020a60069f85-c000.snappy.parquet | Bin 0 -> 780 bytes ...42be-8450-54a5310148ac-c000.snappy.parquet | Bin 0 -> 768 bytes tests/fixtures.rs | 16 ++++-- tests/flight/inline_metastore.rs | 22 ++++---- tests/statements/query.rs | 23 ++++----- 27 files changed, 130 insertions(+), 55 deletions(-) delete mode 100644 tests/data/delta-0.8.0-partitioned/_delta_log/00000000000000000000.json delete mode 100644 tests/data/delta-0.8.0-partitioned/year=2020/month=1/day=1/.part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet.crc delete mode 100644 tests/data/delta-0.8.0-partitioned/year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet delete mode 100644 tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=3/.part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet.crc delete mode 100644 tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet delete mode 100644 tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=5/.part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet.crc delete mode 100644 tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet delete mode 100644 tests/data/delta-0.8.0-partitioned/year=2021/month=12/day=20/.part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet.crc delete mode 100644 tests/data/delta-0.8.0-partitioned/year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet delete mode 100644 tests/data/delta-0.8.0-partitioned/year=2021/month=12/day=4/.part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet.crc delete mode 100644 tests/data/delta-0.8.0-partitioned/year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet delete mode 100644 tests/data/delta-0.8.0-partitioned/year=2021/month=4/day=5/.part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet.crc delete mode 100644 tests/data/delta-0.8.0-partitioned/year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet create mode 100644 tests/data/delta/_delta_log/00000000000000000000.json create mode 100644 tests/data/delta/_delta_log/00000000000000000001.json create mode 100644 tests/data/delta/_delta_log/00000000000000000002.json create mode 100644 tests/data/delta/part-00000-05b2b8d1-53c2-4bf1-91d6-020a60069f85-c000.snappy.parquet create mode 100644 tests/data/delta/part-00000-3d8342be-6309-42be-8450-54a5310148ac-c000.snappy.parquet diff --git a/Cargo.lock b/Cargo.lock index 5afd1e1c..fee6d39c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6648,6 +6648,7 @@ dependencies = [ "deltalake", "futures", "hex", + "iceberg", "iceberg-datafusion", "indexmap 2.6.0", "itertools 0.13.0", diff --git a/Cargo.toml b/Cargo.toml index 122195aa..a96c06c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -100,6 +100,7 @@ deltalake = { git = "https://github.com/splitgraph/delta-rs", branch = "fix-deci futures = "0.3" hex = ">=0.4.0" +iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "697a20060f2247da87f73073e8bf5ab407bd40ea" } iceberg-datafusion = { git = "https://github.com/splitgraph/iceberg-rust", rev = "eeca14d13b23f2a92e57f503c141a860a3407226" } indexmap = "2.6.0" diff --git a/src/catalog/metastore.rs b/src/catalog/metastore.rs index 7fd75b99..0f0f14dd 100644 --- a/src/catalog/metastore.rs +++ b/src/catalog/metastore.rs @@ -2,7 +2,7 @@ use crate::catalog::external::ExternalStore; use crate::catalog::repository::RepositoryStore; use crate::catalog::{ CatalogError, CatalogResult, CatalogStore, CreateFunctionError, FunctionStore, - SchemaStore, TableStore, + SchemaStore, TableStore, DEFAULT_SCHEMA, }; use crate::object_store::factory::ObjectStoreFactory; @@ -18,16 +18,20 @@ use dashmap::DashMap; use datafusion::catalog_common::memory::MemorySchemaProvider; use datafusion::datasource::TableProvider; +use super::empty::EmptyStore; use crate::catalog::memory::MemoryStore; +use crate::object_store::utils::object_store_opts_to_file_io_props; use deltalake::DeltaTable; use futures::{stream, StreamExt, TryStreamExt}; +use iceberg::io::FileIO; +use iceberg::table::StaticTable; +use iceberg::TableIdent; +use iceberg_datafusion::IcebergTableProvider; use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; use url::Url; -use super::empty::EmptyStore; - // Root URL for a storage location alongside client connection options type LocationAndOptions = (String, HashMap); @@ -203,7 +207,43 @@ impl Metastore { Ok((Arc::from(table.name), Arc::new(delta_table) as _)) } TableFormat::Iceberg => { - unimplemented!("Iceberg tables are not supported yet"); + let (location, file_io) = match table.store { + Some(name) => { + let (location, this_store_options) = store_options + .get(&name) + .ok_or(CatalogError::Generic { + reason: format!( + "Object store with name {name} not found" + ), + })? + .clone(); + + let file_io_props = object_store_opts_to_file_io_props(&this_store_options); + let file_io = FileIO::from_path(&location)?.with_props(file_io_props).build()?; + (location, file_io) + } + None => return Err(CatalogError::Generic { + reason: "Iceberg tables must pass FileIO props as object store options".to_string(), + }), + }; + + // Create the full path to table metadata by combining the object store location and + // relative table metadata path + let absolute_path = format!( + "{}/{}", + location.trim_end_matches("/"), + table.path.trim_start_matches("/") + ); + let iceberg_table = StaticTable::from_metadata_file( + &absolute_path, + TableIdent::from_strs(vec![DEFAULT_SCHEMA, &table.name])?, + file_io, + ) + .await? + .into_table(); + let table_provider = + IcebergTableProvider::try_new_from_table(iceberg_table).await?; + Ok((Arc::from(table.name), Arc::new(table_provider) as _)) } } } diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs index 0e8f0707..b1b22e1d 100644 --- a/src/catalog/mod.rs +++ b/src/catalog/mod.rs @@ -91,6 +91,9 @@ pub enum CatalogError { #[error("No inline metastore passed in")] NoInlineMetastore, + + #[error("Failed constructing an Iceberg table: {0}")] + IcebergError(#[from] iceberg::Error), } /// Implement a global converter into a DataFusionError from the catalog error type. diff --git a/src/object_store/utils.rs b/src/object_store/utils.rs index c7b932a7..49552a8b 100644 --- a/src/object_store/utils.rs +++ b/src/object_store/utils.rs @@ -1,6 +1,10 @@ use futures::TryFutureExt; +use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; +use object_store::aws::AmazonS3ConfigKey; use object_store::Error; +use std::collections::HashMap; use std::path::Path as StdPath; +use std::str::FromStr; use tokio::fs::{copy, create_dir_all, remove_file, rename}; use tracing::debug; @@ -44,3 +48,33 @@ pub async fn fast_upload(from: &StdPath, to: String) -> object_store::Result<(), Ok(()) } } + +// Go through all known keys for object store and convert them to corresponding file_io ones. +// +// For now only converts S3 keys. +// TODO: At some point this should be redundant, since there is an OpenDAL adapter for object_store, +// https://github.com/apache/iceberg-rust/issues/172 +pub fn object_store_opts_to_file_io_props( + opts: &HashMap, +) -> HashMap { + let mut props = HashMap::new(); + + for (key, val) in opts.iter() { + let key = match AmazonS3ConfigKey::from_str(key) { + Ok(AmazonS3ConfigKey::AccessKeyId) => S3_ACCESS_KEY_ID, + Ok(AmazonS3ConfigKey::SecretAccessKey) => S3_SECRET_ACCESS_KEY, + Ok(AmazonS3ConfigKey::Region) => S3_REGION, + Ok(AmazonS3ConfigKey::Endpoint) => S3_ENDPOINT, + _ => key, // for now just propagate any non-matched keys + }; + + props.insert(key.to_string(), val.clone()); + } + + // FileIO requires the region prop even when the S3 store doesn't (e.g. MinIO) + props + .entry(S3_REGION.to_string()) + .or_insert("us-east-1".to_string()); + + props +} diff --git a/tests/clade/query.rs b/tests/clade/query.rs index 017b4699..6ab7e2d4 100644 --- a/tests/clade/query.rs +++ b/tests/clade/query.rs @@ -18,23 +18,20 @@ async fn test_basic_select(#[case] table: &str, #[case] object_store: bool) -> ( let _r = context.metastore.schemas.list(DEFAULT_DB).await; let plan = context - .plan_query(&format!("SELECT * FROM {table} ORDER BY value")) + .plan_query(&format!("SELECT * FROM {table} ORDER BY key")) .await .unwrap(); let results = context.collect(plan).await.unwrap(); let expected = [ - "+-------+------+-------+-----+", - "| value | year | month | day |", - "+-------+------+-------+-----+", - "| 1 | 2020 | 1 | 1 |", - "| 2 | 2020 | 2 | 3 |", - "| 3 | 2020 | 2 | 5 |", - "| 4 | 2021 | 4 | 5 |", - "| 5 | 2021 | 12 | 4 |", - "| 6 | 2021 | 12 | 20 |", - "| 7 | 2021 | 12 | 20 |", - "+-------+------+-------+-----+", + "+-----+-------+", + "| key | value |", + "+-----+-------+", + "| 1 | one |", + "| 2 | two |", + "| 3 | three |", + "| 4 | four |", + "+-----+-------+", ]; assert_batches_eq!(expected, &results); } diff --git a/tests/data/delta-0.8.0-partitioned/_delta_log/00000000000000000000.json b/tests/data/delta-0.8.0-partitioned/_delta_log/00000000000000000000.json deleted file mode 100644 index 2fa0736f..00000000 --- a/tests/data/delta-0.8.0-partitioned/_delta_log/00000000000000000000.json +++ /dev/null @@ -1,9 +0,0 @@ -{"commitInfo":{"timestamp":1615555646188,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[\"year\",\"month\",\"day\"]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"6","numOutputBytes":"2477","numOutputRows":"7"}}} -{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} -{"metaData":{"id":"fe5a3c11-30d4-4dd7-b115-a1c121e66a4e","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"year\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"month\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"day\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["year","month","day"],"configuration":{},"createdTime":1615555644515}} -{"add":{"path":"year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet","partitionValues":{"year":"2020","month":"1","day":"1"},"size":414,"modificationTime":1615555646000,"dataChange":true}} -{"add":{"path":"year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet","partitionValues":{"year":"2020","month":"2","day":"3"},"size":414,"modificationTime":1615555646000,"dataChange":true}} -{"add":{"path":"year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet","partitionValues":{"year":"2020","month":"2","day":"5"},"size":414,"modificationTime":1615555646000,"dataChange":true}} -{"add":{"path":"year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet","partitionValues":{"year":"2021","month":"12","day":"20"},"size":407,"modificationTime":1615555646000,"dataChange":true}} -{"add":{"path":"year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet","partitionValues":{"year":"2021","month":"12","day":"4"},"size":414,"modificationTime":1615555646000,"dataChange":true}} -{"add":{"path":"year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet","partitionValues":{"year":"2021","month":"4","day":"5"},"size":414,"modificationTime":1615555646000,"dataChange":true}} diff --git a/tests/data/delta-0.8.0-partitioned/year=2020/month=1/day=1/.part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet.crc b/tests/data/delta-0.8.0-partitioned/year=2020/month=1/day=1/.part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet.crc deleted file mode 100644 index 65441ce99881a7835f8623bf486a21bd8d9b3798..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 12 TcmYc;N@ieSU}C5{e(@0i6j1~- diff --git a/tests/data/delta-0.8.0-partitioned/year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet b/tests/data/delta-0.8.0-partitioned/year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet deleted file mode 100644 index 0aea93ebd33e5482a4b262132d84d5d4cd88ddb9..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 414 zcmZWm-)q7!5RMs2VZNp$BM&8@D0HD+v0B;7-o~C}Pr^1vYEoC0R_zbR5dZhx#4*?) zT<-Y3yYJ&}arYP!L|k&^6AVd@IGlDucrF^9euU8F2t&N+bRdQVGylF{Z&mgFTCd(r zp$JB#KftuRSGF|+gj~!x<3op&KAZHA!_E-FE_Rph^U%5dqro_6BNznDOBW;NQ@W|v zQf(DT$dj7fo2t$?CGAEskRhVue~fx(B^cYarjga=Ll&k{S~ZG39YpiFHDWI6rfOG> z@I{szt7|a_s8q!_bg~b~FEcOKP?xQ>Dz!j?UgL|OsbYRO9$$`#KAbhLi^@Bj^gusxXaCb)(tSK{lUTBj>Vis&>^# zPiDEXX)WgfmAd$bPWA!$W#;7?>aw*~Cl)BsYdrZgRn8B`5h^%x2daVahlDtI1VRCWU?p z@I{szt7|a_s8q!_bg~b~FEcOKP?xQ>Dz!j?UgL|OsbYRO9$$`#KAbhLi^@Bj^g4qH6kKDJ96a>0VHa`;OBIE5NgCW3@zPuANqX?m(w2?c*w7?uJ}gD@-}@BX z9(oyenRzqAd&|2=4Yxszbd&pG^{$|~I!q~~*&dx%W_DXM+Oq0+ulH7~cHJXf`hWi83`k=0*2#pbN4 z%Vu3Ohb5`X;)>0IDrA15&c%WGQ>MiR{za2zGRi;#v&vyVu55m6+m{xRhfC&7UfP$E zw#PjvX^*2O&0<^5!Z?{FVd&pRLM1}@e(VZYglg)p!zc&?H&RjTO{itT4qjo$OE}3l DUfyOo diff --git a/tests/data/delta-0.8.0-partitioned/year=2021/month=12/day=4/.part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet.crc b/tests/data/delta-0.8.0-partitioned/year=2021/month=12/day=4/.part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet.crc deleted file mode 100644 index 6e673394e58dcc02573fabd6bc1cb85b1560c951..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 12 TcmYc;N@ieSU}8v6NZkMc5ZVIJ diff --git a/tests/data/delta-0.8.0-partitioned/year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet b/tests/data/delta-0.8.0-partitioned/year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet deleted file mode 100644 index 24889e45b35606bf6aedad5b408335f8aae362bf..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 414 zcmZWm-%GW`ox}>w1c3Xc;WbA+U91}qy zT<-Y3yYJ&}arYPyL~L^95e&(ISPtz)@N6_W{RpAU35Iyl>p_eNX5Rg<*~ph}kC(8)d^zs#aqLtS;o$jkr*dbP)Yrt1gm$iE~=>{SI zlkOzz3Zq;(PgFk7lO(*!Vx7lv7%D&Z<3vaPD#>Qa%+GYDf+=$x*nxsgFW@p?Y42vo diff --git a/tests/data/delta-0.8.0-partitioned/year=2021/month=4/day=5/.part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet.crc b/tests/data/delta-0.8.0-partitioned/year=2021/month=4/day=5/.part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet.crc deleted file mode 100644 index 8788a17c2ae6f3d403c7b2edc33d3da64801d39e..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 12 TcmYc;N@ieSU}ES}{RpAU5r%lt=|G$i%-#EbwNvH$Yqfqe zxgr>m{s6P?L1m2@AVe{@xjnRO(&y72viKPym}0u3&qM3>j|R6v8^IuGUQvv=%hLB)n&7; zg)5TOWV#XyfC`m=Lnr%y{4&#G19j14nTj)@K(BGd&s4EEw(U!c*uz=#Ixn5GNynEy zm~=*QlV;jc^GGN2B#MHYI5bHZ27&fMFO1C8TSxIMnt8E_wLj*z4Li`V=_Op|3tj$Z Az5oCK diff --git a/tests/data/delta/_delta_log/00000000000000000000.json b/tests/data/delta/_delta_log/00000000000000000000.json new file mode 100644 index 00000000..451b4a7f --- /dev/null +++ b/tests/data/delta/_delta_log/00000000000000000000.json @@ -0,0 +1,3 @@ +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} +{"metaData":{"id":"b4d25be5-de67-4fca-8f6a-9aa18eeb141e","name":"test","description":"Created by Seafowl 0.5.8","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"key\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1732876066617,"configuration":{}}} +{"commitInfo":{"timestamp":1732876066618,"operation":"CREATE TABLE","operationParameters":{"protocol":"{\"minReaderVersion\":1,\"minWriterVersion\":2}","mode":"ErrorIfExists","location":"file:///Users/gruuya/Splitgraph/seafowl-data/2f6abe4f-c07e-43fa-a98d-6200a34380dd","metadata":"{\"configuration\":{},\"createdTime\":1732876066617,\"description\":\"Created by Seafowl 0.5.8\",\"format\":{\"options\":{},\"provider\":\"parquet\"},\"id\":\"b4d25be5-de67-4fca-8f6a-9aa18eeb141e\",\"name\":\"test\",\"partitionColumns\":[],\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"key\\\",\\\"type\\\":\\\"integer\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"value\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\"}"},"clientVersion":"delta-rs.0.22.0"}} \ No newline at end of file diff --git a/tests/data/delta/_delta_log/00000000000000000001.json b/tests/data/delta/_delta_log/00000000000000000001.json new file mode 100644 index 00000000..3b732880 --- /dev/null +++ b/tests/data/delta/_delta_log/00000000000000000001.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00000-3d8342be-6309-42be-8450-54a5310148ac-c000.snappy.parquet","partitionValues":{},"size":768,"modificationTime":1732876066644,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"key\":1,\"value\":\"one\"},\"maxValues\":{\"key\":2,\"value\":\"two\"},\"nullCount\":{\"value\":0,\"key\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1732876066645,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.22.0"}} \ No newline at end of file diff --git a/tests/data/delta/_delta_log/00000000000000000002.json b/tests/data/delta/_delta_log/00000000000000000002.json new file mode 100644 index 00000000..f78fbf54 --- /dev/null +++ b/tests/data/delta/_delta_log/00000000000000000002.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00000-05b2b8d1-53c2-4bf1-91d6-020a60069f85-c000.snappy.parquet","partitionValues":{},"size":780,"modificationTime":1732876070309,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"key\":3,\"value\":\"four\"},\"maxValues\":{\"key\":4,\"value\":\"three\"},\"nullCount\":{\"key\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1732876070311,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.22.0"}} \ No newline at end of file diff --git a/tests/data/delta/part-00000-05b2b8d1-53c2-4bf1-91d6-020a60069f85-c000.snappy.parquet b/tests/data/delta/part-00000-05b2b8d1-53c2-4bf1-91d6-020a60069f85-c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..52de99901bc7e0eee889cd42ff3a95ea0aafb3e9 GIT binary patch literal 780 zcmZ`&J#Q015S`mwTPd!Ht?cMlx+03mDUnRVSF8jM z3L5?a1r-$~4IRIPnlj=aAeh;8KtwRo?d-gH^JaEe*QEAEqs^>YXt0Aaz|!Z{A5bHJ z5CAlP229zUt2kEC6q70}X~APHj8cggUsDHA9Oxhj6|)!2mA^IL0u(qu>g8D`0#63x z{2$^4M~Jng|D$e8%KG-~+>FX1ET=rLPf4wfy^{HpOK`_LLvC#oJXWU9TZ?62#@;%^Opk=*~SBUEh~@TNbg_-l@I>Qt$bK)*kjA*Y5X)#^9bLx4YC79$S6d z-|4xgg0|`h;&`6h!rPkod4A`;yc~t-=BS^w=sLvqWhZ|%&PM0+;rvmS51$TR%&%Qu UxwLY@R`@C9HGc*GP5uvl0g|hhPyhe` literal 0 HcmV?d00001 diff --git a/tests/data/delta/part-00000-3d8342be-6309-42be-8450-54a5310148ac-c000.snappy.parquet b/tests/data/delta/part-00000-3d8342be-6309-42be-8450-54a5310148ac-c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..3728351bcfb570eea88764ca5ff17c31dc3c3f3b GIT binary patch literal 768 zcmZ`%y>1gh5T3m|tQ1$oQD$^2Z4n`I3X*XkC>BAam?Q)n4A>R{kp`LQZ~!Mx&Q2_P zN(w5Tfiey6z#~vl@BlnP1QIj14u67?ZfED4Z@!t`y%u+_V1(0nz9|l702V$heup{( z7yt<5L%PF+iKa@j( zlTaxC(Y58k`o`jl=>kh|F@*?1iFE8hj>OOEC!JMm_H4`C$BH zxU-w>bukifK1jls*(<YNmr!EnhDHLN zd5Ltpc5{L=-+LgU*B)!Up^I93>r7t*t@nII>)X3e>ks=%6L3qD8$E6*PpmQNU+5K0 z6>T?m)X6+|mA5|j^Zd+vc|8jAtx-Q~^C~3vpqn2aWur^^aPBzEhtCK5bIVt&OV!J^ QBJZeA`Az=ami+ra011_mmjD0& literal 0 HcmV?d00001 diff --git a/tests/fixtures.rs b/tests/fixtures.rs index 4215782f..013d7eb3 100644 --- a/tests/fixtures.rs +++ b/tests/fixtures.rs @@ -27,7 +27,7 @@ pub fn fake_gcs_creds() -> String { pub fn schemas(include_file_without_store: bool) -> ListSchemaResponse { let mut local_schema_tables = vec![TableObject { name: "file_with_store".to_string(), - path: "delta-0.8.0-partitioned".to_string(), + path: "delta".to_string(), store: Some("local_fs".to_string()), format: TableFormat::Delta.into(), }]; @@ -35,7 +35,7 @@ pub fn schemas(include_file_without_store: bool) -> ListSchemaResponse { if include_file_without_store { local_schema_tables.push(TableObject { name: "file".to_string(), - path: "delta-0.8.0-partitioned".to_string(), + path: "delta".to_string(), store: None, format: TableFormat::Delta.into(), }) @@ -52,23 +52,29 @@ pub fn schemas(include_file_without_store: bool) -> ListSchemaResponse { tables: vec![ TableObject { name: "minio".to_string(), - path: "test-data/delta-0.8.0-partitioned".to_string(), + path: "test-data/delta".to_string(), store: Some("minio".to_string()), format: TableFormat::Delta.into(), }, TableObject { name: "minio_prefix".to_string(), - path: "delta-0.8.0-partitioned".to_string(), + path: "delta".to_string(), store: Some("minio-prefix".to_string()), format: TableFormat::Delta.into(), }, + TableObject { + name: "iceberg".to_string(), + path: "iceberg/default.db/iceberg_table/metadata/00001-f394d7ec-944b-432d-a44f-78b5ec95aae2.metadata.json".to_string(), + store: Some("minio-prefix".to_string()), + format: TableFormat::Iceberg.into(), + }, ], }, SchemaObject { name: "gcs".to_string(), tables: vec![TableObject { name: "fake".to_string(), - path: "delta-0.8.0-partitioned".to_string(), + path: "delta".to_string(), store: Some("fake-gcs".to_string()), format: TableFormat::Delta.into(), }], diff --git a/tests/flight/inline_metastore.rs b/tests/flight/inline_metastore.rs index fe0821d9..f6c69733 100644 --- a/tests/flight/inline_metastore.rs +++ b/tests/flight/inline_metastore.rs @@ -22,6 +22,7 @@ use crate::flight::*; #[case("local.file_with_store", TestServerType::InlineOnly, false)] #[case("s3.minio", TestServerType::InlineOnly, false)] #[case("s3.minio_prefix", TestServerType::InlineOnly, false)] +#[case("s3.iceberg", TestServerType::InlineOnly, false)] #[case("gcs.fake", TestServerType::InlineOnly, false)] #[tokio::test] async fn test_inline_query( @@ -33,24 +34,21 @@ async fn test_inline_query( let batches = get_flight_batches_inlined( &mut client, - format!("SELECT * FROM {table} ORDER BY value"), + format!("SELECT * FROM {table} ORDER BY key"), schemas(include_file_without_store), ) .await .unwrap(); let expected = [ - "+-------+------+-------+-----+", - "| value | year | month | day |", - "+-------+------+-------+-----+", - "| 1 | 2020 | 1 | 1 |", - "| 2 | 2020 | 2 | 3 |", - "| 3 | 2020 | 2 | 5 |", - "| 4 | 2021 | 4 | 5 |", - "| 5 | 2021 | 12 | 4 |", - "| 6 | 2021 | 12 | 20 |", - "| 7 | 2021 | 12 | 20 |", - "+-------+------+-------+-----+", + "+-----+-------+", + "| key | value |", + "+-----+-------+", + "| 1 | one |", + "| 2 | two |", + "| 3 | three |", + "| 4 | four |", + "+-----+-------+", ]; assert_batches_eq!(expected, &batches); } diff --git a/tests/statements/query.rs b/tests/statements/query.rs index 67c3ac77..ecadd473 100644 --- a/tests/statements/query.rs +++ b/tests/statements/query.rs @@ -342,30 +342,27 @@ async fn test_delta_tables() { .plan_query( "CREATE EXTERNAL TABLE test_delta \ STORED AS DELTATABLE \ - LOCATION 'tests/data/delta-0.8.0-partitioned'", + LOCATION 'tests/data/delta'", ) .await .unwrap(); // The order gets randomized so we need to enforce it let plan = context - .plan_query("SELECT * FROM staging.test_delta ORDER BY value") + .plan_query("SELECT * FROM staging.test_delta ORDER BY key") .await .unwrap(); let results = context.collect(plan).await.unwrap(); let expected = [ - "+-------+------+-------+-----+", - "| value | year | month | day |", - "+-------+------+-------+-----+", - "| 1 | 2020 | 1 | 1 |", - "| 2 | 2020 | 2 | 3 |", - "| 3 | 2020 | 2 | 5 |", - "| 4 | 2021 | 4 | 5 |", - "| 5 | 2021 | 12 | 4 |", - "| 6 | 2021 | 12 | 20 |", - "| 7 | 2021 | 12 | 20 |", - "+-------+------+-------+-----+", + "+-----+-------+", + "| key | value |", + "+-----+-------+", + "| 1 | one |", + "| 2 | two |", + "| 3 | three |", + "| 4 | four |", + "+-----+-------+", ]; assert_batches_eq!(expected, &results); } From 3cd662654ea3b296a5798b242ba831339bfb667c Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Fri, 29 Nov 2024 12:11:01 +0100 Subject: [PATCH 3/3] Use a dummy region name whne adding one to FileIO props for stores which don't need one --- Cargo.toml | 2 +- src/object_store/utils.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a96c06c7..562fb4c0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -100,8 +100,8 @@ deltalake = { git = "https://github.com/splitgraph/delta-rs", branch = "fix-deci futures = "0.3" hex = ">=0.4.0" -iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "697a20060f2247da87f73073e8bf5ab407bd40ea" } +iceberg = { git = "https://github.com/splitgraph/iceberg-rust", rev = "eeca14d13b23f2a92e57f503c141a860a3407226" } iceberg-datafusion = { git = "https://github.com/splitgraph/iceberg-rust", rev = "eeca14d13b23f2a92e57f503c141a860a3407226" } indexmap = "2.6.0" itertools = { workspace = true } diff --git a/src/object_store/utils.rs b/src/object_store/utils.rs index 49552a8b..ab232610 100644 --- a/src/object_store/utils.rs +++ b/src/object_store/utils.rs @@ -74,7 +74,7 @@ pub fn object_store_opts_to_file_io_props( // FileIO requires the region prop even when the S3 store doesn't (e.g. MinIO) props .entry(S3_REGION.to_string()) - .or_insert("us-east-1".to_string()); + .or_insert("dummy-region".to_string()); props }