Skip to content

Commit

Permalink
refactor sql catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
JanKaul committed Aug 5, 2024
1 parent b248b1f commit 362f598
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 30 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion_iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ testcontainers = "0.20.0"
tokio-stream = { version = "0.1.15", features = ["io-util"] }
tempfile = "3.10.1"
reqwest = "0.11"
sqlx = { version = "0.7.4", features = ["runtime-tokio", "tls-rustls", "any", "sqlite", "postgres", "mysql"], default-features = false }
72 changes: 42 additions & 30 deletions iceberg-sql-catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use object_store::ObjectStore;
use sqlx::{
any::{install_default_drivers, AnyPoolOptions, AnyRow},
pool::PoolOptions,
AnyPool, Row,
AnyPool, Executor, Row,
};
use uuid::Uuid;

Expand All @@ -55,39 +55,42 @@ impl SqlCatalog {
) -> Result<Self, Error> {
install_default_drivers();

let mut options = PoolOptions::new();
let mut pool_options = PoolOptions::new();

if url.starts_with("sqlite") {
options = options.max_connections(1);
pool_options = pool_options.max_connections(1);
}

let pool = AnyPoolOptions::connect(options, &url).await?;

sqlx::query(
"create table if not exists iceberg_tables (
let pool = AnyPoolOptions::after_connect(pool_options, |connection, _| {
Box::pin(async move {
connection
.execute(
"create table if not exists iceberg_tables (
catalog_name varchar(255) not null,
table_namespace varchar(255) not null,
table_name varchar(255) not null,
metadata_location varchar(255) not null,
previous_metadata_location varchar(255),
primary key (catalog_name, table_namespace, table_name)
);",
)
.execute(&pool)
.await?;

sqlx::query(
"create table if not exists iceberg_namespace_properties (
)
.await?;
connection
.execute(
"create table if not exists iceberg_namespace_properties (
catalog_name varchar(255) not null,
namespace varchar(255) not null,
property_key varchar(255),
property_value varchar(255),
primary key (catalog_name, namespace, property_key)
);",
)
.execute(&pool)
.await
.map_err(Error::from)?;
)
.await?;
Ok(())
})
})
.connect(&url)
.await?;

Ok(SqlCatalog {
name: name.to_owned(),
Expand Down Expand Up @@ -709,33 +712,42 @@ impl SqlCatalogList {
pub async fn new(url: &str, object_store: Arc<dyn ObjectStore>) -> Result<Self, Error> {
install_default_drivers();

let pool = AnyPoolOptions::connect(PoolOptions::new().max_connections(1), &url).await?;
let mut pool_options = PoolOptions::new();

if url.starts_with("sqlite") {
pool_options = pool_options.max_connections(1);
}

sqlx::query(
"create table if not exists iceberg_tables (
let pool = AnyPoolOptions::after_connect(pool_options, |connection, _| {
Box::pin(async move {
connection
.execute(
"create table if not exists iceberg_tables (
catalog_name varchar(255) not null,
table_namespace varchar(255) not null,
table_name varchar(255) not null,
metadata_location varchar(255) not null,
previous_metadata_location varchar(255),
primary key (catalog_name, table_namespace, table_name)
);",
)
.execute(&pool)
.await?;

sqlx::query(
"create table if not exists iceberg_namespace_properties (
)
.await?;
connection
.execute(
"create table if not exists iceberg_namespace_properties (
catalog_name varchar(255) not null,
namespace varchar(255) not null,
property_key varchar(255),
property_value varchar(255),
primary key (catalog_name, namespace, property_key)
);",
)
.execute(&pool)
.await
.map_err(Error::from)?;
)
.await?;
Ok(())
})
})
.connect(&url)
.await?;

Ok(SqlCatalogList { pool, object_store })
}
Expand Down

0 comments on commit 362f598

Please sign in to comment.