Skip to content

Commit

Permalink
fix: double-encode paths during zorder optimize when they contain spec…
Browse files Browse the repository at this point in the history
…ial characters

The real fix should likely be done in DataFusion, allowing
[ListingTableUrl] to be created directly rather than needing to be parsed
from the &str passed through
  • Loading branch information
rtyler committed Sep 22, 2024
1 parent 98a64a1 commit 68f6d74
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 12 deletions.
4 changes: 1 addition & 3 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,7 @@ use url::Url;
use crate::delta_datafusion::expr::parse_predicate_expression;
use crate::delta_datafusion::schema_adapter::DeltaSchemaAdapterFactory;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{
Add, DataCheck, EagerSnapshot, Invariant, LogicalFile, Snapshot, StructTypeExt,
};
use crate::kernel::{Add, DataCheck, EagerSnapshot, Invariant, Snapshot, StructTypeExt};
use crate::logstore::LogStoreRef;
use crate::table::builder::ensure_table_uri;
use crate::table::state::DeltaTableState;
Expand Down
2 changes: 0 additions & 2 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3099,7 +3099,6 @@ mod tests {
use crate::kernel::Protocol;
use crate::operations::merge::Action;

let _ = pretty_env_logger::try_init();
let schema = get_delta_schema();

let actions = vec![Action::Protocol(Protocol::new(1, 4))];
Expand Down Expand Up @@ -3194,7 +3193,6 @@ mod tests {
use crate::kernel::Protocol;
use crate::operations::merge::Action;

let _ = pretty_env_logger::try_init();
let schema = get_delta_schema();

let actions = vec![Action::Protocol(Protocol::new(1, 4))];
Expand Down
120 changes: 113 additions & 7 deletions crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ use parquet::basic::{Compression, ZstdLevel};
use parquet::errors::ParquetError;
use parquet::file::properties::WriterProperties;
use serde::{de::Error as DeError, Deserialize, Deserializer, Serialize, Serializer};
use tracing::debug;
use tracing::*;
use url::Url;

use super::transaction::PROTOCOL;
use super::writer::{PartitionWriter, PartitionWriterConfig};
Expand Down Expand Up @@ -137,6 +138,7 @@ impl fmt::Display for MetricDetails {
}
}

#[derive(Debug)]
/// Metrics for a single partition
pub struct PartialMetrics {
/// Number of optimized files added
Expand Down Expand Up @@ -345,6 +347,7 @@ impl From<OptimizeInput> for DeltaOperation {
}
}

/// Generate an appropriate remove action for the optimization task
fn create_remove(
path: &str,
partitions: &IndexMap<String, Scalar>,
Expand Down Expand Up @@ -606,12 +609,26 @@ impl MergePlan {
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{Expr, ScalarUDF};

let locations = files
// This code is ... not ideal. Essentially `read_parquet` expects Strings that it will then
// parse as URLs and then pass back to the object store (x_x). This can cause problems when
// paths in object storage have special characters like spaces, etc.
//
// This [str::replace] is kind of a hack to address
// <https://github.com/delta-io/delta-rs/issues/2834>
let locations: Vec<String> = files
.iter()
.map(|file| format!("delta-rs:///{}", file.location))
.collect_vec();
.map(|om| {
format!(
"delta-rs:///{}",
str::replace(om.location.as_ref(), "%", "%25")
)
})
.collect();
debug!("Reading z-order with locations are: {locations:?}");

let df = context
.ctx
// TODO: should read options have the partition columns
.read_parquet(locations, ParquetReadOptions::default())
.await?;

Expand Down Expand Up @@ -712,13 +729,15 @@ impl MergePlan {
bins.len() <= num_cpus::get(),
));

debug!("Starting zorder with the columns: {zorder_columns:?} {bins:?}");
#[cfg(feature = "datafusion")]
let exec_context = Arc::new(zorder::ZOrderExecContext::new(
zorder_columns,
log_store.object_store(),
max_spill_size,
)?);
let task_parameters = self.task_parameters.clone();

let log_store = log_store.clone();
futures::stream::iter(bins)
.map(move |(_, (partition, files))| {
Expand Down Expand Up @@ -891,9 +910,7 @@ impl MergeBin {
self.size_bytes += meta.size as i64;
self.files.push(meta);
}
}

impl MergeBin {
fn iter(&self) -> impl Iterator<Item = &ObjectMeta> {
self.files.iter()
}
Expand Down Expand Up @@ -1036,6 +1053,7 @@ fn build_zorder_plan(
.or_insert_with(|| (partition_values, MergeBin::new()))
.1
.add(object_meta);
error!("partition_files inside the zorder plan: {partition_files:?}");
}

let operation = OptimizeOperations::ZOrder(zorder_columns, partition_files);
Expand Down Expand Up @@ -1229,7 +1247,6 @@ pub(super) mod zorder {
let runtime = Arc::new(RuntimeEnv::new(config)?);
runtime.register_object_store(&Url::parse("delta-rs://").unwrap(), object_store);

use url::Url;
let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime);
ctx.register_udf(ScalarUDF::from(datafusion::ZOrderUDF));
Ok(Self { columns, ctx })
Expand Down Expand Up @@ -1269,6 +1286,7 @@ pub(super) mod zorder {
fn zorder_key_datafusion(
columns: &[ColumnarValue],
) -> Result<ColumnarValue, DataFusionError> {
debug!("zorder_key_datafusion: {columns:#?}");
let length = columns
.iter()
.map(|col| match col {
Expand Down Expand Up @@ -1423,6 +1441,94 @@ pub(super) mod zorder {
.await;
assert!(res.is_ok());
}

/// Regression test for issue <https://github.com/delta-io/delta-rs/issues/2834>.
///
/// Writes an in-memory table partitioned by `country`, where one partition value
/// (`"Dominican Republic"`) contains a space, then asserts that a Z-order optimize
/// on the `modified` column returns `Ok`.
#[tokio::test]
async fn test_zorder_space_in_partition_value() {
    use arrow_schema::Schema as ArrowSchema;
    let _ = pretty_env_logger::try_init();
    let schema = Arc::new(ArrowSchema::new(vec![
        Field::new("modified", DataType::Utf8, true),
        Field::new("country", DataType::Utf8, true),
        Field::new("value", DataType::Int32, true),
    ]));

    // One row per country so every partition value gets its own directory.
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(arrow::array::StringArray::from(vec![
                "2021-02-01",
                "2021-02-01",
                "2021-02-02",
                "2021-02-02",
            ])),
            Arc::new(arrow::array::StringArray::from(vec![
                "Germany",
                "China",
                "Canada",
                "Dominican Republic",
            ])),
            Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
        ],
    )
    .unwrap();
    // write some data
    let table = crate::DeltaOps::new_in_memory()
        .write(vec![batch])
        .with_partition_columns(vec!["country"])
        .with_save_mode(crate::protocol::SaveMode::Overwrite)
        .await
        .unwrap();

    let res = crate::DeltaOps(table)
        .optimize()
        .with_type(OptimizeType::ZOrder(vec!["modified".into()]))
        .await;
    assert!(res.is_ok(), "Failed to optimize: {res:#?}");
}

/// Z-order optimize must succeed when a partition value contains punctuation.
///
/// Writes an in-memory table partitioned by `country`, where one partition value
/// is `"USA$$!"`, then asserts that a Z-order optimize on the `modified` column
/// returns `Ok`.
#[tokio::test]
async fn test_zorder_space_in_partition_value_garbage() {
    use arrow_schema::Schema as ArrowSchema;
    let _ = pretty_env_logger::try_init();
    let schema = Arc::new(ArrowSchema::new(vec![
        Field::new("modified", DataType::Utf8, true),
        Field::new("country", DataType::Utf8, true),
        Field::new("value", DataType::Int32, true),
    ]));

    // One row per country so every partition value gets its own directory.
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(arrow::array::StringArray::from(vec![
                "2021-02-01",
                "2021-02-01",
                "2021-02-02",
                "2021-02-02",
            ])),
            Arc::new(arrow::array::StringArray::from(vec![
                "Germany", "China", "Canada", "USA$$!",
            ])),
            Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
        ],
    )
    .unwrap();
    // write some data
    let table = crate::DeltaOps::new_in_memory()
        .write(vec![batch])
        .with_partition_columns(vec!["country"])
        .with_save_mode(crate::protocol::SaveMode::Overwrite)
        .await
        .unwrap();

    let res = crate::DeltaOps(table)
        .optimize()
        .with_type(OptimizeType::ZOrder(vec!["modified".into()]))
        .await;
    assert!(res.is_ok(), "Failed to optimize: {res:#?}");
}
}
}

Expand Down
35 changes: 35 additions & 0 deletions python/tests/test_optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
import pyarrow as pa
import pytest

try:
import pandas as pd
except ModuleNotFoundError:
_has_pandas = False
else:
_has_pandas = True

from deltalake import DeltaTable, write_deltalake
from deltalake.table import CommitProperties

Expand Down Expand Up @@ -132,3 +139,31 @@ def test_optimize_schema_evolved_table(
assert dt.to_pyarrow_table().sort_by([("foo", "ascending")]) == data.sort_by(
[("foo", "ascending")]
)


@pytest.mark.pandas
def test_zorder_with_space_partition(tmp_path: pathlib.Path):
    """Z-order optimize must succeed when partition values contain spaces.

    Writes a table partitioned by ``country`` (several values contain a
    space), checks that a partition-filtered read returns the expected row,
    then runs ``optimize.z_order`` on the ``user`` column.
    """
    df = pd.DataFrame(
        {
            "user": ["James", "Anna", "Sara", "Martin"],
            "country": ["United States", "Canada", "Costa Rica", "South Africa"],
            "age": [34, 23, 45, 26],
        }
    )

    write_deltalake(
        table_or_uri=tmp_path,
        data=df,
        mode="overwrite",
        partition_by=["country"],
    )

    test_table = DeltaTable(tmp_path)

    # Retrieving a single partition by value works fine, even with a space in it;
    # "James" is the only row written with country == "United States".
    partitioned_df = test_table.to_pandas(
        partitions=[("country", "=", "United States")],
    )
    assert partitioned_df["user"].tolist() == ["James"]

    test_table.optimize.z_order(columns=["user"])

0 comments on commit 68f6d74

Please sign in to comment.