Skip to content

Commit

Permalink
Merge pull request #744 from splitgraph/iceberg-external-tables
Browse files Browse the repository at this point in the history
Wire up Iceberg external tables
  • Loading branch information
gruuya authored Nov 25, 2024
2 parents e019ffc + b18c772 commit e358a53
Show file tree
Hide file tree
Showing 10 changed files with 461 additions and 11 deletions.
412 changes: 407 additions & 5 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,11 @@ deltalake = { git = "https://github.com/splitgraph/delta-rs", branch = "fix-deci

futures = "0.3"
hex = ">=0.4.0"

iceberg-datafusion = { git = "https://github.com/apache/iceberg-rust", rev = "697a20060f2247da87f73073e8bf5ab407bd40ea" }
indexmap = "2.6.0"
itertools = { workspace = true }

lazy_static = ">=1.4.0"
metrics = { version = "0.23.0" }
metrics-exporter-prometheus = { version = "0.15.3" }
Expand Down
15 changes: 9 additions & 6 deletions src/config/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use datafusion::{
};
use deltalake::delta_datafusion::DeltaTableFactory;
use deltalake::storage::factories;
use iceberg_datafusion::table_provider_factory::IcebergTableProviderFactory;
use metrics::describe_counter;
use metrics_exporter_prometheus::PrometheusBuilder;

Expand Down Expand Up @@ -103,14 +104,16 @@ pub fn build_state_with_table_factories(
.with_default_features()
.build();

state
.table_factories_mut()
.insert("DELTATABLE".to_string(), Arc::new(DeltaTableFactory {}));
let table_factories = state.table_factories_mut();

table_factories.insert("DELTATABLE".to_string(), Arc::new(DeltaTableFactory {}));
table_factories.insert(
"ICEBERG".to_string(),
Arc::new(IcebergTableProviderFactory {}),
);
#[cfg(feature = "remote-tables")]
{
state
.table_factories_mut()
.insert("TABLE".to_string(), Arc::new(RemoteTableFactory {}));
table_factories.insert("TABLE".to_string(), Arc::new(RemoteTableFactory {}));
}
state
}
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"location":"s3://seafowl-test-bucket/test-data/iceberg/default.db/iceberg_table","table-uuid":"16ebf585-ea7d-407a-b273-1c6d3ccb3375","last-updated-ms":1732524826682,"last-column-id":2,"schemas":[{"type":"struct","fields":[{"id":1,"name":"key","type":"int","required":true},{"id":2,"name":"value","type":"string","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[]}],"default-spec-id":0,"last-partition-id":999,"properties":{},"snapshots":[],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":2,"last-sequence-number":0}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"location":"s3://seafowl-test-bucket/test-data/iceberg/default.db/iceberg_table","table-uuid":"16ebf585-ea7d-407a-b273-1c6d3ccb3375","last-updated-ms":1732524832199,"last-column-id":2,"schemas":[{"type":"struct","fields":[{"id":1,"name":"key","type":"int","required":true},{"id":2,"name":"value","type":"string","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[]}],"default-spec-id":0,"last-partition-id":999,"properties":{},"current-snapshot-id":1285208930498918146,"snapshots":[{"snapshot-id":1285208930498918146,"sequence-number":1,"timestamp-ms":1732524832199,"manifest-list":"s3://seafowl-test-bucket/test-data/iceberg/default.db/iceberg_table/metadata/snap-1285208930498918146-0-a992f3e7-7f29-497a-976f-3456cf41ee20.avro","summary":{"operation":"append","added-files-size":"1037","added-data-files":"1","added-records":"4","total-data-files":"1","total-delete-files":"0","total-records":"4","total-files-size":"1037","total-position-deletes":"0","total-equality-deletes":"0"},"schema-id":0}],"snapshot-log":[{"snapshot-id":1285208930498918146,"timestamp-ms":1732524832199}],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{"main":{"snapshot-id":1285208930498918146,"type":"branch"}},"format-version":2,"last-sequence-number":1}
Binary file not shown.
Binary file not shown.
Binary file added tests/data/iceberg/iceberg_catalog.db
Binary file not shown.
40 changes: 40 additions & 0 deletions tests/statements/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,3 +369,43 @@ async fn test_delta_tables() {
];
assert_batches_eq!(expected, &results);
}

#[tokio::test]
async fn test_iceberg_tables() {
let (context, _) = make_context_with_pg(ObjectStoreType::InMemory).await;

context
.plan_query(
"CREATE EXTERNAL TABLE test_iceberg \
STORED AS ICEBERG \
LOCATION 's3://seafowl-test-bucket/test-data/iceberg/default.db/iceberg_table/metadata/00001-f394d7ec-944b-432d-a44f-78b5ec95aae2.metadata.json' \
OPTIONS (\
's3.access-key-id' 'minioadmin', \
's3.secret-access-key' 'minioadmin', \
's3.endpoint' 'http://127.0.0.1:9000', \
'allow_http' 'true', \
's3.region' 'us-east-1'\
)",
)
.await
.unwrap();

// The order gets randomized so we need to enforce it
let plan = context
.plan_query("SELECT * FROM staging.test_iceberg ORDER BY key")
.await
.unwrap();
let results = context.collect(plan).await.unwrap();

let expected = [
"+-----+-------+",
"| key | value |",
"+-----+-------+",
"| 1 | one |",
"| 2 | two |",
"| 3 | three |",
"| 4 | four |",
"+-----+-------+",
];
assert_batches_eq!(expected, &results);
}

0 comments on commit e358a53

Please sign in to comment.