chore: bump kernel 0.6 and datafusion 44 #3087

Merged: 6 commits, Jan 1, 2025
24 changes: 12 additions & 12 deletions Cargo.toml
@@ -26,7 +26,7 @@ debug = true
debug = "line-tables-only"

[workspace.dependencies]
delta_kernel = { version = "0.5.0", features = ["default-engine"] }
delta_kernel = { version = "0.6.0", features = ["default-engine"] }
#delta_kernel = { path = "../delta-kernel-rs/kernel", features = ["sync-engine"] }

# arrow
@@ -45,16 +45,16 @@ object_store = { version = "0.11.2" }
parquet = { version = "53" }

# datafusion
-datafusion = { version = "43" }
-datafusion-expr = { version = "43" }
-datafusion-common = { version = "43" }
-datafusion-ffi = { version = "43" }
-datafusion-functions = { version = "43" }
-datafusion-functions-aggregate = { version = "43" }
-datafusion-physical-expr = { version = "43" }
-datafusion-physical-plan = { version = "43" }
-datafusion-proto = { version = "43" }
-datafusion-sql = { version = "43" }
+datafusion = { version = "44" }
+datafusion-expr = { version = "44" }
+datafusion-common = { version = "44" }
+datafusion-ffi = { version = "44" }
+datafusion-functions = { version = "44" }
+datafusion-functions-aggregate = { version = "44" }
+datafusion-physical-expr = { version = "44" }
+datafusion-physical-plan = { version = "44" }
+datafusion-proto = { version = "44" }
+datafusion-sql = { version = "44" }

# serde
serde = { version = "1.0.194", features = ["derive"] }
@@ -69,10 +69,10 @@ thiserror = { version = "2" }
url = { version = "2" }
urlencoding = "2.1.3"
uuid = { version = "1" }
path-tree = { version = "0.8.1"} # pin to 0.8.1 due to nightly features

# runtime / async
async-trait = { version = "0.1" }
futures = { version = "0.3" }
tokio = { version = "1" }
num_cpus = { version = "1" }

3 changes: 2 additions & 1 deletion crates/aws/src/lib.rs
@@ -78,7 +78,7 @@ impl LogStoreFactory for S3LogStoreFactory {
store,
)?));
}
-Ok(default_logstore(store, location, &options))
+Ok(default_logstore(store, location, options))
}
}

@@ -141,6 +141,7 @@ impl std::fmt::Debug for DynamoDbLockClient {

impl DynamoDbLockClient {
/// Creates a new DynamoDbLockClient from the supplied storage options.
+#[allow(clippy::too_many_arguments)]
pub fn try_new(
sdk_config: &SdkConfig,
lock_table_name: Option<String>,
2 changes: 1 addition & 1 deletion crates/azure/Cargo.toml
@@ -14,7 +14,7 @@ rust-version.workspace = true
[dependencies]
deltalake-core = { version = "0.23.0", path = "../core", features = [
"datafusion",
-] }
+]}
lazy_static = "1"

# workspace dependencies
14 changes: 8 additions & 6 deletions crates/catalog-unity/Cargo.toml
@@ -17,17 +17,19 @@ tokio.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
-deltalake-core = { version = "0.23", path = "../core" }
+deltalake-core = { version = "0.23", path = "../core", features = [
+"datafusion",
+]}
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json", "http2"] }
reqwest-retry = "0.7"
reqwest-middleware = "0.4.0"
rand = "0.8"
futures = "0.3"
chrono = "0.4"
futures = { workspace = true }
chrono = { workspace = true }
dashmap = "6"
tracing = "0.1"
datafusion = { version = "43", optional = true }
datafusion-common = { version = "43", optional = true }
tracing = { workspace = true }
datafusion = { workspace = true, optional = true }
datafusion-common = { workspace = true, optional = true }

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
12 changes: 4 additions & 8 deletions crates/catalog-unity/src/credential.rs
@@ -205,8 +205,7 @@ impl TokenCredential for AzureCliCredential {
"got unexpected token type from azure cli: {0}",
token_response.token_type
),
-}
-.into());
+});
}
let duration =
token_response.expires_on.naive_local() - chrono::Local::now().naive_local();
@@ -224,18 +223,15 @@ impl TokenCredential for AzureCliCredential {
let message = String::from_utf8_lossy(&az_output.stderr);
Err(UnityCatalogError::AzureCli {
message: message.into(),
-}
-.into())
+})
}
Err(e) => match e.kind() {
std::io::ErrorKind::NotFound => Err(UnityCatalogError::AzureCli {
message: "Azure Cli not installed".into(),
-}
-.into()),
+}),
error_kind => Err(UnityCatalogError::AzureCli {
message: format!("io error: {error_kind:?}"),
-}
-.into()),
+}),
},
}
}
3 changes: 0 additions & 3 deletions crates/catalog-unity/src/error.rs
@@ -10,7 +10,6 @@ pub enum UnityCatalogError {
},

/// A generic error qualified in the message
-
#[error("{source}")]
Retry {
/// Error message
@@ -19,7 +18,6 @@ },
},

#[error("Request error: {source}")]
-
/// Error from reqwest library
RequestError {
/// The underlying reqwest_middleware::Error
@@ -35,7 +33,6 @@ },
},

/// Error caused by invalid access token value
-
#[error("Invalid Databricks personal access token")]
InvalidAccessToken,
}
3 changes: 1 addition & 2 deletions crates/catalog-unity/src/lib.rs
@@ -644,8 +644,7 @@ impl DataCatalog for UnityCatalog {
GetTableResponse::Error(err) => Err(UnityCatalogError::InvalidTable {
error_code: err.error_code,
message: err.message,
-}
-.into()),
+}),
}
}
}
2 changes: 1 addition & 1 deletion crates/core/Cargo.toml
@@ -96,7 +96,7 @@ tracing = { workspace = true }
rand = "0.8"
z85 = "3.0.5"
maplit = "1"
sqlparser = { version = "0.52.0" }
sqlparser = { version = "0.53.0" }

[dev-dependencies]
criterion = "0.5"
1 change: 1 addition & 0 deletions crates/core/src/delta_datafusion/cdf/scan_utils.rs
@@ -84,6 +84,7 @@ pub fn create_partition_values<F: FileAction>(
extensions: None,
range: None,
statistics: None,
+metadata_size_hint: None,
};

file_groups.entry(new_part_values).or_default().push(part);
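Context for the new field: DataFusion 44 extends `PartitionedFile` with a `metadata_size_hint` that lets a scan prefetch the Parquet footer in one read; the same one-line fix recurs in `delta_datafusion/mod.rs` below. A minimal sketch of the new field, assuming the stock `PartitionedFile::new` constructor; the hint value is illustrative, not taken from this PR:

```rust
use datafusion::datasource::listing::PartitionedFile;

fn main() {
    // DataFusion 44 adds `metadata_size_hint` to `PartitionedFile`, so
    // exhaustive struct literals (as in the diff above) must now name it.
    let mut file = PartitionedFile::new("part-00000.parquet", 1024);

    // `None` keeps the old behaviour; `Some(n)` hints how many trailing bytes
    // to fetch when reading the Parquet footer (64 KiB here is illustrative).
    file.metadata_size_hint = Some(64 * 1024);

    println!("{:?}", file.metadata_size_hint);
}
```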
9 changes: 2 additions & 7 deletions crates/core/src/delta_datafusion/expr.rs
@@ -99,14 +99,13 @@ impl ScalarUDFImpl for MakeParquetArray {
r_type
}

-fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+fn invoke_batch(&self, args: &[ColumnarValue], number_rows: usize) -> Result<ColumnarValue> {
let mut data_type = DataType::Null;
for arg in args {
data_type = arg.data_type();
}

-#[allow(deprecated)]
-match self.actual.invoke(args)? {
+match self.actual.invoke_batch(args, number_rows)? {
ColumnarValue::Scalar(ScalarValue::List(df_array)) => {
let field = Arc::new(Field::new("element", data_type, true));
let result = Ok(ColumnarValue::Scalar(ScalarValue::List(Arc::new(
@@ -127,10 +126,6 @@ impl ScalarUDFImpl for MakeParquetArray {
}
}

-fn invoke_no_args(&self, number_rows: usize) -> Result<ColumnarValue> {
-self.actual.invoke_batch(&[], number_rows)
-}
-
fn aliases(&self) -> &[String] {
&self.aliases
}
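For background on this hunk: DataFusion 44 funnels scalar UDF execution through `invoke_batch`, which takes the row count explicitly and therefore also covers the zero-argument case that `invoke_no_args` used to handle. A minimal sketch of a UDF written against the new entry point; `AddOne` is a hypothetical example, not part of this PR:

```rust
use std::any::Any;
use std::sync::Arc;

use datafusion::arrow::array::Int64Array;
use datafusion::arrow::datatypes::DataType;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};

/// Hypothetical UDF adding one to an Int64 column via `invoke_batch`.
#[derive(Debug)]
struct AddOne {
    signature: Signature,
}

impl AddOne {
    fn new() -> Self {
        Self {
            signature: Signature::exact(vec![DataType::Int64], Volatility::Immutable),
        }
    }
}

impl ScalarUDFImpl for AddOne {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn name(&self) -> &str {
        "add_one"
    }
    fn signature(&self) -> &Signature {
        &self.signature
    }
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Int64)
    }
    // The row count arrives as an explicit parameter, so the old
    // `invoke` / `invoke_no_args` split is no longer needed.
    fn invoke_batch(&self, args: &[ColumnarValue], _number_rows: usize) -> Result<ColumnarValue> {
        match args {
            [ColumnarValue::Array(array)] => {
                let ints = array
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .expect("add_one expects an Int64 column");
                let out: Int64Array = ints.iter().map(|v| v.map(|x| x + 1)).collect();
                Ok(ColumnarValue::Array(Arc::new(out)))
            }
            _ => Err(DataFusionError::Execution(
                "add_one expects a single Int64 array argument".into(),
            )),
        }
    }
}
```

Registration is unchanged, e.g. `ctx.register_udf(ScalarUDF::new_from_impl(AddOne::new()))`.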
2 changes: 2 additions & 0 deletions crates/core/src/delta_datafusion/mod.rs
@@ -1054,6 +1054,7 @@ fn partitioned_file_from_action(
range: None,
extensions: None,
statistics: None,
+metadata_size_hint: None,
}
}

@@ -1959,6 +1960,7 @@ mod tests {
range: None,
extensions: None,
statistics: None,
+metadata_size_hint: None,
};
assert_eq!(file.partition_values, ref_file.partition_values)
}
10 changes: 4 additions & 6 deletions crates/core/src/operations/optimize.rs
@@ -1215,10 +1215,7 @@ pub(super) mod zorder {
use url::Url;

use ::datafusion::{
-execution::{
-memory_pool::FairSpillPool,
-runtime_env::{RuntimeConfig, RuntimeEnv},
-},
+execution::{memory_pool::FairSpillPool, runtime_env::RuntimeEnvBuilder},
prelude::{SessionConfig, SessionContext},
};
use arrow_schema::DataType;
@@ -1245,8 +1242,9 @@ pub(super) mod zorder {
let columns = columns.into();

let memory_pool = FairSpillPool::new(max_spill_size);
-let config = RuntimeConfig::new().with_memory_pool(Arc::new(memory_pool));
-let runtime = Arc::new(RuntimeEnv::try_new(config)?);
+let runtime = RuntimeEnvBuilder::new()
+.with_memory_pool(Arc::new(memory_pool))
+.build_arc()?;
runtime.register_object_store(&Url::parse("delta-rs://").unwrap(), object_store);

let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime);
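The switch above follows DataFusion 44's deprecation of `RuntimeConfig`: `RuntimeEnvBuilder` replaces the `RuntimeConfig::new()` / `RuntimeEnv::try_new` / `Arc::new` sequence, and `build_arc()` returns the `Arc<RuntimeEnv>` directly. A standalone sketch of the new pattern; the pool size is illustrative:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() -> Result<()> {
    // DataFusion 44: configure the runtime through the builder and get the
    // Arc<RuntimeEnv> straight from `build_arc()`.
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_pool(Arc::new(FairSpillPool::new(8 * 1024 * 1024)))
        .build_arc()?;

    let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime);
    println!("session {}", ctx.session_id());
    Ok(())
}
```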
6 changes: 4 additions & 2 deletions crates/core/src/table/state_arrow.rs
@@ -14,7 +14,7 @@ use arrow_array::{
use arrow_cast::cast;
use arrow_cast::parse::Parser;
use arrow_schema::{DataType, Field, Fields, TimeUnit};
-use delta_kernel::table_features::ColumnMappingMode;
+use delta_kernel::table_features::{validate_schema_column_mapping, ColumnMappingMode};
use itertools::Itertools;

use super::state::DeltaTableState;
@@ -171,6 +171,8 @@ impl DeltaTableState {
})
.collect::<HashMap<&str, _>>();

+validate_schema_column_mapping(self.schema(), column_mapping_mode)?;
+
let physical_name_to_logical_name = match column_mapping_mode {
ColumnMappingMode::None => HashMap::with_capacity(0), // No column mapping, no need for this HashMap
ColumnMappingMode::Id | ColumnMappingMode::Name => metadata
@@ -184,7 +186,7 @@ impl DeltaTableState {
"Invalid partition column {0}",
name
)))?
-.physical_name(column_mapping_mode)?
+.physical_name()
.to_string();
Ok((physical_name, name.as_str()))
})
12 changes: 8 additions & 4 deletions crates/sql/src/parser.rs
@@ -5,7 +5,7 @@ use datafusion_sql::parser::{DFParser, Statement as DFStatement};
use datafusion_sql::sqlparser::ast::{ObjectName, Value};
use datafusion_sql::sqlparser::dialect::{keywords::Keyword, Dialect, GenericDialect};
use datafusion_sql::sqlparser::parser::{Parser, ParserError};
-use datafusion_sql::sqlparser::tokenizer::{Token, TokenWithLocation, Tokenizer};
+use datafusion_sql::sqlparser::tokenizer::{Token, TokenWithSpan, Tokenizer};

// Use `Parser::expected` instead, if possible
macro_rules! parser_err {
@@ -129,7 +129,7 @@ impl<'a> DeltaParser<'a> {
}

/// Report an unexpected token
-fn expected<T>(&self, expected: &str, found: TokenWithLocation) -> Result<T, ParserError> {
+fn expected<T>(&self, expected: &str, found: TokenWithSpan) -> Result<T, ParserError> {
parser_err!(format!("Expected {expected}, found: {found}"))
}

@@ -224,9 +224,9 @@ impl<'a> DeltaParser<'a> {

#[cfg(test)]
mod tests {
-use datafusion_sql::sqlparser::ast::Ident;
-
use super::*;
+use datafusion_sql::sqlparser::ast::Ident;
+use datafusion_sql::sqlparser::tokenizer::Span;

fn expect_parse_ok(sql: &str, expected: Statement) -> Result<(), ParserError> {
let statements = DeltaParser::parse_sql(sql)?;
@@ -245,6 +245,7 @@ mod tests {
table: ObjectName(vec![Ident {
value: "data_table".to_string(),
quote_style: None,
+span: Span::empty(),
}]),
retention_hours: None,
dry_run: false,
@@ -255,6 +256,7 @@ mod tests {
table: ObjectName(vec![Ident {
value: "data_table".to_string(),
quote_style: None,
+span: Span::empty(),
}]),
retention_hours: Some(10),
dry_run: false,
@@ -265,6 +267,7 @@ mod tests {
table: ObjectName(vec![Ident {
value: "data_table".to_string(),
quote_style: None,
+span: Span::empty(),
}]),
retention_hours: Some(10),
dry_run: true,
@@ -275,6 +278,7 @@ mod tests {
table: ObjectName(vec![Ident {
value: "data_table".to_string(),
quote_style: None,
+span: Span::empty(),
}]),
retention_hours: None,
dry_run: true,
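Both parser changes come from sqlparser 0.53 (pulled in alongside DataFusion 44): `TokenWithLocation` is renamed to `TokenWithSpan`, and AST nodes such as `Ident` now carry a source `span`, which hand-built values must supply. A small sketch of the new `Ident` shape, using only items that appear in the diff:

```rust
use datafusion_sql::sqlparser::ast::Ident;
use datafusion_sql::sqlparser::tokenizer::Span;

fn main() {
    // sqlparser 0.53 threads source spans through the AST; `Span::empty()`
    // stands in when there is no real source location, as in the tests above.
    let ident = Ident {
        value: "data_table".to_string(),
        quote_style: None,
        span: Span::empty(),
    };
    assert_eq!(ident.to_string(), "data_table");
}
```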
10 changes: 1 addition & 9 deletions crates/test/src/datafusion.rs
@@ -1,18 +1,10 @@
use deltalake_core::datafusion::execution::context::SessionContext;
-use deltalake_core::datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use deltalake_core::datafusion::execution::session_state::SessionStateBuilder;
-use deltalake_core::datafusion::prelude::SessionConfig;
use deltalake_core::delta_datafusion::DeltaTableFactory;
use std::sync::Arc;

pub fn context_with_delta_table_factory() -> SessionContext {
-let cfg = RuntimeConfig::new();
-let env = RuntimeEnv::try_new(cfg).unwrap();
-let ses = SessionConfig::new();
-let mut state = SessionStateBuilder::new()
-.with_config(ses)
-.with_runtime_env(Arc::new(env))
-.build();
+let mut state = SessionStateBuilder::new().build();
state
.table_factories_mut()
.insert("DELTATABLE".to_string(), Arc::new(DeltaTableFactory {}));
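The helper shrinks because in DataFusion 44 `SessionStateBuilder::new().build()` already fills in a default `SessionConfig` and `RuntimeEnv`; only non-default pieces need explicit wiring. A sketch of the equivalent minimal setup, assuming the same `deltalake_core` re-exports the test crate uses:

```rust
use deltalake_core::datafusion::execution::context::SessionContext;
use deltalake_core::datafusion::execution::session_state::SessionStateBuilder;

fn main() {
    // The builder supplies default config and runtime, replacing the
    // RuntimeConfig/RuntimeEnv plumbing deleted above.
    let state = SessionStateBuilder::new().build();
    let ctx = SessionContext::new_with_state(state);
    println!("session {}", ctx.session_id());
}
```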
1 change: 1 addition & 0 deletions python/src/lib.rs
@@ -670,6 +670,7 @@ impl RawDeltaTable {
}

#[pyo3(signature = (starting_version = 0, ending_version = None, starting_timestamp = None, ending_timestamp = None, columns = None, allow_out_of_range = false))]
+#[allow(clippy::too_many_arguments)]
pub fn load_cdf(
&mut self,
py: Python,