Skip to content

Commit

Permalink
Merge pull request #91 from JanKaul/fixes
Browse files Browse the repository at this point in the history
Fixes
  • Loading branch information
JanKaul authored Dec 10, 2024
2 parents 9ce81e6 + c51d4c6 commit a04dc5f
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 56 deletions.
12 changes: 6 additions & 6 deletions catalogs/iceberg-file-catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl Catalog for FileCatalog {
.put_metadata(&metadata_location, metadata.as_ref())
.await?;

object_store.put_version_hint(&metadata_location).await?;
object_store.put_version_hint(&metadata_location).await.ok();

self.cache.write().unwrap().insert(
identifier.clone(),
Expand Down Expand Up @@ -245,7 +245,7 @@ impl Catalog for FileCatalog {
.put_metadata(&metadata_location, metadata.as_ref())
.await?;

object_store.put_version_hint(&metadata_location).await?;
object_store.put_version_hint(&metadata_location).await.ok();

self.cache.write().unwrap().insert(
identifier.clone(),
Expand Down Expand Up @@ -291,7 +291,7 @@ impl Catalog for FileCatalog {
.put_metadata(&metadata_location, metadata.as_ref())
.await?;

object_store.put_version_hint(&metadata_location).await?;
object_store.put_version_hint(&metadata_location).await.ok();

object_store
.put_metadata(&table_metadata_location, table_metadata.as_ref())
Expand Down Expand Up @@ -351,7 +351,7 @@ impl Catalog for FileCatalog {
)
.await?;

object_store.put_version_hint(&metadata_location).await?;
object_store.put_version_hint(&metadata_location).await.ok();

self.cache.write().unwrap().insert(
identifier.clone(),
Expand Down Expand Up @@ -401,7 +401,7 @@ impl Catalog for FileCatalog {
)
.await?;

object_store.put_version_hint(&metadata_location).await?;
object_store.put_version_hint(&metadata_location).await.ok();

Ok(metadata_location)
}
Expand Down Expand Up @@ -462,7 +462,7 @@ impl Catalog for FileCatalog {
)
.await?;

object_store.put_version_hint(&metadata_location).await?;
object_store.put_version_hint(&metadata_location).await.ok();

Ok(metadata_location)
}
Expand Down
12 changes: 6 additions & 6 deletions catalogs/iceberg-glue-catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ impl Catalog for GlueCatalog {
.put_metadata(&metadata_location, metadata.as_ref())
.await?;

object_store.put_version_hint(&metadata_location).await?;
object_store.put_version_hint(&metadata_location).await.ok();

let schema = metadata.current_schema(None)?;

Expand Down Expand Up @@ -366,7 +366,7 @@ impl Catalog for GlueCatalog {
.put_metadata(&metadata_location, metadata.as_ref())
.await?;

object_store.put_version_hint(&metadata_location).await?;
object_store.put_version_hint(&metadata_location).await.ok();

let schema = metadata.current_schema(None)?;

Expand Down Expand Up @@ -440,7 +440,7 @@ impl Catalog for GlueCatalog {
.put_metadata(&metadata_location, metadata.as_ref())
.await?;

object_store.put_version_hint(&metadata_location).await?;
object_store.put_version_hint(&metadata_location).await.ok();

object_store
.put_metadata(&table_metadata_location, table_metadata.as_ref())
Expand Down Expand Up @@ -554,7 +554,7 @@ impl Catalog for GlueCatalog {
.put_metadata(&metadata_location, metadata.as_ref())
.await?;

object_store.put_version_hint(&metadata_location).await?;
object_store.put_version_hint(&metadata_location).await.ok();

let schema = metadata.current_schema(None)?;

Expand Down Expand Up @@ -655,7 +655,7 @@ impl Catalog for GlueCatalog {
.put_metadata(&metadata_location, metadata.as_ref())
.await?;

object_store.put_version_hint(&metadata_location).await?;
object_store.put_version_hint(&metadata_location).await.ok();

Ok(metadata_location)
}
Expand Down Expand Up @@ -763,7 +763,7 @@ impl Catalog for GlueCatalog {
.put_metadata(&metadata_location, metadata.as_ref())
.await?;

object_store.put_version_hint(&metadata_location).await?;
object_store.put_version_hint(&metadata_location).await.ok();

Ok(metadata_location)
}
Expand Down
17 changes: 10 additions & 7 deletions catalogs/iceberg-s3tables-catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use std::{
use async_trait::async_trait;
use aws_config::SdkConfig;

use aws_sdk_s3tables::{types::OpenTableFormat, Client};
use aws_sdk_s3tables::{
types::OpenTableFormat,
Client,
};
use iceberg_rust::{
catalog::{
commit::{
Expand Down Expand Up @@ -311,7 +314,7 @@ impl Catalog for S3TablesCatalog {
.put_metadata(&metadata_location, metadata.as_ref())
.await?;

object_store.put_version_hint(&metadata_location).await?;
object_store.put_version_hint(&metadata_location).await.ok();

let table = self
.client
Expand Down Expand Up @@ -372,7 +375,7 @@ impl Catalog for S3TablesCatalog {
.put_metadata(&metadata_location, metadata.as_ref())
.await?;

object_store.put_version_hint(&metadata_location).await?;
object_store.put_version_hint(&metadata_location).await.ok();

let table = self
.client
Expand Down Expand Up @@ -465,7 +468,7 @@ impl Catalog for S3TablesCatalog {
.put_metadata(&metadata_location, metadata.as_ref())
.await?;

object_store.put_version_hint(&metadata_location).await?;
object_store.put_version_hint(&metadata_location).await.ok();

object_store
.put_metadata(&table_metadata_location, table_metadata.as_ref())
Expand Down Expand Up @@ -548,7 +551,7 @@ impl Catalog for S3TablesCatalog {
.put_metadata(&metadata_location, metadata.as_ref())
.await?;

object_store.put_version_hint(&metadata_location).await?;
object_store.put_version_hint(&metadata_location).await.ok();

let table = self
.client
Expand Down Expand Up @@ -620,7 +623,7 @@ impl Catalog for S3TablesCatalog {
.put_metadata(&metadata_location, metadata.as_ref())
.await?;

object_store.put_version_hint(&metadata_location).await?;
object_store.put_version_hint(&metadata_location).await.ok();

Ok(metadata_location)
}
Expand Down Expand Up @@ -698,7 +701,7 @@ impl Catalog for S3TablesCatalog {
.put_metadata(&metadata_location, metadata.as_ref())
.await?;

object_store.put_version_hint(&metadata_location).await?;
object_store.put_version_hint(&metadata_location).await.ok();

Ok(metadata_location)
}
Expand Down
12 changes: 6 additions & 6 deletions catalogs/iceberg-sql-catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ impl Catalog for SqlCatalog {
.put_metadata(&metadata_location, metadata.as_ref())
.await?;

object_store.put_version_hint(&metadata_location).await?;
object_store.put_version_hint(&metadata_location).await.ok();
{
let catalog_name = self.name.clone();
let namespace = identifier.namespace().to_string();
Expand Down Expand Up @@ -358,7 +358,7 @@ impl Catalog for SqlCatalog {
.put_metadata(&metadata_location, metadata.as_ref())
.await?;

object_store.put_version_hint(&metadata_location).await?;
object_store.put_version_hint(&metadata_location).await.ok();
{
let catalog_name = self.name.clone();
let namespace = identifier.namespace().to_string();
Expand Down Expand Up @@ -396,7 +396,7 @@ impl Catalog for SqlCatalog {
object_store
.put_metadata(&metadata_location, metadata.as_ref())
.await?;
object_store.put_version_hint(&metadata_location).await?;
object_store.put_version_hint(&metadata_location).await.ok();

object_store
.put_metadata(&table_metadata_location, table_metadata.as_ref())
Expand Down Expand Up @@ -460,7 +460,7 @@ impl Catalog for SqlCatalog {
object_store
.put_metadata(&metadata_location, metadata.as_ref())
.await?;
object_store.put_version_hint(&metadata_location).await?;
object_store.put_version_hint(&metadata_location).await.ok();

let catalog_name = self.name.clone();
let namespace = identifier.namespace().to_string();
Expand Down Expand Up @@ -505,7 +505,7 @@ impl Catalog for SqlCatalog {
object_store
.put_metadata(&metadata_location, metadata.as_ref())
.await?;
object_store.put_version_hint(&metadata_location).await?;
object_store.put_version_hint(&metadata_location).await.ok();

Ok(metadata_location)
}
Expand Down Expand Up @@ -561,7 +561,7 @@ impl Catalog for SqlCatalog {
object_store
.put_metadata(&metadata_location, metadata.as_ref())
.await?;
object_store.put_version_hint(&metadata_location).await?;
object_store.put_version_hint(&metadata_location).await.ok();

Ok(metadata_location)
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion-iceberg-sql/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ impl IcebergContext {
.clone()
.load_tabular(
&Identifier::try_new(&[namespace.to_owned(), name.to_owned()], None)
.map_err(|err| DataFusionError::Internal(err.to_string()))?,
.map_err(|err| DataFusionError::External(Box::new(err)))?,
)
.await
.map_err(|err| DataFusionError::Internal(err.to_string()))?;
.map_err(|err| DataFusionError::External(Box::new(err)))?;

let table_source = IcebergTableSource::new(tabular, branch);

Expand Down
2 changes: 1 addition & 1 deletion datafusion-iceberg-sql/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ pub async fn get_schema(
let logical_plan = planner.sql_statement_to_plan(statement)?;
let fields: Vec<FieldRef> = logical_plan.schema().fields().iter().cloned().collect();
let struct_type = StructType::try_from(&Schema::new(fields))
.map_err(|err| DataFusionError::Internal(err.to_string()))?;
.map_err(|err| DataFusionError::External(Box::new(err)))?;
Ok(struct_type)
}
14 changes: 7 additions & 7 deletions datafusion_iceberg/src/catalog/mirror.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ impl Mirror {
.clone()
.list_namespaces(None)
.await
.map_err(|err| DataFusionError::Internal(format!("{}", err)))?;
.map_err(|err| DataFusionError::External(Box::new(err)))?;
for namespace in namespaces {
let mut namespace_node = HashSet::new();
let tables = catalog
.clone()
.list_tabulars(&namespace)
.await
.map_err(|err| DataFusionError::Internal(format!("{}", err)))?;
.map_err(|err| DataFusionError::External(Box::new(err)))?;
for identifier in tables {
namespace_node.insert(identifier.to_string());
storage.insert(identifier.to_string(), Node::Relation(identifier));
Expand Down Expand Up @@ -81,15 +81,15 @@ impl Mirror {
.storage
.get(&namespace.to_string())
.ok_or_else(|| IcebergError::InvalidFormat("namespace in catalog".to_string()))
.map_err(|err| DataFusionError::Internal(format!("{}", err)))?;
.map_err(|err| DataFusionError::External(Box::new(err)))?;
let names = if let Node::Namespace(names) = node.value() {
Ok(names)
} else {
Err(IcebergError::InvalidFormat(
"table in namespace".to_string(),
))
}
.map_err(|err| DataFusionError::Internal(format!("{}", err)))?;
.map_err(|err| DataFusionError::External(Box::new(err)))?;
Ok(names
.iter()
.filter_map(|r| {
Expand Down Expand Up @@ -118,7 +118,7 @@ impl Mirror {
)
})
.collect::<Result<_, iceberg_rust::spec::error::Error>>()
.map_err(|err| DataFusionError::Internal(format!("{}", err)))
.map_err(|err| DataFusionError::External(Box::new(err)))
}
pub async fn table(
&self,
Expand Down Expand Up @@ -289,7 +289,7 @@ impl Mirror {
}
}
})
.map_err(|err| DataFusionError::Internal(format!("{}", err)))?;
.map_err(|err| DataFusionError::External(Box::new(err)))?;
Ok(Some(table))
}
pub fn deregister_table(
Expand All @@ -316,7 +316,7 @@ impl Mirror {
.spawn_local(async move {
cloned_catalog.drop_table(&identifier).await.unwrap();
})
.map_err(|err| DataFusionError::Internal(format!("{}", err)))?;
.map_err(|err| DataFusionError::External(Box::new(err)))?;
// Currently can't synchronously return a table which has to be fetched asynchronously
Ok(None)
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion_iceberg/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ impl SchemaProvider for IcebergSchema {
let mut full_name = self.schema.to_vec();
full_name.push(name.to_owned());
let identifier = Identifier::try_new(&full_name, None)
.map_err(|err| DataFusionError::Internal(err.to_string()))?;
.map_err(|err| DataFusionError::External(Box::new(err)))?;
self.catalog.register_table(identifier, table)
}
fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
let mut full_name = self.schema.to_vec();
full_name.push(name.to_owned());
let identifier = Identifier::try_new(&full_name, None)
.map_err(|err| DataFusionError::Internal(err.to_string()))?;
.map_err(|err| DataFusionError::External(Box::new(err)))?;
self.catalog.deregister_table(identifier)
}
}
Loading

0 comments on commit a04dc5f

Please sign in to comment.