diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs
index 0e5d29c2b4..af3541ca2c 100644
--- a/crates/core/src/operations/optimize.rs
+++ b/crates/core/src/operations/optimize.rs
@@ -21,6 +21,7 @@
 //! ````
 
 use std::collections::HashMap;
+use std::fmt;
 use std::sync::Arc;
 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
 
@@ -36,7 +37,7 @@ use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStream
 use parquet::basic::{Compression, ZstdLevel};
 use parquet::errors::ParquetError;
 use parquet::file::properties::WriterProperties;
-use serde::{Deserialize, Serialize};
+use serde::{de::Error as DeError, Deserialize, Deserializer, Serialize, Serializer};
 use tracing::debug;
 
 use super::transaction::PROTOCOL;
@@ -60,8 +61,16 @@ pub struct Metrics {
     /// Number of unoptimized files removed
     pub num_files_removed: u64,
     /// Detailed metrics for the add operation
+    #[serde(
+        serialize_with = "serialize_metric_details",
+        deserialize_with = "deserialize_metric_details"
+    )]
     pub files_added: MetricDetails,
     /// Detailed metrics for the remove operation
+    #[serde(
+        serialize_with = "serialize_metric_details",
+        deserialize_with = "deserialize_metric_details"
+    )]
     pub files_removed: MetricDetails,
     /// Number of partitions that had at least one file optimized
     pub partitions_optimized: u64,
@@ -75,17 +84,34 @@ pub struct Metrics {
     pub preserve_insertion_order: bool,
 }
 
+// Custom serialization function that serializes metric details as a string
+fn serialize_metric_details<S>(value: &MetricDetails, serializer: S) -> Result<S::Ok, S::Error>
+where
+    S: Serializer,
+{
+    serializer.serialize_str(&value.to_string())
+}
+
+// Custom deserialization that parses a JSON string into MetricDetails
+fn deserialize_metric_details<'de, D>(deserializer: D) -> Result<MetricDetails, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    let s: String = Deserialize::deserialize(deserializer)?;
+    serde_json::from_str(&s).map_err(DeError::custom)
+}
+
 /// Statistics on files for a particular operation
 /// Operation can be remove or add
 #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct MetricDetails {
-    /// Minimum file size of a operation
-    pub min: i64,
-    /// Maximum file size of a operation
-    pub max: i64,
     /// Average file size of a operation
     pub avg: f64,
+    /// Maximum file size of a operation
+    pub max: i64,
+    /// Minimum file size of a operation
+    pub min: i64,
     /// Number of files encountered during operation
     pub total_files: usize,
     /// Sum of file sizes of a operation
@@ -103,6 +129,13 @@ impl MetricDetails {
     }
 }
 
+impl fmt::Display for MetricDetails {
+    /// Display the metric details using serde serialization
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        serde_json::to_string(self).map_err(|_| fmt::Error)?.fmt(f)
+    }
+}
+
 /// Metrics for a single partition
 pub struct PartialMetrics {
     /// Number of optimized files added
diff --git a/python/tests/pyspark_integration/test_write_to_pyspark.py b/python/tests/pyspark_integration/test_write_to_pyspark.py
index 5cf6490a62..3e4bb9d7f0 100644
--- a/python/tests/pyspark_integration/test_write_to_pyspark.py
+++ b/python/tests/pyspark_integration/test_write_to_pyspark.py
@@ -5,7 +5,7 @@
 import pyarrow as pa
 import pytest
 
-from deltalake import write_deltalake
+from deltalake import DeltaTable, write_deltalake
 from deltalake.exceptions import DeltaProtocolError
 
 from .utils import assert_spark_read_equal, get_spark
@@ -113,3 +113,59 @@ def test_checks_min_writer_version(tmp_path: pathlib.Path):
     ):
         valid_data = pa.table({"c1": pa.array([5, 6])})
         write_deltalake(str(tmp_path), valid_data, mode="append")
+
+
+@pytest.mark.pyspark
+@pytest.mark.integration
+def test_spark_read_optimize_history(tmp_path: pathlib.Path):
+    ids = ["1"] * 10
+    values = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
+
+    id_array = pa.array(ids, type=pa.string())
+    value_array = pa.array(values, type=pa.int32())
+
+    pa_table = pa.Table.from_arrays([id_array, value_array], names=["id", "value"])
+
+    # Two writes on purpose for an optimize to occur
+    write_deltalake(tmp_path, pa_table, mode="append", partition_by=["id"])
+    write_deltalake(tmp_path, pa_table, mode="append", partition_by=["id"])
+
+    dt = DeltaTable(tmp_path)
+    dt.optimize.compact(partition_filters=[("id", "=", "1")])
+
+    spark = get_spark()
+    history_df = spark.sql(f"DESCRIBE HISTORY '{tmp_path}'")
+
+    latest_operation_metrics = (
+        history_df.orderBy(history_df.version.desc()).select("operationMetrics").first()
+    )
+
+    assert latest_operation_metrics["operationMetrics"] is not None
+
+
+@pytest.mark.pyspark
+@pytest.mark.integration
+def test_spark_read_z_ordered_history(tmp_path: pathlib.Path):
+    ids = ["1"] * 10
+    values = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
+
+    id_array = pa.array(ids, type=pa.string())
+    value_array = pa.array(values, type=pa.int32())
+
+    pa_table = pa.Table.from_arrays([id_array, value_array], names=["id", "value"])
+
+    # Two writes on purpose for an optimize to occur
+    write_deltalake(tmp_path, pa_table, mode="append", partition_by=["id"])
+    write_deltalake(tmp_path, pa_table, mode="append", partition_by=["id"])
+
+    dt = DeltaTable(tmp_path)
+    dt.optimize.z_order(columns=["value"], partition_filters=[("id", "=", "1")])
+
+    spark = get_spark()
+    history_df = spark.sql(f"DESCRIBE HISTORY '{tmp_path}'")
+
+    latest_operation_metrics = (
+        history_df.orderBy(history_df.version.desc()).select("operationMetrics").first()
+    )
+
+    assert latest_operation_metrics["operationMetrics"] is not None
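Note on the serde change above: serializing files_added / files_removed through the custom functions embeds each MetricDetails as a JSON string rather than a nested object, so every value in the commit's operationMetrics stays a string, which is the shape the Spark DESCRIBE HISTORY tests below read back. A minimal round-trip sketch, not part of the patch: the literal values are made up, and the total_size field name is assumed from the "Sum of file sizes" doc comment, which the diff truncates.

    // Illustrative values only; fields mirror the MetricDetails struct in the diff,
    // with total_size assumed since the diff cuts off before that field declaration.
    let details = MetricDetails { avg: 1024.0, max: 2048, min: 512, total_files: 2, total_size: 2560 };
    // Display now goes through serde_json, producing something like
    // {"avg":1024.0,"max":2048,"min":512,"totalFiles":2,"totalSize":2560}
    let as_string = details.to_string();
    // deserialize_metric_details does the same parse internally, so the stringified
    // metrics survive a serialize/deserialize round trip.
    let parsed: MetricDetails = serde_json::from_str(&as_string).unwrap();
    assert_eq!(parsed, details);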