fix(rust): serialize MetricDetails from compaction runs to a string (#2317)

I am by no means a Rust developer and haven't touched it in years, so
please let me know if there's a better way to go about this. The Rust
z_order and optimize.compact operations already serialize the metrics before
they are passed back to Python, which then deserializes them, so the Python
behavior of exposing these metrics as a dict is unchanged, which I think is
what we want.
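
As a rough sketch of that round trip (standalone toy code, not the crate's actual binding layer), serializing the metrics to a JSON string and parsing the string back yields an identical value, which is why the dict-shaped result the Python side exposes is unaffected:

```rust
// Sketch only: a stand-in metrics struct, not the Metrics type defined in
// crates/core/src/operations/optimize.rs.
use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ToyMetrics {
    num_files_added: u64,
    num_files_removed: u64,
}

fn main() -> Result<(), serde_json::Error> {
    let metrics = ToyMetrics {
        num_files_added: 1,
        num_files_removed: 6,
    };

    // Rust side: serialize to a JSON string before handing the result across the boundary.
    let as_string = serde_json::to_string(&metrics)?;

    // Consumer side: parse the string back; the structured view is identical.
    let round_tripped: ToyMetrics = serde_json::from_str(&as_string)?;
    assert_eq!(metrics, round_tripped);
    Ok(())
}
```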

# Description
Adds a custom serializer and a Display implementation for the
`MetricDetails` fields, namely `filesAdded` and `filesRemoved`, so that
those fields are written to the commit log as JSON strings instead of
structs. Query engines expect these fields to be strings on reads.
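
For anyone less familiar with serde, here is a minimal, self-contained sketch of that pattern using toy types (the actual implementation is in the diff below): a `serialize_with` function plus a `Display` impl turn the nested details into a single JSON-string field.

```rust
use serde::{Serialize, Serializer};
use std::fmt;

// Toy stand-in for MetricDetails.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct Details {
    avg: f64,
    total_files: usize,
}

impl fmt::Display for Details {
    // Render the details as their JSON representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        serde_json::to_string(self).map_err(|_| fmt::Error)?.fmt(f)
    }
}

// Serialize the nested details as a single JSON string.
fn details_as_string<S: Serializer>(value: &Details, serializer: S) -> Result<S::Ok, S::Error> {
    serializer.serialize_str(&value.to_string())
}

// Toy stand-in for the operation metrics struct.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct ToyOperationMetrics {
    #[serde(serialize_with = "details_as_string")]
    files_added: Details,
    num_batches: u64,
}

fn main() {
    let metrics = ToyOperationMetrics {
        files_added: Details { avg: 19956.0, total_files: 1 },
        num_batches: 6,
    };
    // Prints: {"filesAdded":"{\"avg\":19956.0,\"totalFiles\":1}","numBatches":6}
    println!("{}", serde_json::to_string(&metrics).unwrap());
}
```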

I had trouble getting the PySpark tests running locally, but here is an
example OPTIMIZE commit log entry that gets written with these changes:

```json
{"commitInfo":{"timestamp":1711125995487,"operation":"OPTIMIZE","operationParameters":{"targetSize":"104857600","predicate":"[]"},"clientVersion":"delta-rs.0.17.1","readVersion":10,"operationMetrics":{"filesAdded":"{\"avg\":19956.0,\"max\":19956,\"min\":19956,\"totalFiles\":1,\"totalSize\":19956}","filesRemoved":"{\"avg\":4851.833333333333,\"max\":10358,\"min\":3734,\"totalFiles\":6,\"totalSize\":29111}","numBatches":6,"numFilesAdded":1,"numFilesRemoved":6,"partitionsOptimized":1,"preserveInsertionOrder":true,"totalConsideredFiles":6,"totalFilesSkipped":0}}}
```

# Related Issue(s)
- #2087

# Documentation

N/A
liamphmurphy authored Mar 24, 2024
1 parent 00c919f commit 923dfef
Showing 2 changed files with 95 additions and 6 deletions.
43 changes: 38 additions & 5 deletions crates/core/src/operations/optimize.rs
@@ -21,6 +21,7 @@
//! ````
use std::collections::HashMap;
+use std::fmt;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

@@ -36,7 +37,7 @@ use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStream
use parquet::basic::{Compression, ZstdLevel};
use parquet::errors::ParquetError;
use parquet::file::properties::WriterProperties;
-use serde::{Deserialize, Serialize};
+use serde::{de::Error as DeError, Deserialize, Deserializer, Serialize, Serializer};
use tracing::debug;

use super::transaction::PROTOCOL;
@@ -60,8 +61,16 @@ pub struct Metrics {
    /// Number of unoptimized files removed
    pub num_files_removed: u64,
    /// Detailed metrics for the add operation
+    #[serde(
+        serialize_with = "serialize_metric_details",
+        deserialize_with = "deserialize_metric_details"
+    )]
    pub files_added: MetricDetails,
    /// Detailed metrics for the remove operation
+    #[serde(
+        serialize_with = "serialize_metric_details",
+        deserialize_with = "deserialize_metric_details"
+    )]
    pub files_removed: MetricDetails,
    /// Number of partitions that had at least one file optimized
    pub partitions_optimized: u64,
@@ -75,17 +84,34 @@ pub struct Metrics {
    pub preserve_insertion_order: bool,
}

+// Custom serialization function that serializes metric details as a string
+fn serialize_metric_details<S>(value: &MetricDetails, serializer: S) -> Result<S::Ok, S::Error>
+where
+    S: Serializer,
+{
+    serializer.serialize_str(&value.to_string())
+}
+
+// Custom deserialization that parses a JSON string into MetricDetails
+fn deserialize_metric_details<'de, D>(deserializer: D) -> Result<MetricDetails, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    let s: String = Deserialize::deserialize(deserializer)?;
+    serde_json::from_str(&s).map_err(DeError::custom)
+}

/// Statistics on files for a particular operation
/// Operation can be remove or add
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MetricDetails {
-    /// Minimum file size of a operation
-    pub min: i64,
-    /// Maximum file size of a operation
-    pub max: i64,
    /// Average file size of a operation
    pub avg: f64,
+    /// Maximum file size of a operation
+    pub max: i64,
+    /// Minimum file size of a operation
+    pub min: i64,
    /// Number of files encountered during operation
    pub total_files: usize,
    /// Sum of file sizes of a operation
@@ -103,6 +129,13 @@ impl MetricDetails {
    }
}

+impl fmt::Display for MetricDetails {
+    /// Display the metric details using serde serialization
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        serde_json::to_string(self).map_err(|_| fmt::Error)?.fmt(f)
+    }
+}
+
/// Metrics for a single partition
pub struct PartialMetrics {
    /// Number of optimized files added
58 changes: 57 additions & 1 deletion python/tests/pyspark_integration/test_write_to_pyspark.py
@@ -5,7 +5,7 @@
import pyarrow as pa
import pytest

-from deltalake import write_deltalake
+from deltalake import DeltaTable, write_deltalake
from deltalake.exceptions import DeltaProtocolError

from .utils import assert_spark_read_equal, get_spark
@@ -113,3 +113,59 @@ def test_checks_min_writer_version(tmp_path: pathlib.Path):
    ):
        valid_data = pa.table({"c1": pa.array([5, 6])})
        write_deltalake(str(tmp_path), valid_data, mode="append")


+@pytest.mark.pyspark
+@pytest.mark.integration
+def test_spark_read_optimize_history(tmp_path: pathlib.Path):
+    ids = ["1"] * 10
+    values = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
+
+    id_array = pa.array(ids, type=pa.string())
+    value_array = pa.array(values, type=pa.int32())
+
+    pa_table = pa.Table.from_arrays([id_array, value_array], names=["id", "value"])
+
+    # Two writes on purpose for an optimize to occur
+    write_deltalake(tmp_path, pa_table, mode="append", partition_by=["id"])
+    write_deltalake(tmp_path, pa_table, mode="append", partition_by=["id"])
+
+    dt = DeltaTable(tmp_path)
+    dt.optimize.compact(partition_filters=[("id", "=", "1")])
+
+    spark = get_spark()
+    history_df = spark.sql(f"DESCRIBE HISTORY '{tmp_path}'")
+
+    latest_operation_metrics = (
+        history_df.orderBy(history_df.version.desc()).select("operationMetrics").first()
+    )
+
+    assert latest_operation_metrics["operationMetrics"] is not None


+@pytest.mark.pyspark
+@pytest.mark.integration
+def test_spark_read_z_ordered_history(tmp_path: pathlib.Path):
+    ids = ["1"] * 10
+    values = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
+
+    id_array = pa.array(ids, type=pa.string())
+    value_array = pa.array(values, type=pa.int32())
+
+    pa_table = pa.Table.from_arrays([id_array, value_array], names=["id", "value"])
+
+    # Two writes on purpose for an optimize to occur
+    write_deltalake(tmp_path, pa_table, mode="append", partition_by=["id"])
+    write_deltalake(tmp_path, pa_table, mode="append", partition_by=["id"])
+
+    dt = DeltaTable(tmp_path)
+    dt.optimize.z_order(columns=["value"], partition_filters=[("id", "=", "1")])
+
+    spark = get_spark()
+    history_df = spark.sql(f"DESCRIBE HISTORY '{tmp_path}'")
+
+    latest_operation_metrics = (
+        history_df.orderBy(history_df.version.desc()).select("operationMetrics").first()
+    )
+
+    assert latest_operation_metrics["operationMetrics"] is not None
