Performing multiple optimize operations after creating a checkpoint can lead to data errors and exponential data bloat. #3047
Comments
I first encountered this issue using Rust 0.22.2. Then, I reproduced the issue using Python.

use clap::{Command, Parser, Subcommand};
use deltalake::arrow::array::RecordBatch;
use deltalake::arrow::datatypes::DataType;
use deltalake::arrow::{self, array};
use deltalake::checkpoints::create_checkpoint;
use deltalake::operations::optimize::OptimizeBuilder;
use deltalake::operations::vacuum::VacuumBuilder;
use deltalake::{open_table, DeltaOps};
use std::sync::Arc;
use std::time::Duration;

const TARGET_SIZE: i64 = 1024 * 128;
const MIN_COMMIT_INTERVAL: Duration = Duration::from_secs(10);

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
    #[command(subcommand)]
    command: Commands,
}

#[derive(Subcommand)]
enum Commands {
    AddData {
        #[arg(short, long, default_value_t = 0)]
        offset: i32,
        #[arg(short, long, default_value_t = 1000)]
        limit: usize,
    },
    Optimize,
    Vacuum,
    Checkpoint,
}

#[tokio::main]
async fn main() {
    let cli = Cli::parse();
    let path = "./data/delta";
    match &cli.command {
        Commands::AddData { offset, limit } => {
            add_data(path, *offset, *limit).await;
        }
        Commands::Optimize => {
            let delta_table = open_table(path).await.unwrap();
            optimize(delta_table.clone(), TARGET_SIZE, MIN_COMMIT_INTERVAL).await;
        }
        Commands::Vacuum => {
            let delta_table = open_table(path).await.unwrap();
            vacuum(delta_table.clone()).await;
        }
        Commands::Checkpoint => {
            let delta_table = open_table(path).await.unwrap();
            create_checkpoint(&delta_table).await.unwrap();
        }
    }
}

async fn vacuum(delta_table: deltalake::DeltaTable) {
    let snapshot = delta_table.snapshot().unwrap();
    match VacuumBuilder::new(delta_table.log_store(), snapshot.clone())
        .with_retention_period(chrono::Duration::zero())
        .with_enforce_retention_duration(false)
        .await
    {
        Ok((_, metrics)) => println!("vacuum metrics: {:?}", metrics),
        Err(e) => println!("vacuum error: {:?}", e),
    }
}

async fn optimize(
    delta_table: deltalake::DeltaTable,
    target_size: i64,
    min_commit_interval: Duration,
) {
    let snapshot = delta_table.snapshot().unwrap();
    match OptimizeBuilder::new(delta_table.log_store(), snapshot.clone())
        .with_target_size(target_size)
        //.with_min_commit_interval(min_commit_interval)
        .await
    {
        Ok((_delta_table, metrics)) => {
            println!(
                "{} optimize metrics: {:?}",
                delta_table.table_uri(),
                metrics
            );
        }
        Err(e) => {
            println!("Failed to optimize {}: {:?}", delta_table.table_uri(), e);
        }
    }
}

async fn add_data(table_uri: &str, offset: i32, limit: usize) {
    //let table_uri = delta_table.table_uri();
    let batch = gen_data(offset, limit);
    let ops = DeltaOps::try_from_uri(table_uri).await.unwrap();
    let table = ops.write(batch).await.unwrap();
}

fn gen_data(start_id: i32, num_records: usize) -> Vec<RecordBatch> {
    let id_field = arrow::datatypes::Field::new("id", DataType::Int32, false);
    let value_field = arrow::datatypes::Field::new("value", DataType::Utf8, false);
    let schema = Arc::new(arrow::datatypes::Schema::new(vec![id_field, value_field]));
    let mut batches = vec![];
    for i in 0..num_records {
        let ids = arrow::array::Int32Array::from(vec![i as i32 + start_id]);
        let values = array::StringArray::from(vec![format!("value-test-{:?}-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", i)]);
        let batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(values)]).unwrap();
        batches.push(batch);
    }
    batches
}

Cargo.toml:

[package]
name = "delta_demo"
version = "0.1.0"
edition = "2021"
[dependencies]
chrono = "0.4.38"
clap = {version = "4.5.21", features = ["derive"]}
deltalake = { version = "0.22.0", features = ["datafusion"] }
tokio = {version = "1.41.1", features = ["full"]}

./target/debug/delta_demo add-data --limit 1000 --offset 0
Probably related to: #3030
I agree with that assessment @echai58, can you try again with 0.22.3 @Curricane?
I tried updating deltalake to v0.22.3, still using the previous code. When I insert data multiple times and optimize multiple times (which creates multiple .zstd files), then manually create a checkpoint and optimize multiple times again, the issues are still there. I am using the Python deltalake 0.22.0 SDK to read the data.
Python v0.22.0 is yanked. Don't use that. Can you try with 0.22.3 python?
Ok, I tried with Python 0.22.3; there are still issues.
@Curricane can you provide a simple python MRE then?
I have already provided my Rust code, and after compiling it I execute multiple commands. I am only using Python to read the data and verify whether the data in the table has changed. Below is my Python code:

import pandas as pd
from deltalake import DeltaTable, write_deltalake

path = "./data/delta"
dt = DeltaTable(path)
dt.to_pandas()
@Curricane you did. But I need a Python MRE so that I can look into it for you, since I have very limited time.
Ok.

# coding: utf-8
import pandas as pd
from deltalake import DeltaTable, write_deltalake
import random
import string

path_1 = "./data/delta_1"

def gen_df(start, end):
    ids = list(range(start, end))
    values = [
        f"value-{x}-" + ''.join(random.choices(string.ascii_lowercase + string.digits, k=72))
        for x in ids
    ]
    df = pd.DataFrame({"id": ids, "value": values})
    return df

path_1 = "./data/delta_1/"

# Append 10 batches of 1000 rows each.
for i in range(0, 10000, 1000):
    df = gen_df(i, i + 1000)
    write_deltalake(path_1, df, mode="append")

dt = DeltaTable(path_1)
print(dt.to_pandas().count())
dt.optimize.compact(target_size=1024 * 256)

dt = DeltaTable(path_1)
dt.create_checkpoint()

dt = DeltaTable(path_1)
print(dt.to_pandas().count())
dt.optimize.compact(target_size=1024 * 256)

dt = DeltaTable(path_1)
print(dt.to_pandas().count())
dt.optimize.compact(target_size=1024 * 256)

dt = DeltaTable(path_1)
print(dt.to_pandas().count())
dt.vacuum()

# Append another 10 batches of 1000 rows each.
for i in range(10000, 20000, 1000):
    df = gen_df(i, i + 1000)
    write_deltalake(path_1, df, mode="append")

dt = DeltaTable(path_1)
print(dt.to_pandas().count())
dt.optimize.compact(target_size=1024 * 256)

dt = DeltaTable(path_1)
print(dt.to_pandas().count())
dt.optimize.compact(target_size=1024 * 256)

dt = DeltaTable(path_1)
print(dt.to_pandas().count())

dt = DeltaTable(path_1)
dt.vacuum()
dt.vacuum(retention_hours=0)
dt.vacuum(retention_hours=1)
dt.optimize.compact(target_size=1024 * 256)
dt.optimize.compact(target_size=1024 * 256)
dt.optimize.compact(target_size=1024 * 256)
dt.optimize.compact(target_size=1024 * 256)
dt.optimize.compact(target_size=1024 * 256)
dt.optimize.compact(target_size=1024 * 256)

dt = DeltaTable(path_1)
dt.optimize.compact(target_size=1024 * 256)

dt = DeltaTable(path_1)
dt.create_checkpoint()
dt.optimize.compact(target_size=1024 * 256)
dt.optimize.compact(target_size=1024 * 256)
print(dt.to_pandas().count())

When I tried using my Rust code again, I found that there were still issues. After multiple optimizations, the count increased significantly. This might be a problem with Rust 0.22.3.
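For reference, one way to confirm that rows are genuinely duplicated (rather than just re-counted) is to compare the total row count with the number of distinct ids after each compaction. This is only a minimal sketch, assuming the table produced by the script above at ./data/delta_1/; the check_duplicates helper is illustrative, not part of the deltalake API.

from deltalake import DeltaTable

def check_duplicates(path: str) -> None:
    # Read the table back and compare total rows with distinct ids;
    # any gap means optimize has physically duplicated data.
    dt = DeltaTable(path)
    df = dt.to_pandas()
    total, distinct = len(df), df["id"].nunique()
    print(f"version={dt.version()} rows={total} distinct_ids={distinct}")
    if total != distinct:
        print(f"{total - distinct} duplicated rows detected")

check_duplicates("./data/delta_1/")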
This is my shell log:

⋊> curricane@k8snode02 ⋊> ~/d/delta_demo on master ⨯ ./target/debug/delta_demo add-data --limit 1000 --offset 0 14:58:59
⋊> curricane@k8snode02 ⋊> ~/d/delta_demo on master ⨯ ./target/debug/delta_demo add-data --limit 1000 --offset 10000 14:59:10
⋊> curricane@k8snode02 ⋊> ~/d/delta_demo on master ⨯ ./target/debug/delta_demo add-data --limit 1000 --offset 0000 14:59:17
⋊> curricane@k8snode02 ⋊> ~/d/delta_demo on master ⨯ ./target/debug/delta_demo add-data --limit 1000 --offset 1000 14:59:22
⋊> curricane@k8snode02 ⋊> ~/d/delta_demo on master ⨯ ./target/debug/delta_demo add-data --limit 1000 --offset 2000 14:59:24
⋊> curricane@k8snode02 ⋊> ~/d/delta_demo on master ⨯ ./target/debug/delta_demo add-data --limit 1000 --offset 3000 14:59:27
⋊> curricane@k8snode02 ⋊> ~/d/delta_demo on master ⨯ ./target/debug/delta_demo add-data --limit 1000 --offset 4000 14:59:30
⋊> curricane@k8snode02 ⋊> ~/d/delta_demo on master ⨯ ./target/debug/delta_demo add-data --limit 1000 --offset 5000 14:59:32
⋊> curricane@k8snode02 ⋊> ~/d/delta_demo on master ⨯ ./target/debug/delta_demo add-data --limit 1000 --offset 6000 14:59:35
⋊> curricane@k8snode02 ⋊> ~/d/delta_demo on master ⨯ ./target/debug/delta_demo add-data --limit 1000 --offset 7000 14:59:38
⋊> curricane@k8snode02 ⋊> ~/d/delta_demo on master ⨯ ./target/debug/delta_demo add-data --limit 1000 --offset 8000 14:59:41
⋊> curricane@k8snode02 ⋊> ~/d/delta_demo on master ⨯ 14:59:43
⋊> curricane@k8snode02 ⋊> ~/d/delta_demo on master ⨯ ./target/debug/delta_demo checkpoint 14:59:45
⋊> curricane@k8snode02 ⋊> ~/d/delta_demo on master ⨯ ./target/debug/delta_demo optimize 14:59:58
/home/curricane/demo/delta_demo/data/delta_1/ optimize metrics: Metrics { num_files_added: 4, num_files_removed: 11, files_added: MetricDetails { avg: 12928.25, max: 15003, min: 10808, total_files: 4, total_size: 51713 }, files_removed: MetricDetails { avg: 15829.818181818182, max: 15830, min: 15829, total_files: 11, total_size: 174128 }, partitions_optimized: 1, num_batches: 11, total_considered_files: 11, total_files_skipped: 0, preserve_insertion_order: true }
⋊> curricane@k8snode02 ⋊> ~/d/delta_demo on master ⨯ ./target/debug/delta_demo optimize 15:00:34
/home/curricane/demo/delta_demo/data/delta_1/ optimize metrics: Metrics { num_files_added: 8, num_files_removed: 15, files_added: MetricDetails { avg: 13438.75, max: 15047, min: 6729, total_files: 8, total_size: 107510 }, files_removed: MetricDetails { avg: 15056.066666666668, max: 15830, min: 10808, total_files: 15, total_size: 225841 }, partitions_optimized: 1, num_batches: 22, total_considered_files: 15, total_files_skipped: 0, preserve_insertion_order: true }
⋊> curricane@k8snode02 ⋊> ~/d/delta_demo on master ⨯ ./target/debug/delta_demo optimize 15:00:39
/home/curricane/demo/delta_demo/data/delta_1/ optimize metrics: Metrics { num_files_added: 11, num_files_removed: 19, files_added: MetricDetails { avg: 13905.0, max: 15164, min: 10903, total_files: 11, total_size: 152955 }, files_removed: MetricDetails { avg: 14823.052631578947, max: 15830, min: 6729, total_files: 19, total_size: 281638 }, partitions_optimized: 1, num_batches: 33, total_considered_files: 19, total_files_skipped: 0, preserve_insertion_order: true }
⋊> curricane@k8snode02 ⋊> ~/d/delta_demo on master ⨯ ./target/debug/delta_demo optimize 15:00:40
/home/curricane/demo/delta_demo/data/delta_1/ optimize metrics: Metrics { num_files_added: 15, num_files_removed: 22, files_added: MetricDetails { avg: 13676.666666666666, max: 15281, min: 10863, total_files: 15, total_size: 205150 }, files_removed: MetricDetails { avg: 14867.40909090909, max: 15830, min: 10903, total_files: 22, total_size: 327083 }, partitions_optimized: 1, num_batches: 44, total_considered_files: 22, total_files_skipped: 0, preserve_insertion_order: true }
⋊> curricane@k8snode02 ⋊> ~/d/delta_demo on master ⨯ ./target/debug/delta_demo optimize 15:00:41
/home/curricane/demo/delta_demo/data/delta_1/ optimize metrics: Metrics { num_files_added: 18, num_files_removed: 26, files_added: MetricDetails { avg: 13448.388888888889, max: 15281, min: 6854, total_files: 18, total_size: 242071 }, files_removed: MetricDetails { avg: 14587.615384615385, max: 15830, min: 10863, total_files: 26, total_size: 379278 }, partitions_optimized: 1, num_batches: 55, total_considered_files: 26, total_files_skipped: 0, preserve_insertion_order: true }

You'll find that after multiple optimizations, the reported metric values (num_files_added, num_files_removed, and total_size) keep increasing.
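Another way to see the same growth without scanning the data is to look at how many data files each table version references; if optimize keeps committing add actions for files that the previous commit already rewrote, the file count never shrinks. A rough sketch, assuming the table sits at ./data/delta_1/ and that only the last few versions are of interest:

from deltalake import DeltaTable

path = "./data/delta_1/"
latest = DeltaTable(path).version()
# Walk the most recent versions and print how many data files each one references.
for v in range(max(0, latest - 5), latest + 1):
    dt = DeltaTable(path, version=v)
    print(f"version {v}: {len(dt.files())} data files referenced")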
@Curricane could you just double check and run it against main, since you are using Rust?
I removed all the old data, ran cargo clean and cargo build, and reran the commands to add data, create a checkpoint, and optimize. The problem is still here.
Oh, I found out why the upgrade did not seem to take effect. The reason is that I not only needed to rebuild; the deltalake dependency in my project also had to actually be updated to 0.22.3. In summary: once the project really uses 0.22.3, the problem no longer appears. Perhaps you need to upgrade the packages in your project that depend on deltalake.
Environment
python3
Delta-rs version:
0.22.2
Binding:
Environment:
Bug
What happened:
I insert 4000 rows of data into a Delta table
create a checkpoint
optimize more than once
the data is now 12000 rows
What you expected to happen:
After multiple optimize operations, the data should still be 4000 rows.
How to reproduce it:
It does not happen if I do not create a checkpoint before optimizing.
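For illustration, a minimal sketch of the ordering that avoids it (compact first, then write the checkpoint), using the same 1024*256 target size as the script above:

from deltalake import DeltaTable

dt = DeltaTable("./data/delta_1/")
# Compact before checkpointing; creating the checkpoint first is what
# triggers the bloat described in this issue.
dt.optimize.compact(target_size=1024 * 256)
dt.create_checkpoint()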
More details: