Skip to content

Commit

Permalink
feat: adopt kernel schema types
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed May 9, 2024
1 parent 81593e9 commit 3ab0f82
Show file tree
Hide file tree
Showing 43 changed files with 425 additions and 2,653 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ debug = true
debug = "line-tables-only"

[workspace.dependencies]
delta_kernel = { git = "https://github.com/roeap/delta-kernel-rs", rev = "524262aa74d2f664bc5cc9b9ce06d7c6274bf7f4" }

# arrow
arrow = { version = "51" }
arrow-arith = { version = "51" }
Expand Down
6 changes: 2 additions & 4 deletions crates/aws/tests/integration_s3_dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@
use std::collections::HashMap;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use aws_config::SdkConfig;
use aws_sdk_dynamodb::types::BillingMode;
use deltalake_aws::logstore::{RepairLogEntryResult, S3DynamoDbLogStore};
use deltalake_aws::storage::S3StorageOptions;
use deltalake_aws::{CommitEntry, DynamoDbConfig, DynamoDbLockClient};
use deltalake_core::kernel::{Action, Add, DataType, PrimitiveType, StructField, StructType};
use deltalake_core::logstore::LogStore;
use deltalake_core::operations::transaction::{CommitBuilder, PreparedCommit};
use deltalake_core::parquet::file::metadata;
use deltalake_core::operations::transaction::CommitBuilder;
use deltalake_core::protocol::{DeltaOperation, SaveMode};
use deltalake_core::storage::commit_uri_from_version;
use deltalake_core::storage::StorageOptions;
Expand Down Expand Up @@ -392,7 +390,7 @@ async fn prepare_table(context: &IntegrationContext, table_name: &str) -> TestRe
// create delta table
let table = DeltaOps(table)
.create()
.with_columns(schema.fields().clone())
.with_columns(schema.fields().cloned())
.await?;
println!("table created: {table:?}");
Ok(table)
Expand Down
2 changes: 2 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ rust-version.workspace = true
features = ["datafusion", "json", "unity-experimental"]

[dependencies]
delta_kernel.workspace = true

# arrow
arrow = { workspace = true }
arrow-arith = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/delta_datafusion/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ mod test {

let table = DeltaOps::new_in_memory()
.create()
.with_columns(schema.fields().clone())
.with_columns(schema.fields().cloned())
.await
.unwrap();
assert_eq!(table.version(), 0);
Expand Down
8 changes: 2 additions & 6 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use arrow::record_batch::RecordBatch;
use arrow_array::types::UInt16Type;
use arrow_array::{Array, DictionaryArray, StringArray, TypedDictionaryArray};
use arrow_cast::display::array_value_to_string;

use arrow_schema::Field;
use async_trait::async_trait;
use chrono::{DateTime, TimeZone, Utc};
Expand Down Expand Up @@ -78,15 +77,14 @@ use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use datafusion_sql::planner::ParserOptions;
use either::Either;
use futures::TryStreamExt;

use itertools::Itertools;
use object_store::ObjectMeta;
use serde::{Deserialize, Serialize};
use url::Url;

use crate::delta_datafusion::expr::parse_predicate_expression;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, DataCheck, EagerSnapshot, Invariant, Snapshot};
use crate::kernel::{Add, DataCheck, EagerSnapshot, Invariant, Snapshot, StructTypeExt};
use crate::logstore::LogStoreRef;
use crate::table::builder::ensure_table_uri;
use crate::table::state::DeltaTableState;
Expand Down Expand Up @@ -202,13 +200,11 @@ fn _arrow_schema(snapshot: &Snapshot, wrap_partitions: bool) -> DeltaResult<Arro
let fields = meta
.schema()?
.fields()
.iter()
.filter(|f| !meta.partition_columns.contains(&f.name().to_string()))
.map(|f| f.try_into())
.chain(
meta.schema()?
.fields()
.iter()
.filter(|f| meta.partition_columns.contains(&f.name().to_string()))
.map(|f| {
let field = Field::try_from(f)?;
Expand Down Expand Up @@ -2045,7 +2041,7 @@ mod tests {

let table = crate::DeltaOps::new_in_memory()
.create()
.with_columns(get_delta_schema().fields().clone())
.with_columns(get_delta_schema().fields().cloned())
.with_partition_columns(["modified", "id"])
.await
.unwrap();
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ pub type DeltaResult<T> = Result<T, DeltaTableError>;
#[allow(missing_docs)]
#[derive(thiserror::Error, Debug)]
pub enum DeltaTableError {
#[error("Kernel error: {0}")]
KernelError(#[from] delta_kernel::error::Error),

#[error("Delta protocol violation: {source}")]
Protocol { source: ProtocolError },

Expand Down
Loading

0 comments on commit 3ab0f82

Please sign in to comment.