Skip to content

Commit

Permalink
Merge pull request #32 from JanKaul/create-table
Browse files Browse the repository at this point in the history
Create table catalog struct
  • Loading branch information
JanKaul authored Jun 3, 2024
2 parents a51911a + ea3e1e0 commit 89502fd
Show file tree
Hide file tree
Showing 32 changed files with 872 additions and 652 deletions.
30 changes: 14 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ It provides an Iceberg integration for the [Datafusion](https://arrow.apache.org

### Catalogs

- REST
- RDBMS (Postgres, MySQL)

### File formats
Expand All @@ -52,11 +53,11 @@ use datafusion_iceberg::DataFusionTable;
use iceberg_rust::{
catalog::Catalog,
spec::{
partition::{PartitionField, PartitionSpecBuilder, Transform},
partition::{PartitionField, PartitionSpec, Transform},
schema::Schema,
types::{PrimitiveType, StructField, StructType, Type},
},
table::table_builder::TableBuilder,
table::Table,
};
use iceberg_sql_catalog::SqlCatalog;
use object_store::memory::InMemory;
Expand All @@ -75,7 +76,6 @@ pub(crate) async fn main() {
);

let schema = Schema::builder()
.with_schema_id(1)
.with_fields(
StructType::builder()
.with_struct_field(StructField {
Expand Down Expand Up @@ -119,23 +119,21 @@ pub(crate) async fn main() {
.build()
.unwrap();

let partition_spec = PartitionSpecBuilder::default()
.with_spec_id(1)
let partition_spec = PartitionSpec::builder()
.with_partition_field(PartitionField::new(4, 1000, "day", Transform::Day))
.build()
.expect("Failed to create partition spec");

let mut builder =
TableBuilder::new("test.orders", catalog).expect("Failed to create table builder");
builder
.location("/test/orders")
.with_schema((1, schema))
.current_schema_id(1)
.with_partition_spec((1, partition_spec))
.default_spec_id(1);
let table = Arc::new(DataFusionTable::from(
builder.build().await.expect("Failed to create table."),
));
let table = Table::builder()
.with_name("orders")
.with_location("/test/orders")
.with_schema(schema)
.with_partition_spec(partition_spec)
.build(&["test".to_owned()], catalog)
.await
.expect("Failed to create table");

let table = Arc::new(DataFusionTable::from(table));

let ctx = SessionContext::new();

Expand Down
9 changes: 1 addition & 8 deletions datafusion_iceberg/examples/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use datafusion_expr::{col, min};
use datafusion_iceberg::DataFusionTable;
use iceberg_rust::catalog::identifier::Identifier;
use iceberg_rust::catalog::Catalog;
use iceberg_rust::spec::table_metadata::TableMetadata;
use iceberg_sql_catalog::SqlCatalog;
use object_store::local::LocalFileSystem;
use object_store::ObjectStore;
Expand All @@ -24,13 +23,7 @@ pub(crate) async fn main() {
);
let identifier = Identifier::parse("test.table1").unwrap();

let metadata: TableMetadata= serde_json::from_slice(&object_store.get(&"/home/iceberg/warehouse/nyc/taxis/metadata/fb072c92-a02b-11e9-ae9c-1bb7bc9eca94.metadata.json".into()).await.unwrap().bytes().await.unwrap()).unwrap();

let table = catalog
.clone()
.create_table(identifier.clone(), metadata)
.await
.expect("Failed to register table.");
let table = catalog.clone().register_table(identifier.clone(), "/home/iceberg/warehouse/nyc/taxis/metadata/fb072c92-a02b-11e9-ae9c-1bb7bc9eca94.metadata.json").await.expect("Failed to register table.");

let ctx = SessionContext::new();

Expand Down
29 changes: 13 additions & 16 deletions datafusion_iceberg/examples/insert_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use datafusion_iceberg::DataFusionTable;
use iceberg_rust::{
catalog::Catalog,
spec::{
partition::{PartitionField, PartitionSpecBuilder, Transform},
partition::{PartitionField, PartitionSpec, Transform},
schema::Schema,
types::{PrimitiveType, StructField, StructType, Type},
},
table::table_builder::TableBuilder,
table::Table,
};
use iceberg_sql_catalog::SqlCatalog;
use object_store::memory::InMemory;
Expand All @@ -26,7 +26,6 @@ pub(crate) async fn main() {
);

let schema = Schema::builder()
.with_schema_id(1)
.with_fields(
StructType::builder()
.with_struct_field(StructField {
Expand Down Expand Up @@ -70,23 +69,21 @@ pub(crate) async fn main() {
.build()
.unwrap();

let partition_spec = PartitionSpecBuilder::default()
.with_spec_id(1)
let partition_spec = PartitionSpec::builder()
.with_partition_field(PartitionField::new(4, 1000, "day", Transform::Day))
.build()
.expect("Failed to create partition spec");

let mut builder =
TableBuilder::new("test.orders", catalog).expect("Failed to create table builder");
builder
.location("/test/orders")
.with_schema((1, schema))
.current_schema_id(1)
.with_partition_spec((1, partition_spec))
.default_spec_id(1);
let table = Arc::new(DataFusionTable::from(
builder.build().await.expect("Failed to create table."),
));
let table = Table::builder()
.with_name("orders")
.with_location("/test/orders")
.with_schema(schema)
.with_partition_spec(partition_spec)
.build(&["test".to_owned()], catalog)
.await
.expect("Failed to create table");

let table = Arc::new(DataFusionTable::from(table));

let ctx = SessionContext::new();

Expand Down
95 changes: 49 additions & 46 deletions datafusion_iceberg/examples/refresh_materialized_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ use datafusion::{arrow::array::Int64Array, prelude::SessionContext};
use datafusion_iceberg::catalog::catalog::IcebergCatalog;
use datafusion_iceberg::materialized_view::refresh_materialized_view;
use iceberg_rust::catalog::CatalogList;
use iceberg_rust::materialized_view::materialized_view_builder::MaterializedViewBuilder;
use iceberg_rust::{
spec::{
partition::{PartitionField, PartitionSpecBuilder, Transform},
schema::Schema,
types::{PrimitiveType, StructField, StructType, Type},
},
table::table_builder::TableBuilder,
use iceberg_rust::materialized_view::MaterializedView;
use iceberg_rust::spec::partition::PartitionSpec;
use iceberg_rust::spec::view_metadata::{Version, ViewRepresentation};
use iceberg_rust::spec::{
partition::{PartitionField, Transform},
schema::Schema,
types::{PrimitiveType, StructField, StructType, Type},
};
use iceberg_rust::table::Table;
use iceberg_sql_catalog::SqlCatalogList;
use object_store::memory::InMemory;
use object_store::ObjectStore;
Expand All @@ -29,7 +29,6 @@ pub(crate) async fn main() {
let catalog = catalog_list.catalog("iceberg").await.unwrap();

let schema = Schema::builder()
.with_schema_id(1)
.with_fields(
StructType::builder()
.with_struct_field(StructField {
Expand Down Expand Up @@ -73,25 +72,21 @@ pub(crate) async fn main() {
.build()
.unwrap();

let partition_spec = PartitionSpecBuilder::default()
.with_spec_id(1)
let partition_spec = PartitionSpec::builder()
.with_partition_field(PartitionField::new(4, 1000, "day", Transform::Day))
.build()
.expect("Failed to create partition spec");

let mut builder =
TableBuilder::new("test.orders", catalog.clone()).expect("Failed to create table builder");
builder
.location("/test/orders")
.with_schema((1, schema.clone()))
.current_schema_id(1)
.with_partition_spec((1, partition_spec))
.default_spec_id(1);

builder.build().await.expect("Failed to create table.");
Table::builder()
.with_name("orders")
.with_location("/test/orders")
.with_schema(schema)
.with_partition_spec(partition_spec)
.build(&["test".to_owned()], catalog.clone())
.await
.expect("Failed to create table");

let matview_schema = Schema::builder()
.with_schema_id(1)
.with_fields(
StructType::builder()
.with_struct_field(StructField {
Expand All @@ -114,20 +109,24 @@ pub(crate) async fn main() {
.build()
.unwrap();

let mut builder = MaterializedViewBuilder::new(
"select product_id, amount from iceberg.test.orders where product_id < 3;",
"test.orders_view",
matview_schema,
catalog.clone(),
)
.expect("Failed to create filesystem view builder.");
builder.location("test/orders_view");
let mut matview = builder
.build()
let mut matview = MaterializedView::builder()
.with_name("orders_view")
.with_location("test/orders_view")
.with_schema(matview_schema)
.with_view_version(
Version::builder()
.with_representation(ViewRepresentation::sql(
"select product_id, amount from iceberg.test.orders where product_id < 3;",
None,
))
.build()
.unwrap(),
)
.build(&["test".to_owned()], catalog.clone())
.await
.expect("Failed to create filesystem view");
.expect("Failed to create materialized view");

let total_matview_schema = Schema::builder()
.with_schema_id(1)
.with_fields(
StructType::builder()
.with_struct_field(StructField {
Expand All @@ -150,18 +149,22 @@ pub(crate) async fn main() {
.build()
.unwrap();

let mut total_builder = MaterializedViewBuilder::new(
"select product_id, sum(amount) from iceberg.test.orders_view group by product_id;",
"test.total_orders",
total_matview_schema,
catalog.clone(),
)
.expect("Failed to create filesystem view builder.");
total_builder.location("test/total_orders");
let mut total_matview = total_builder
.build()
.await
.expect("Failed to create filesystem view");
let mut total_matview = MaterializedView::builder()
.with_name("total_orders")
.with_location("test/total_orders")
.with_schema(total_matview_schema)
.with_view_version(
Version::builder()
.with_representation(ViewRepresentation::sql(
"select product_id, sum(amount) from iceberg.test.orders_view group by product_id;",
None,
))
.build()
.unwrap(),
)
.build(&["test".to_owned()], catalog.clone())
.await
.expect("Failed to create materialized view");

// Datafusion

Expand Down
57 changes: 52 additions & 5 deletions datafusion_iceberg/src/catalog/mirror.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,17 @@ use futures::{executor::LocalPool, task::LocalSpawnExt};
use std::{collections::HashSet, sync::Arc};

use iceberg_rust::{
catalog::{identifier::Identifier, namespace::Namespace, tabular::Tabular, Catalog},
catalog::{
bucket::Bucket,
create::{CreateMaterializedView, CreateView},
identifier::Identifier,
namespace::Namespace,
tabular::Tabular,
Catalog,
},
error::Error as IcebergError,
spec::table_metadata::new_metadata_location,
util::strip_prefix,
};
use iceberg_rust_spec::spec::{tabular::TabularMetadata, view_metadata::REF_PREFIX};

Expand Down Expand Up @@ -205,21 +214,59 @@ impl Mirror {
.metadata()
.to_owned();
match metadata {
TabularMetadata::Table(metadata) => {
TabularMetadata::Table(_) => {
let metadata_location = new_metadata_location(&metadata);
let object_store = cloned_catalog
.object_store(Bucket::from_path(&metadata_location).unwrap());
object_store
.put(
&strip_prefix(&metadata_location).into(),
serde_json::to_vec(&metadata).unwrap().into(),
)
.await
.unwrap();
cloned_catalog
.create_table(identifier, metadata)
.register_table(identifier, &metadata_location)
.await
.unwrap();
}
TabularMetadata::View(metadata) => {
let name = identifier.name().to_owned();
let view_version =
metadata.versions[&metadata.current_version_id].clone();
cloned_catalog
.create_view(identifier, metadata)
.create_view(
identifier,
CreateView {
name,
location: Some(metadata.location),
schema: metadata.schemas[&view_version.schema_id].clone(),
view_version,
properties: metadata.properties,
},
)
.await
.unwrap();
}
TabularMetadata::MaterializedView(metadata) => {
let name = identifier.name().to_owned();
let view_version =
metadata.versions[&metadata.current_version_id].clone();
cloned_catalog
.create_materialized_view(identifier, metadata)
.create_materialized_view(
identifier,
CreateMaterializedView {
name,
location: Some(metadata.location),
schema: metadata.schemas[&view_version.schema_id].clone(),
view_version,
properties: metadata.properties,
partition_spec: None,
write_order: None,
stage_create: None,
table_properties: None,
},
)
.await
.unwrap();
}
Expand Down
Loading

0 comments on commit 89502fd

Please sign in to comment.