Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust): add more commit info to most operations #2009

Merged
merged 12 commits
Jan 4, 2024
27 changes: 24 additions & 3 deletions crates/deltalake-core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub struct DeleteBuilder {
app_metadata: Option<HashMap<String, serde_json::Value>>,
}

#[derive(Default, Debug, Serialize)]
#[derive(Default, Debug, Clone, Serialize)]
/// Metrics for the Delete Operation
pub struct DeleteMetrics {
/// Number of files added
Expand Down Expand Up @@ -249,6 +249,19 @@ async fn execute(

metrics.execution_time_ms = Instant::now().duration_since(exec_start).as_micros();

let mut app_metadata = match app_metadata {
Some(meta) => meta,
None => HashMap::new(),
};

app_metadata.insert("readVersion".to_owned(), snapshot.version().into());

let delete_metrics = serde_json::to_value(metrics.clone());

if let Ok(map) = delete_metrics {
app_metadata.insert("operationMetrics".to_owned(), map);
}

ion-elgreco marked this conversation as resolved.
Show resolved Hide resolved
// Do not make a commit when there are zero updates to the state
if !actions.is_empty() {
let operation = DeltaOperation::Delete {
Expand All @@ -259,7 +272,7 @@ async fn execute(
&actions,
operation,
snapshot,
app_metadata,
Some(app_metadata),
)
.await?;
}
Expand Down Expand Up @@ -390,7 +403,7 @@ mod tests {
assert_eq!(table.version(), 1);
assert_eq!(table.get_file_uris().count(), 1);

let (table, metrics) = DeltaOps(table).delete().await.unwrap();
let (mut table, metrics) = DeltaOps(table).delete().await.unwrap();

assert_eq!(table.version(), 2);
assert_eq!(table.get_file_uris().count(), 0);
Expand All @@ -399,6 +412,14 @@ mod tests {
assert_eq!(metrics.num_deleted_rows, None);
assert_eq!(metrics.num_copied_rows, None);

let commit_info = table.history(None).await.unwrap();
let last_commit = &commit_info[commit_info.len() - 1];
let extra_info = last_commit.info.clone();
assert_eq!(
extra_info["operationMetrics"],
serde_json::to_value(metrics.clone()).unwrap()
);

// rewrite is not required
assert_eq!(metrics.rewrite_time_ms, 0);

Expand Down
21 changes: 15 additions & 6 deletions crates/deltalake-core/src/operations/filesystem_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub struct FileSystemCheckBuilder {
}

/// Details of the FSCK operation including which files were removed from the log
#[derive(Debug, Serialize)]
#[derive(Debug, Clone, Serialize)]
pub struct FileSystemCheckMetrics {
/// Was this a dry run
pub dry_run: bool,
Expand Down Expand Up @@ -154,21 +154,30 @@ impl FileSystemCheckPlan {
default_row_commit_version: file.default_row_commit_version,
}));
}
let metrics = FileSystemCheckMetrics {
dry_run: false,
files_removed: removed_file_paths,
};

let mut app_metadata = HashMap::new();
let fsck_metrics = serde_json::to_value(metrics.clone());

app_metadata.insert("readVersion".to_owned(), snapshot.version().into());
if let Ok(map) = fsck_metrics {
app_metadata.insert("operationMetrics".to_owned(), map);
}

commit(
self.log_store.as_ref(),
&actions,
DeltaOperation::FileSystemCheck {},
snapshot,
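// TODO pass through caller-supplied app metadata; for now only the generated readVersion and operationMetrics entries are recorded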
None,
Some(app_metadata),
)
.await?;

Ok(FileSystemCheckMetrics {
dry_run: false,
files_removed: removed_file_paths,
})
Ok(metrics)
}
}

Expand Down
22 changes: 20 additions & 2 deletions crates/deltalake-core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ impl MergeOperationConfig {
}
}

#[derive(Default, Serialize, Debug)]
#[derive(Default, Serialize, Clone, Debug)]
/// Metrics for the Merge Operation
pub struct MergeMetrics {
/// Number of rows in the source data
Expand Down Expand Up @@ -1390,6 +1390,19 @@ async fn execute(

metrics.execution_time_ms = Instant::now().duration_since(exec_start).as_millis() as u64;

let mut app_metadata = match app_metadata {
Some(meta) => meta,
None => HashMap::new(),
};

app_metadata.insert("readVersion".to_owned(), snapshot.version().into());

let merge_metrics = serde_json::to_value(metrics.clone());

if let Ok(map) = merge_metrics {
app_metadata.insert("operationMetrics".to_owned(), map);
}

// Do not make a commit when there are zero updates to the state
if !actions.is_empty() {
let operation = DeltaOperation::Merge {
Expand All @@ -1403,7 +1416,7 @@ async fn execute(
&actions,
operation,
snapshot,
app_metadata,
Some(app_metadata),
)
.await?;
}
Expand Down Expand Up @@ -2053,6 +2066,11 @@ mod tests {
let commit_info = table.history(None).await.unwrap();
let last_commit = &commit_info[commit_info.len() - 1];
let parameters = last_commit.operation_parameters.clone().unwrap();
let extra_info = last_commit.info.clone();
assert_eq!(
extra_info["operationMetrics"],
serde_json::to_value(metrics.clone()).unwrap()
);
assert_eq!(parameters["predicate"], json!("target.id = source.id"));
assert_eq!(
parameters["matchedPredicates"],
Expand Down
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/operations/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl From<RestoreError> for DeltaTableError {
}

/// Metrics from Restore
#[derive(Default, Debug, Serialize)]
#[derive(Default, Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct RestoreMetrics {
/// Number of files removed
Expand Down
28 changes: 25 additions & 3 deletions crates/deltalake-core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub struct UpdateBuilder {
safe_cast: bool,
}

#[derive(Default, Serialize, Debug)]
#[derive(Default, Clone, Serialize, Debug)]
/// Metrics collected during the Update operation
pub struct UpdateMetrics {
/// Number of files added.
Expand Down Expand Up @@ -408,12 +408,26 @@ async fn execute(
let operation = DeltaOperation::Update {
predicate: Some(fmt_expr_to_sql(&predicate)?),
};

let mut app_metadata = match app_metadata {
Some(meta) => meta,
None => HashMap::new(),
};

app_metadata.insert("readVersion".to_owned(), snapshot.version().into());

let update_metrics = serde_json::to_value(metrics.clone());

if let Ok(map) = update_metrics {
app_metadata.insert("operationMetrics".to_owned(), map);
}

version = commit(
log_store.as_ref(),
&actions,
operation,
snapshot,
app_metadata,
Some(app_metadata),
)
.await?;

Expand Down Expand Up @@ -844,7 +858,7 @@ mod tests {

// Validate order operators do not include nulls
let table = prepare_values_table().await;
let (table, metrics) = DeltaOps(table)
let (mut table, metrics) = DeltaOps(table)
.update()
.with_predicate(col("value").gt(lit(2)).or(col("value").lt(lit(2))))
.with_update("value", lit(10))
Expand All @@ -857,6 +871,14 @@ mod tests {
assert_eq!(metrics.num_updated_rows, 2);
assert_eq!(metrics.num_copied_rows, 3);

let commit_info = table.history(None).await.unwrap();
let last_commit = &commit_info[commit_info.len() - 1];
let extra_info = last_commit.info.clone();
assert_eq!(
extra_info["operationMetrics"],
serde_json::to_value(metrics.clone()).unwrap()
);

let expected = [
"+-------+",
"| value |",
Expand Down
Loading