Skip to content

Commit

Permalink
Merge pull request #44 from JanKaul/feat/file-catalog
Browse files Browse the repository at this point in the history
Feat/file catalog
  • Loading branch information
JanKaul authored Oct 18, 2024
2 parents f89a7cd + 3123766 commit 2492d09
Show file tree
Hide file tree
Showing 9 changed files with 781 additions and 24 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ members = [
"datafusion-iceberg-sql",
"iceberg-sql-catalog",
"iceberg-rest-catalog",
"iceberg-file-catalog",
]

resolver = "2"
Expand Down
4 changes: 3 additions & 1 deletion datafusion_iceberg/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl ExtensionPlanner for CreateIcebergTablePlanner {
.iter()
.enumerate()
.map(|(i, x)| {
let (column, transform) = parse_transform(&x)?;
let (column, transform) = parse_transform(x)?;
Ok::<_, Error>(PartitionField::new(
schema
.get_name(&column)
Expand Down Expand Up @@ -454,6 +454,8 @@ fn parse_transform(input: &str) -> Result<(String, Transform), Error> {
.ok_or(Error::InvalidFormat("Partition column".to_owned()))?;
let arg = args.next();
match (transform_name.as_str(), column, arg) {
("identity", column, None) => Ok((column, Transform::Identity)),
("void", column, None) => Ok((column, Transform::Void)),
("year", column, None) => Ok((column, Transform::Year)),
("month", column, None) => Ok((column, Transform::Month)),
("day", column, None) => Ok((column, Transform::Day)),
Expand Down
22 changes: 12 additions & 10 deletions datafusion_iceberg/src/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use datafusion::{
physical_plan::{ColumnStatistics, Statistics},
scalar::ScalarValue,
};
use futures::{future, TryFutureExt, TryStreamExt};
use iceberg_rust::spec::{
manifest::{ManifestEntry, Status},
schema::Schema,
Expand Down Expand Up @@ -33,16 +34,15 @@ pub(crate) async fn table_statistics(
table: &Table,
snapshot_range: &(Option<i64>, Option<i64>),
) -> Result<Statistics, Error> {
let schema = snapshot_range
let schema = &snapshot_range
.1
.and_then(|snapshot_id| table.metadata().schema(snapshot_id).ok().cloned())
.unwrap_or_else(|| table.current_schema(None).unwrap().clone());
let manifests = table.manifests(snapshot_range.0, snapshot_range.1).await?;
let datafiles = table.datafiles(&manifests, None).await?;
Ok(datafiles
.iter()
.filter(|manifest| !matches!(manifest.status(), Status::Deleted))
.fold(
datafiles
.try_filter(|manifest| future::ready(!matches!(manifest.status(), Status::Deleted)))
.try_fold(
Statistics {
num_rows: Precision::Exact(0),
total_byte_size: Precision::Exact(0),
Expand All @@ -56,9 +56,9 @@ pub(crate) async fn table_statistics(
schema.fields().len()
],
},
|acc, manifest| {
let column_stats = column_statistics(&schema, manifest);
Statistics {
|acc, manifest| async move {
let column_stats = column_statistics(schema, &manifest);
Ok(Statistics {
num_rows: acc.num_rows.add(&Precision::Exact(
*manifest.data_file().record_count() as usize,
)),
Expand All @@ -76,9 +76,11 @@ pub(crate) async fn table_statistics(
distinct_count: acc.distinct_count.add(&x.distinct_count),
})
.collect(),
}
})
},
))
)
.map_err(Error::from)
.await
}

fn column_statistics<'a>(
Expand Down
20 changes: 16 additions & 4 deletions datafusion_iceberg/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ use crate::{
statistics::manifest_statistics,
};

use iceberg_rust::spec::{manifest::Status, util};
use iceberg_rust::spec::{
manifest::{ManifestEntry, Status},
util,
};
use iceberg_rust::spec::{
schema::Schema,
types::{StructField, StructType},
Expand Down Expand Up @@ -318,7 +321,7 @@ async fn table_scan(
.map_err(Into::<Error>::into)?;

// If there is a filter expression on the partition column, the manifest files to read are pruned.
let data_files = if let Some(predicate) = partition_predicates {
let data_files: Vec<ManifestEntry> = if let Some(predicate) = partition_predicates {
let physical_partition_predicate = create_physical_expr(
&predicate,
&arrow_schema.as_ref().clone().try_into()?,
Expand All @@ -340,11 +343,17 @@ async fn table_scan(
.datafiles(&manifests, Some(manifests_to_prune))
.await
.map_err(Into::<Error>::into)?
.try_collect()
.await
.map_err(Error::from)?
} else {
table
.datafiles(&manifests, None)
.await
.map_err(Into::<Error>::into)?
.try_collect()
.await
.map_err(Error::from)?
};

let pruning_predicate =
Expand Down Expand Up @@ -398,10 +407,13 @@ async fn table_scan(
.manifests(snapshot_range.0, snapshot_range.1)
.await
.map_err(Into::<Error>::into)?;
let data_files = table
let data_files: Vec<ManifestEntry> = table
.datafiles(&manifests, None)
.await
.map_err(Into::<Error>::into)?;
.map_err(Into::<Error>::into)?
.try_collect()
.await
.map_err(Error::from)?;
data_files.into_iter().for_each(|manifest| {
if *manifest.status() != Status::Deleted {
let partition_values = manifest
Expand Down
23 changes: 23 additions & 0 deletions iceberg-file-catalog/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "iceberg-file-catalog"
version = "0.5.7"
edition = "2021"

description = "Filesystem catalog for the unofficial Iceberg table format implementation"

license = "Apache-2.0"

repository = "https://github.com/JanKaul/iceberg-rust"

[dependencies]
async-trait.workspace = true
futures.workspace = true
iceberg-rust = { path = "../iceberg-rust", version = "0.5.7" }
object_store.workspace = true
serde_json.workspace = true
thiserror.workspace = true
url.workspace = true
uuid = { version = "1.7.0", features = ["v4"] }

[dev-dependencies]
tokio = "1"
18 changes: 18 additions & 0 deletions iceberg-file-catalog/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use iceberg_rust::error::Error as IcebergError;
use thiserror::Error;

#[derive(Error, Debug)]
pub enum Error {
#[error("{0}")]
Text(String),
#[error(transparent)]
ParseError(#[from] url::ParseError),
#[error(transparent)]
ParseIntError(#[from] std::num::ParseIntError),
}

impl From<Error> for IcebergError {
fn from(value: Error) -> Self {
IcebergError::InvalidFormat(value.to_string())
}
}
Loading

0 comments on commit 2492d09

Please sign in to comment.