Merge branch 'main' into docs-datafusion-integration
rtyler authored Dec 29, 2023
2 parents 9bf5bf3 + 02e26e5 commit b9b4821
Showing 6 changed files with 15 additions and 18 deletions.
7 changes: 3 additions & 4 deletions crates/benchmarks/src/bin/merge.rs
@@ -355,6 +355,7 @@ struct MergePrefArgs {
 
 #[tokio::main]
 async fn main() {
+    type MergeOp = fn(DataFrame, DeltaTable) -> Result<MergeBuilder, DeltaTableError>;
     match MergePrefArgs::parse().command {
         Command::Convert(Convert {
             tpcds_path,
@@ -364,11 +365,9 @@ async fn main() {
                 .await
                 .unwrap();
         }
+
         Command::Bench(BenchArg { table_path, name }) => {
-            let (merge_op, params): (
-                fn(DataFrame, DeltaTable) -> Result<MergeBuilder, DeltaTableError>,
-                MergePerfParams,
-            ) = match name {
+            let (merge_op, params): (MergeOp, MergePerfParams) = match name {
                 MergeBench::Upsert(params) => (merge_upsert, params),
                 MergeBench::Delete(params) => (merge_delete, params),
                 MergeBench::Insert(params) => (merge_insert, params),
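The refactor above hoists an inline function-pointer signature into a named `type` alias so the tuple annotation fits on one line. A minimal sketch of the same pattern, with hypothetical `BinOp`, `add`, and `sub` stand-ins for the benchmark's real types:

```rust
// Name the function-pointer type once instead of repeating the signature.
type BinOp = fn(i64, i64) -> Result<i64, String>;

fn add(a: i64, b: i64) -> Result<i64, String> {
    Ok(a + b)
}

fn sub(a: i64, b: i64) -> Result<i64, String> {
    Ok(a - b)
}

fn main() {
    let name = "add";
    // Without the alias, this annotation would spell out the whole
    // signature, as the deleted lines above did.
    let (op, label): (BinOp, &str) = match name {
        "add" => (add, "addition"),
        _ => (sub, "subtraction"),
    };
    println!("{label}: {:?}", op(4, 2));
}
```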
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/delta_datafusion/mod.rs
@@ -146,7 +146,7 @@ impl DeltaTableState {
         let files = self.files();
 
         // Initialize statistics
-        let mut table_stats = match files.get(0) {
+        let mut table_stats = match files.first() {
             Some(file) => match file.get_stats() {
                 Ok(Some(stats)) => {
                     let mut column_statistics = Vec::with_capacity(schema.fields().size());
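Both `get(0)` and `first()` return an `Option` of the first element; clippy's `get_first` lint prefers the latter as clearer. A small sketch of the equivalence and of the `Option` handling used above:

```rust
fn main() {
    let files = vec!["part-0001.parquet", "part-0002.parquet"];

    // The two calls are interchangeable on slices and Vecs.
    assert_eq!(files.get(0), files.first());

    // Mirroring the statistics code: seed from the first file, or fall back.
    match files.first() {
        Some(file) => println!("initialize statistics from {file}"),
        None => println!("empty table: no statistics to initialize"),
    }
}
```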
4 changes: 2 additions & 2 deletions crates/deltalake-core/src/kernel/arrow/mod.rs
@@ -686,7 +686,7 @@ mod tests {
         let partition_values_parsed = add_field_map.get("partitionValues_parsed").unwrap();
         if let ArrowDataType::Struct(fields) = partition_values_parsed.data_type() {
             assert_eq!(1, fields.len());
-            let field = fields.get(0).unwrap().to_owned();
+            let field = fields.first().unwrap().to_owned();
             assert_eq!(
                 Arc::new(ArrowField::new("pcol", ArrowDataType::Int32, true)),
                 field
@@ -708,7 +708,7 @@
                 "minValues" | "maxValues" | "nullCount" => match v.data_type() {
                     ArrowDataType::Struct(fields) => {
                         assert_eq!(1, fields.len());
-                        let field = fields.get(0).unwrap().to_owned();
+                        let field = fields.first().unwrap().to_owned();
                         let data_type = if k == "nullCount" {
                             ArrowDataType::Int64
                         } else {
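The same substitution works on arrow's `Fields` collection, which dereferences to a slice of `Arc<Field>`. A sketch of the test's assertion pattern, assuming the `arrow_schema` crate names in place of the `ArrowField`/`ArrowDataType` aliases used in this module:

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Fields};

fn main() {
    // A struct column with a single child, mirroring `partitionValues_parsed`.
    let fields = Fields::from(vec![Field::new("pcol", DataType::Int32, true)]);
    let parsed = Field::new("partitionValues_parsed", DataType::Struct(fields), true);

    if let DataType::Struct(children) = parsed.data_type() {
        assert_eq!(1, children.len());
        // `first()` yields Option<&Arc<Field>>, exactly as `get(0)` did.
        let child = children.first().unwrap().to_owned();
        assert_eq!(Arc::new(Field::new("pcol", DataType::Int32, true)), child);
    }
}
```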
1 change: 0 additions & 1 deletion crates/deltalake-core/src/lib.rs
@@ -67,7 +67,6 @@
 //! };
 //! ```
 #![deny(warnings)]
-#![deny(missing_docs)]
 #![allow(rustdoc::invalid_html_tags)]
 #![allow(clippy::nonminimal_bool)]
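With `#![deny(missing_docs)]` gone, undocumented public items no longer fail the build. A hypothetical crate root showing what the removed attribute enforced:

```rust
// Hypothetical lib.rs sketch: while this attribute is present, every public
// item must carry a doc comment or compilation fails; removing it restores
// the default allow-by-default behavior.
#![deny(missing_docs)]

//! Crate-level documentation.

/// Deleting this doc comment would be a hard error under the attribute.
pub fn documented() {}
```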
17 changes: 8 additions & 9 deletions crates/deltalake-core/tests/checkpoint_writer.rs
@@ -51,8 +51,8 @@ mod simple_checkpoint {
         // delta table should load just fine with the checkpoint in place
         let table_result = deltalake_core::open_table(table_location).await.unwrap();
         let table = table_result;
-        let files = table.get_files();
-        assert_eq!(12, files.len());
+        let files = table.get_files_iter();
+        assert_eq!(12, files.count());
     }
 
     fn get_last_checkpoint_version(log_path: &Path) -> i64 {
@@ -138,7 +138,7 @@ mod delete_expired_delta_log_in_checkpoint {
 
         table.update().await.unwrap(); // make table to read the checkpoint
         assert_eq!(
-            table.get_files(),
+            table.get_files_iter().collect::<Vec<_>>(),
             vec![
                 ObjectStorePath::from(a1.path.as_ref()),
                 ObjectStorePath::from(a2.path.as_ref())
@@ -186,7 +186,7 @@ mod delete_expired_delta_log_in_checkpoint {
             .unwrap();
         table.update().await.unwrap(); // make table to read the checkpoint
         assert_eq!(
-            table.get_files(),
+            table.get_files_iter().collect::<Vec<_>>(),
             vec![
                 ObjectStorePath::from(a1.path.as_ref()),
                 ObjectStorePath::from(a2.path.as_ref())
@@ -249,7 +249,7 @@ mod checkpoints_with_tombstones {
         checkpoints::create_checkpoint(&table).await.unwrap();
         table.update().await.unwrap(); // make table to read the checkpoint
         assert_eq!(
-            table.get_files(),
+            table.get_files_iter().collect::<Vec<_>>(),
             vec![
                 ObjectStorePath::from(a1.path.as_ref()),
                 ObjectStorePath::from(a2.path.as_ref())
@@ -258,15 +258,15 @@
 
         let (removes1, opt1) = pseudo_optimize(&mut table, 5 * 59 * 1000).await;
         assert_eq!(
-            table.get_files(),
+            table.get_files_iter().collect::<Vec<_>>(),
             vec![ObjectStorePath::from(opt1.path.as_ref())]
         );
         assert_eq!(table.get_state().all_tombstones(), &removes1);
 
         checkpoints::create_checkpoint(&table).await.unwrap();
         table.update().await.unwrap(); // make table to read the checkpoint
         assert_eq!(
-            table.get_files(),
+            table.get_files_iter().collect::<Vec<_>>(),
             vec![ObjectStorePath::from(opt1.path.as_ref())]
         );
         assert_eq!(table.get_state().all_tombstones().len(), 0); // stale removes are deleted from the state
@@ -335,8 +335,7 @@ mod checkpoints_with_tombstones {
 
     async fn pseudo_optimize(table: &mut DeltaTable, offset_millis: i64) -> (HashSet<Remove>, Add) {
         let removes: HashSet<Remove> = table
-            .get_files()
-            .iter()
+            .get_files_iter()
             .map(|p| Remove {
                 path: p.to_string(),
                 deletion_timestamp: Some(Utc::now().timestamp_millis() - offset_millis),
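These tests migrate from `get_files()`, which returned an owned `Vec` of paths, to `get_files_iter()`, which lends an iterator over the table state and lets the caller decide whether to allocate. A stand-in sketch of the pattern (not the real `DeltaTable` API, whose items are object-store paths):

```rust
// Stand-in for a table whose state owns the file list.
struct Table {
    files: Vec<String>,
}

impl Table {
    // Iterate lazily over the backing state; no Vec is allocated unless
    // the caller asks for one.
    fn get_files_iter(&self) -> impl Iterator<Item = &String> {
        self.files.iter()
    }
}

fn main() {
    let table = Table {
        files: vec!["a1.parquet".into(), "a2.parquet".into()],
    };

    // Counting consumes the iterator without materializing it...
    assert_eq!(2, table.get_files_iter().count());

    // ...while equality assertions collect explicitly, as the tests above do.
    let files: Vec<_> = table.get_files_iter().collect();
    assert_eq!(vec!["a1.parquet", "a2.parquet"], files);
}
```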
2 changes: 1 addition & 1 deletion crates/deltalake-core/tests/command_optimize.rs
@@ -714,7 +714,7 @@ async fn test_zorder_unpartitioned() -> Result<(), Box<dyn Error>> {
     assert_eq!(metrics.total_considered_files, 2);
 
     // Check data
-    let files = dt.get_files();
+    let files = dt.get_files_iter().collect::<Vec<_>>();
     assert_eq!(files.len(), 1);
 
     let actual = read_parquet_file(&files[0], dt.object_store()).await?;
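Here the iterator is still collected into a `Vec` because the test both checks `files.len()` and indexes `files[0]`; only call sites that merely count or compare can drop the allocation entirely.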
