feat: exploratory work to integrate iceberg/glue
timvw committed Apr 6, 2024
1 parent 67f1e5c commit 3d36e04
Showing 2 changed files with 98 additions and 39 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -33,6 +33,7 @@ object_store = { version = "0.9", features = ["aws", "gcp"] }
regex = "1.10"
tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
url = "2.5"
iceberg-rust = { git = "https://github.com/DataFusion-Plus/jankaul-iceberg-rust.git", rev="9ee58925a6ad680adb7560c80a5224ab34d3f7f7"}

[dev-dependencies]
assert_cmd = "2.0"
136 changes: 97 additions & 39 deletions src/main.rs
@@ -21,6 +21,9 @@ use datafusion::datasource::listing::{
use datafusion::datasource::TableProvider;
use datafusion::prelude::*;
use deltalake::open_table;
use iceberg_rust::catalog::Catalog;
use iceberg_rust::catalog::tabular::{get_tabular_metadata, Tabular};
use iceberg_rust::table::table_builder::TableBuilder;
use object_store::aws::{AmazonS3, AmazonS3Builder};
use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder};
use object_store::path::Path;
@@ -43,7 +46,7 @@ async fn main() -> Result<()> {
let sdk_config = get_sdk_config(&args).await;

let (data_path, file_format) = replace_glue_table_with_path(&data_path, &sdk_config).await?;
let (data_path, maybe_table_type) = replace_glue_table_with_path(&data_path, &sdk_config).await?;

let data_path = if data_path.starts_with("s3://") {
// register s3 object store
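
The body that registers the S3 object store is collapsed between the hunks here. For orientation, a sketch of how an object_store AmazonS3 instance is typically wired into a DataFusion SessionContext; the helper name, bucket, and region are placeholders and qv's actual implementation may differ:

use std::sync::Arc;

use datafusion::error::{DataFusionError, Result};
use datafusion::prelude::SessionContext;
use object_store::aws::AmazonS3Builder;
use url::Url;

// Hypothetical helper: build an S3 store from the environment and register it
// so that every s3://<bucket>/... path resolves to it.
fn register_s3_bucket(ctx: &SessionContext, bucket: &str, region: &str) -> Result<()> {
    let s3 = AmazonS3Builder::from_env()
        .with_bucket_name(bucket)
        .with_region(region)
        .build()
        .map_err(|e| DataFusionError::External(Box::new(e)))?;

    let base_url = Url::parse(&format!("s3://{bucket}/"))
        .map_err(|e| DataFusionError::External(Box::new(e)))?;

    // Returns the previously registered store for this URL, if any.
    let _previous = ctx.register_object_store(&base_url, Arc::new(s3));
    Ok(())
}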
@@ -96,21 +99,51 @@ async fn main() -> Result<()> {
data_path
};

let table: Arc<dyn TableProvider> = if let Ok(mut delta_table) = open_table(&data_path).await {
let table: Arc<dyn TableProvider> = if let Some(table_type) = maybe_table_type {
match table_type {
TableType::Deltalake => {
let mut delta_table = open_table(&data_path).await?;
if let Some(at) = args.at {
delta_table.load_with_datetime(at).await?;
}
Arc::new(delta_table)
},
TableType::Iceberg(metadata_location) => {
// need to get the relevant object store...
// in this case it's safe to assume that????
//let metadata_url = Url::parse(&metadata_location)?;
//let os = ctx.runtime_env().object_store_registry.get_store(&metadata_url)?;
//let tabular = get_tabular_metadata(&metadata_location, os).await?;

//let catalog: Arc<dyn Catalog> = Arc::new(
// SqlCatalog::new("sqlite://", "test", object_store.clone())
// .await
// .unwrap(),
//);
todo!("still need to implement this stuff...")
},
TableType::Avro(format) => {
listing_table_with_format(&ctx, &data_path, Arc::new(format)).await?
},
TableType::Csv(format) => {
listing_table_with_format(&ctx, &data_path, Arc::new(format)).await?
},
TableType::Parquet(format) => {
listing_table_with_format(&ctx, &data_path, Arc::new(format)).await?
},
TableType::Json(format) => {
listing_table_with_format(&ctx, &data_path, Arc::new(format)).await?
},
}
} else if let Ok(mut delta_table) = open_table(&data_path).await {
if let Some(at) = args.at {
delta_table.load_with_datetime(at).await?;
}
Arc::new(delta_table)
} else {
let table_path = ListingTableUrl::parse(&data_path)?;
let mut config = ListingTableConfig::new(table_path);

config = if let Some(format) = file_format {
config.with_listing_options(ListingOptions::new(format))
} else {
config.infer_options(&ctx.state()).await?
};

config = config.infer_options(&ctx.state()).await?;
config = config.infer_schema(&ctx.state()).await?;
let table = ListingTable::try_new(config)?;
Arc::new(table)
@@ -129,6 +162,15 @@ async fn main() -> Result<()> {
Ok(())
}

async fn listing_table_with_format(ctx: &SessionContext, data_path: &str, file_format: Arc<dyn FileFormat>) -> Result<Arc<dyn TableProvider>> {
let table_path = ListingTableUrl::parse(&data_path)?;
let mut config = ListingTableConfig::new(table_path);
config = config.with_listing_options(ListingOptions::new(file_format));
config = config.infer_schema(&ctx.state()).await?;
let table = ListingTable::try_new(config)?;
Ok(Arc::new(table))
}
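
For context, a standalone sketch of the same pattern outside qv: building a ListingTable with an explicitly chosen FileFormat rather than relying on infer_options, which is what the helper above does for Glue-described tables. The local path is an illustrative placeholder, not part of this commit.

use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Placeholder path; any URL with a registered object store works the same way.
    let table_path = ListingTableUrl::parse("./data/parquet/")?;

    // Pin the format explicitly instead of letting infer_options guess from extensions.
    let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
    let config = ListingTableConfig::new(table_path)
        .with_listing_options(options)
        .infer_schema(&ctx.state())
        .await?;

    let table = ListingTable::try_new(config)?;
    ctx.read_table(Arc::new(table))?.show().await?;
    Ok(())
}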

async fn get_sdk_config(args: &Args) -> SdkConfig {
set_aws_profile_when_needed(args);
set_aws_region_when_needed();
@@ -195,7 +237,7 @@ fn test_replace_s3_console_url_with_s3_path() -> Result<()> {
async fn replace_glue_table_with_path(
path: &str,
sdk_config: &SdkConfig,
) -> Result<(String, Option<Arc<dyn FileFormat>>)> {
) -> Result<(String, Option<TableType>)> {
if let Some((database, table)) = parse_glue_url(path) {
let (location, format) = get_path_and_format(sdk_config, &database, &table).await?;
Ok((location, Some(format)))
@@ -226,7 +268,7 @@ async fn get_path_and_format(
sdk_config: &SdkConfig,
database_name: &str,
table_name: &str,
) -> Result<(String, Arc<dyn FileFormat>)> {
) -> Result<(String, TableType)> {
let client: Client = Client::new(sdk_config);
let table = client
.get_table()
@@ -251,8 +293,8 @@ async fn get_path_and_format(
})?;

let location = lookup_storage_location(sd)?;
let format_arc = lookup_file_format(table.clone(), sd)?;
Ok((location, format_arc))
let table_type = lookup_table_type(table.clone(), sd)?;
Ok((location, table_type))
}

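The middle of get_path_and_format is collapsed above. For orientation, a hedged sketch of the Glue round trip it performs: fetch the table, then read the parameters and storage descriptor that lookup_storage_location and lookup_table_type consume. The helper name and error handling are simplified assumptions, not the project's actual code.

use aws_config::SdkConfig;
use aws_sdk_glue::Client;

// Hypothetical sketch: fetch a Glue table and print the fields the classifier cares about.
async fn describe_glue_table(
    sdk_config: &SdkConfig,
    database_name: &str,
    table_name: &str,
) -> Result<(), aws_sdk_glue::Error> {
    let client = Client::new(sdk_config);
    let output = client
        .get_table()
        .database_name(database_name)
        .name(table_name)
        .send()
        .await?;

    if let Some(table) = output.table() {
        // Lake formats (Iceberg, Delta) advertise themselves through table parameters...
        if let Some(params) = table.parameters() {
            println!("table_type: {:?}", params.get("table_type"));
            println!("metadata_location: {:?}", params.get("metadata_location"));
        }
        // ...while classic Hive-style tables are identified by their storage descriptor.
        if let Some(sd) = table.storage_descriptor() {
            println!("location: {:?}", sd.location());
            println!("input_format: {:?}", sd.input_format());
            println!("output_format: {:?}", sd.output_format());
            println!(
                "serde: {:?}",
                sd.serde_info().and_then(|s| s.serialization_library())
            );
        }
    }
    Ok(())
}
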
fn lookup_storage_location(sd: &StorageDescriptor) -> Result<String> {
@@ -262,7 +304,7 @@ fn lookup_storage_location(sd: &StorageDescriptor) -> Result<String> {
Ok(location.to_string())
}

fn lookup_file_format(table: Table, sd: &StorageDescriptor) -> Result<Arc<dyn FileFormat>> {
fn lookup_table_type(table: Table, sd: &StorageDescriptor) -> Result<TableType> {
let empty_str = String::from("");
let input_format = sd.input_format.as_ref().unwrap_or(&empty_str);
let output_format = sd.output_format.as_ref().unwrap_or(&empty_str);
@@ -291,35 +333,38 @@ fn lookup_file_format(table: Table, sd: &StorageDescriptor) -> Result<Arc<dyn FileFormat>> {
};

let table_parameters = table.parameters.unwrap_or_default();
let _table_type = table_parameters
let table_type = table_parameters
.get("table_type")
.map(|x| x.as_str())
.map(|x| x.as_str().to_uppercase())
.unwrap_or_default();

// this can be delta...
// or ICEBERG...

/*
Table format: Apache Iceberg
Input format: -
Output format: -
Serde serialization lib:-
*/
let metadata_location = table_parameters
.get("metadata_location")
.map(|x| x.as_str())
.unwrap_or_default();

let item: (&str, &str, &str) = (input_format, output_format, serialization_library);
let format_result: Result<Arc<dyn FileFormat>> = match item {
let item: (&str, &str, &str, &str) = (&table_type, input_format, output_format, serialization_library);
let format_result: Result<TableType> = match item {
(
"ICEBERG",
_,
_,
_,
) => Ok(TableType::Iceberg(metadata_location.to_string())),
(
"DELTA",
_,
_,
_,
) => Ok(TableType::Deltalake),
(
_,
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
) => Ok(Arc::new(ParquetFormat::default())),
(
// actually this is Deltalake format...
"org.apache.hadoop.mapred.SequenceFileInputFormat",
"org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat",
"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
) => Ok(Arc::new(ParquetFormat::default())),
) => Ok(TableType::Parquet(ParquetFormat::default())),
(
_,
"org.apache.hadoop.mapred.TextInputFormat",
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
@@ -340,28 +385,32 @@ fn lookup_file_format(table: Table, sd: &StorageDescriptor) -> Result<Arc<dyn FileFormat>> {
.unwrap_or(&empty_str)
.eq("1");
format = format.with_has_header(has_header);
Ok(Arc::new(format))
Ok(TableType::Csv(format))
}
(
_,
"org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat",
"org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat",
"org.apache.hadoop.hive.serde2.avro.AvroSerDe",
) => Ok(Arc::new(AvroFormat)),
) => Ok(TableType::Avro(AvroFormat)),
(
_,
"org.apache.hadoop.mapred.TextInputFormat",
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
"org.apache.hive.hcatalog.data.JsonSerDe",
) => Ok(Arc::new(JsonFormat::default())),
) => Ok(TableType::Json(JsonFormat::default())),
(
_,
"org.apache.hadoop.mapred.TextInputFormat",
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
"org.openx.data.jsonserde.JsonSerDe",
) => Ok(Arc::new(JsonFormat::default())),
) => Ok(TableType::Json(JsonFormat::default())),
(
_,
"org.apache.hadoop.mapred.TextInputFormat",
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
"com.amazon.ionhiveserde.IonHiveSerDe",
) => Ok(Arc::new(JsonFormat::default())),
) => Ok(TableType::Json(JsonFormat::default())),
_ => Err(DataFusionError::Execution(format!(
"No support for: {}, {}, {:?} yet.",
input_format, output_format, sd
Expand All @@ -372,6 +421,15 @@ fn lookup_file_format(table: Table, sd: &StorageDescriptor) -> Result<Arc<dyn Fi
Ok(format)
}

enum TableType {
Parquet(ParquetFormat),
Csv(CsvFormat),
Avro(AvroFormat),
Json(JsonFormat),
Deltalake,
Iceberg(String),
}
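
The classification above keys off Glue metadata in two layers: the table_type parameter identifies lake formats (Iceberg, Delta) before the Hive input/output/serde triple is consulted. A dependency-free sketch of that precedence, with the DataFusion format payloads reduced to unit variants purely for illustration:

// Simplified stand-in for lookup_table_type: format payloads dropped, only the
// decision order is shown. Names and sample values are hypothetical.
#[derive(Debug, PartialEq)]
enum SimpleTableType {
    Iceberg { metadata_location: String },
    Deltalake,
    Parquet,
    Other,
}

fn classify(
    table_type: Option<&str>,
    metadata_location: Option<&str>,
    input_format: &str,
) -> SimpleTableType {
    // The table_type parameter wins over the Hive storage descriptor.
    match table_type.map(|t| t.to_uppercase()).as_deref() {
        Some("ICEBERG") => SimpleTableType::Iceberg {
            metadata_location: metadata_location.unwrap_or_default().to_string(),
        },
        Some("DELTA") => SimpleTableType::Deltalake,
        _ => match input_format {
            "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat" => {
                SimpleTableType::Parquet
            }
            _ => SimpleTableType::Other,
        },
    }
}

fn main() {
    // A Glue-registered Iceberg table typically carries these two parameters.
    let t = classify(
        Some("ICEBERG"),
        Some("s3://my-bucket/db/table/metadata/00001.metadata.json"),
        "",
    );
    assert_eq!(
        t,
        SimpleTableType::Iceberg {
            metadata_location: "s3://my-bucket/db/table/metadata/00001.metadata.json".to_string()
        }
    );

    // A plain Hive/Parquet table has no table_type and is matched on its input format.
    let t = classify(
        None,
        None,
        "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
    );
    assert_eq!(t, SimpleTableType::Parquet);
}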

async fn build_s3(url: &Url, sdk_config: &SdkConfig) -> Result<AmazonS3> {
let cp = sdk_config.credentials_provider().unwrap();
let creds = cp
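
The Iceberg arm in main() is still a todo!(). A speculative sketch of one way it could be completed, reconstructed from the commented-out lines and the imports added in this commit; it assumes get_tabular_metadata accepts a metadata location plus an object store, as those comments suggest, and it still stops short of building a DataFusion TableProvider:

use std::sync::Arc;

use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result};
use datafusion::prelude::SessionContext;
use iceberg_rust::catalog::tabular::get_tabular_metadata;
use url::Url;

// Speculative: the signature of get_tabular_metadata is assumed from the commented-out call.
async fn iceberg_table_provider(
    ctx: &SessionContext,
    metadata_location: &str,
) -> Result<Arc<dyn TableProvider>> {
    // Reuse whatever object store main() registered for the table's bucket.
    let metadata_url = Url::parse(metadata_location)
        .map_err(|e| DataFusionError::External(Box::new(e)))?;
    let object_store = ctx
        .runtime_env()
        .object_store_registry
        .get_store(&metadata_url)?;

    let _metadata = get_tabular_metadata(metadata_location, object_store)
        .await
        .expect("failed to load Iceberg table metadata");

    // Turning that metadata into a TableProvider is the part the commit defers with todo!().
    todo!("wrap the Iceberg table metadata in a DataFusion TableProvider")
}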
