Skip to content

Commit

Permalink
expose custom_metadata to pyarrow writer
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Dec 27, 2023
1 parent 746e817 commit 5717c02
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 2 deletions.
2 changes: 2 additions & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ class RawDeltaTable:
partition_by: List[str],
schema: pyarrow.Schema,
partitions_filters: Optional[FilterType],
custom_metadata: Optional[Dict[str, str]],
) -> None: ...
def cleanup_metadata(self) -> None: ...

Expand All @@ -149,6 +150,7 @@ def write_new_deltalake(
description: Optional[str],
configuration: Optional[Mapping[str, Optional[str]]],
storage_options: Optional[Dict[str, str]],
custom_metadata: Optional[Dict[str, str]],
) -> None: ...
def write_to_deltalake(
table_uri: str,
Expand Down
5 changes: 4 additions & 1 deletion python/deltalake/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def write_deltalake(
partition_filters: Optional[List[Tuple[str, str, Any]]] = ...,
large_dtypes: bool = ...,
engine: Literal["pyarrow"] = ...,
custom_metadata: Optional[Dict[str, str]] = ...,
) -> None:
...

Expand Down Expand Up @@ -238,7 +239,7 @@ def write_deltalake(
engine: writer engine to write the delta table. `Rust` engine is still experimental but you may
see up to 4x performance improvements over pyarrow.
writer_properties: Pass writer properties to the Rust parquet writer.
custom_metadata: Custom metadata to add to the commitInfo, rust writer only.
custom_metadata: Custom metadata to add to the commitInfo.
"""
table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options)
if table is not None:
Expand Down Expand Up @@ -496,6 +497,7 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
description,
configuration,
storage_options,
custom_metadata,
)
else:
table._table.create_write_transaction(
Expand All @@ -504,6 +506,7 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
partition_by or [],
schema,
partition_filters,
custom_metadata,
)
table.update_incremental()
else:
Expand Down
14 changes: 13 additions & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,7 @@ impl RawDeltaTable {
partition_by: Vec<String>,
schema: PyArrowType<ArrowSchema>,
partitions_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
custom_metadata: Option<HashMap<String, String>>,
) -> PyResult<()> {
let mode = mode.parse().map_err(PythonError::from)?;

Expand Down Expand Up @@ -803,6 +804,10 @@ impl RawDeltaTable {
partition_by: Some(partition_by),
predicate: None,
};

let app_metadata =
custom_metadata.map(|md| md.into_iter().map(|(k, v)| (k, v.into())).collect());

let store = self._table.log_store();

rt()?
Expand All @@ -811,7 +816,7 @@ impl RawDeltaTable {
&actions,
operation,
self._table.get_state(),
None,
app_metadata,
))
.map_err(PythonError::from)?;

Expand Down Expand Up @@ -1287,6 +1292,7 @@ fn write_new_deltalake(
description: Option<String>,
configuration: Option<HashMap<String, Option<String>>>,
storage_options: Option<HashMap<String, String>>,
custom_metadata: Option<HashMap<String, String>>,
) -> PyResult<()> {
let table = DeltaTableBuilder::from_uri(table_uri)
.with_storage_options(storage_options.unwrap_or_default())
Expand All @@ -1313,6 +1319,12 @@ fn write_new_deltalake(
builder = builder.with_configuration(config);
};

if let Some(metadata) = custom_metadata {
let json_metadata: Map<String, Value> =
metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
builder = builder.with_metadata(json_metadata);
};

rt()?
.block_on(builder.into_future())
.map_err(PythonError::from)?;
Expand Down

0 comments on commit 5717c02

Please sign in to comment.