Skip to content

Commit

Permalink
add docs and fix failing python tests
Browse files Browse the repository at this point in the history
Signed-off-by: Nikolay Ulmasov <[email protected]>
  • Loading branch information
r3stl355 committed Dec 28, 2023
1 parent 592355e commit d2147aa
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 31 deletions.
12 changes: 4 additions & 8 deletions crates/deltalake-core/src/operations/write.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#![allow(unused)]
//! Used to write [RecordBatch]es into a delta table.
//!
//! New Table Semantics
Expand Down Expand Up @@ -39,7 +38,6 @@ use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::{memory::MemoryExec, ExecutionPlan};
use datafusion_common::DFSchema;
use datafusion_expr::Expr;
use datafusion_proto::protobuf::Constraint;
use futures::future::BoxFuture;
use futures::StreamExt;
use parquet::file::properties::WriterProperties;
Expand All @@ -54,7 +52,6 @@ use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Action, Add, Metadata, Remove, StructType};
use crate::logstore::LogStoreRef;
use crate::operations::delete::DeleteBuilder;
use crate::protocol::{DeltaOperation, SaveMode};
use crate::storage::ObjectStoreRef;
use crate::table::state::DeltaTableState;
Expand Down Expand Up @@ -329,7 +326,7 @@ async fn write_execution_plan_with_predicate(
let checker = DeltaDataChecker::new(snapshot);
let checker = match predicate {
Some(pred) => {
// TODO: get the name of the outer-most column? `*` will also work but would be slower
// TODO: get the name of the outer-most column? `*` will also work but would it be slower?
let chk = DeltaConstraint::new("*", &fmt_expr_to_sql(&pred)?);
checker.with_extra_constraints(vec![chk])
}
Expand Down Expand Up @@ -480,16 +477,15 @@ async fn prepare_predicate_actions(
let add = if candidates.partition_scan {
Vec::new()
} else {
let add = execute_non_empty_expr(
execute_non_empty_expr(
snapshot,
log_store,
&state,
state,
&predicate,
&candidates.candidates,
writer_properties,
)
.await?;
add
.await?
};
let remove = candidates.candidates;

Expand Down
4 changes: 3 additions & 1 deletion docs/_build/links.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
python:
DeltaTable: PYTHON_API_URL/delta_table
replaceWhere: https://delta-io.github.io/delta-rs/api/delta_writer/
rust:
DeltaTable: https://docs.rs/deltalake/latest/deltalake/table/struct.DeltaTable.html
DeltaTable: https://docs.rs/deltalake/latest/deltalake/table/struct.DeltaTable.html
replaceWhere: https://docs.rs/deltalake/latest/deltalake/operations/write/struct.WriteBuilder.html#method.with_replace_where
2 changes: 1 addition & 1 deletion docs/src/python/delta_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ def get_table_info():
dt = DeltaTable("../rust/tests/data/delta-0.2.0")
print(f"Version: {dt.version()}")
print(f"Files: {dt.files()}")
# --8<-- [end:get_table_info]
# --8<-- [end:get_table_info]
21 changes: 21 additions & 0 deletions docs/src/python/operations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
def replace_where():
    """Documentation snippet: overwrite only the rows matching a predicate.

    The region between the ``--8<--`` markers is what gets embedded in the docs.
    """
    # --8<-- [start:replace_where]
    import pyarrow as pa
    from deltalake import write_deltalake

    # A table is assumed to already exist at this location, holding some
    # records where `id = '1'`; those are the records we want to overwrite.
    table_path = "/tmp/my_table"

    # Every new row satisfies the predicate used for the overwrite below.
    ids = pa.array(["1", "1"], pa.string())
    values = pa.array([11, 12], pa.int64())
    data = pa.table({"id": ids, "value": values})

    write_deltalake(
        table_path,
        data,
        mode="overwrite",
        predicate="id = '1'",
        engine="rust",
    )
    # --8<-- [end:replace_where]
32 changes: 32 additions & 0 deletions docs/src/rust/operations.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// --8<-- [start:replace_where]
// Assuming there is already a table in this location with some records where `id = '1'` which we want to overwrite
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use arrow_array::RecordBatch;
import deltalake::protocol::SaveMode;

let schema = ArrowSchema::new(vec![
Field::new("id", DataType::Utf8, true),
Field::new("value", DataType::Int32, true),
]);

let data = RecordBatch::try_new(
&schema,
vec![
Arc::new(arrow::array::StringArray::from(vec!["1", "1"])),
Arc::new(arrow::array::Int32Array::from(vec![11, 12])),
],
)
.unwrap();

let table = deltalake::open_table("/tmp/my_table").await.unwrap();
let table = DeltaOps(table)
.write(vec![data])
.with_save_mode(SaveMode::Overwrite)
.with_replace_where(col("id").eq(lit("1")))
.await;
// --8<-- [end:replace_where]

Ok(())
}
15 changes: 14 additions & 1 deletion docs/usage/writing-delta-tables.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,17 @@ that partition or else the method will raise an error.
```

This method could also be used to insert a new partition if one doesn't
already exist, making this operation idempotent.
already exist, making this operation idempotent.

## Overwriting part of the table data with `replaceWhere`

When you don’t specify a `replaceWhere` predicate (passed via the parameter named `predicate`), the overwrite save mode will replace the entire table.
Instead of replacing the entire table (which is costly!), you may want to overwrite only the specific parts of the table that should be changed.
In this case, you can use a `replaceWhere` predicate to overwrite only the relevant records or partitions.

!!! note

Data written must conform to the same predicate, i.e. it must not contain any records that don't match the `replaceWhere` predicate;
otherwise the operation will fail.

{{ code_example('operations', 'replace_where', ['replaceWhere'])}}
4 changes: 3 additions & 1 deletion python/tests/test_table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ def test_read_simple_table_file_sizes_failure():
x: [-1 for item in x] if x == "size_bytes" else y
for x, y in add_actions.items()
}
dt.get_add_actions = lambda: SimpleNamespace(to_pydict=lambda: add_actions_modified) # type:ignore
dt.get_add_actions = lambda: SimpleNamespace(
to_pydict=lambda: add_actions_modified
) # type:ignore

with pytest.raises(OSError, match="Cannot seek past end of file."):
dt.to_pyarrow_dataset().to_table().to_pydict()
Expand Down
34 changes: 15 additions & 19 deletions python/tests/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -890,36 +890,32 @@ def test_replace_where_overwrite(
{
"p1": pa.array(["1", "1"], pa.string()),
"p2": pa.array([value_2, value_1], value_type),
"val": pa.array([2, 2], pa.int64()),
"val": pa.array([2, 3], pa.int64()),
}
)
expected_data = pa.table(
{
"p1": pa.array(["1", "1", "2", "2"], pa.string()),
"p2": pa.array([value_1, value_2, value_1, value_2], value_type),
"val": pa.array([2, 2, 1, 1], pa.int64()),
"val": pa.array([3, 2, 1, 1], pa.int64()),
}
)

with pytest.raises(
DeltaError,
match="Generic DeltaTable error: Overwriting data based on predicate is not yet implemented",
):
write_deltalake(
tmp_path,
sample_data,
mode="overwrite",
predicate="`p1` = 1",
engine="rust",
)
write_deltalake(
tmp_path,
sample_data,
mode="overwrite",
predicate="`p1` = 1",
engine="rust",
)

delta_table.update_incremental()
assert (
delta_table.to_pyarrow_table().sort_by(
[("p1", "ascending"), ("p2", "ascending")]
)
== expected_data
delta_table.update_incremental()
assert (
delta_table.to_pyarrow_table().sort_by(
[("p1", "ascending"), ("p2", "ascending")]
)
== expected_data
)


def test_partition_overwrite_with_new_partition(
Expand Down

0 comments on commit d2147aa

Please sign in to comment.