Skip to content

Commit

Permalink
Merge branch 'main' into fix-optional-stats
Browse files Browse the repository at this point in the history
  • Loading branch information
rtyler authored Sep 3, 2024
2 parents 3b5db70 + a6cb348 commit e8339cf
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 3 deletions.
16 changes: 16 additions & 0 deletions crates/core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,22 @@ pub fn get_num_idx_cols_and_stats_columns(
)
}

/// Get the target_file_size from the table configuration in the sates
/// If table_config does not exist (only can occur in the first write action) it takes
/// the configuration that was passed to the writerBuilder.
pub(crate) fn get_target_file_size(
config: &Option<crate::table::config::TableConfig<'_>>,
configuration: &HashMap<String, Option<String>>,
) -> i64 {
match &config {
Some(conf) => conf.target_file_size(),
_ => configuration
.get("delta.targetFileSize")
.and_then(|v| v.clone().map(|v| v.parse::<i64>().unwrap()))
.unwrap_or(crate::table::config::DEFAULT_TARGET_FILE_SIZE),
}
}

#[cfg(feature = "datafusion")]
mod datafusion_utils {
use datafusion::execution::context::SessionState;
Expand Down
7 changes: 5 additions & 2 deletions crates/core/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,6 @@ async fn write_execution_plan_with_predicate(
}
_ => checker,
};

// Write data to disk
let mut tasks = vec![];
for i in 0..plan.properties().output_partitioning().partition_count() {
Expand Down Expand Up @@ -977,13 +976,17 @@ impl std::future::IntoFuture for WriteBuilder {
.as_ref()
.map(|snapshot| snapshot.table_config());

let target_file_size = this.target_file_size.or_else(|| {
Some(super::get_target_file_size(&config, &this.configuration) as usize)
});
let (num_indexed_cols, stats_columns) =
super::get_num_idx_cols_and_stats_columns(config, this.configuration);

let writer_stats_config = WriterStatsConfig {
num_indexed_cols,
stats_columns,
};

// Here we need to validate if the new data conforms to a predicate if one is provided
let add_actions = write_execution_plan_with_predicate(
predicate.clone(),
Expand All @@ -992,7 +995,7 @@ impl std::future::IntoFuture for WriteBuilder {
plan.clone(),
partition_columns.clone(),
this.log_store.object_store().clone(),
this.target_file_size,
target_file_size,
this.write_batch_size,
this.writer_properties.clone(),
writer_stats_config.clone(),
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/table/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ pub struct TableConfig<'a>(pub(crate) &'a HashMap<String, Option<String>>);

/// Default num index cols
pub const DEFAULT_NUM_INDEX_COLS: i32 = 32;
/// Default target file size
pub const DEFAULT_TARGET_FILE_SIZE: i64 = 104857600;

impl<'a> TableConfig<'a> {
table_config!(
Expand Down
1 change: 1 addition & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ def write_to_deltalake(
table: Optional[RawDeltaTable],
schema_mode: Optional[str],
predicate: Optional[str],
target_file_size: Optional[int],
name: Optional[str],
description: Optional[str],
configuration: Optional[Mapping[str, Optional[str]]],
Expand Down
6 changes: 5 additions & 1 deletion python/deltalake/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ def write_deltalake(
schema_mode: Optional[Literal["merge", "overwrite"]] = ...,
storage_options: Optional[Dict[str, str]] = ...,
predicate: Optional[str] = ...,
target_file_size: Optional[int] = ...,
large_dtypes: bool = ...,
engine: Literal["rust"] = ...,
writer_properties: WriterProperties = ...,
Expand Down Expand Up @@ -214,6 +215,7 @@ def write_deltalake(
storage_options: Optional[Dict[str, str]] = None,
partition_filters: Optional[List[Tuple[str, str, Any]]] = None,
predicate: Optional[str] = None,
target_file_size: Optional[int] = None,
large_dtypes: bool = False,
engine: Literal["pyarrow", "rust"] = "rust",
writer_properties: Optional[WriterProperties] = None,
Expand Down Expand Up @@ -267,7 +269,8 @@ def write_deltalake(
configuration: A map containing configuration options for the metadata action.
schema_mode: If set to "overwrite", allows replacing the schema of the table. Set to "merge" to merge with existing schema.
storage_options: options passed to the native delta filesystem.
predicate: When using `Overwrite` mode, replace data that matches a predicate. Only used in rust engine.
predicate: When using `Overwrite` mode, replace data that matches a predicate. Only used in rust engine.'
target_file_size: Override for target file size for data files written to the delta table. If not passed, it's taken from `delta.targetFileSize`.
partition_filters: the partition filters that will be used for partition overwrite. Only used in pyarrow engine.
large_dtypes: Only used for pyarrow engine
engine: writer engine to write the delta table. PyArrow engine is deprecated, and will be removed in v1.0.
Expand Down Expand Up @@ -308,6 +311,7 @@ def write_deltalake(
table=table._table if table is not None else None,
schema_mode=schema_mode,
predicate=predicate,
target_file_size=target_file_size,
name=name,
description=description,
configuration=configuration,
Expand Down
5 changes: 5 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1602,6 +1602,7 @@ fn write_to_deltalake(
schema_mode: Option<String>,
partition_by: Option<Vec<String>>,
predicate: Option<String>,
target_file_size: Option<usize>,
name: Option<String>,
description: Option<String>,
configuration: Option<HashMap<String, Option<String>>>,
Expand Down Expand Up @@ -1650,6 +1651,10 @@ fn write_to_deltalake(
builder = builder.with_replace_where(predicate);
};

if let Some(target_file_size) = target_file_size {
builder = builder.with_target_file_size(target_file_size)
};

if let Some(config) = configuration {
builder = builder.with_configuration(config);
};
Expand Down

0 comments on commit e8339cf

Please sign in to comment.