Skip to content

Commit

Permalink
Added enable out of range to python
Browse files Browse the repository at this point in the history
  • Loading branch information
pblocz committed Dec 1, 2024
1 parent b542c0b commit f4ef67f
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 2 deletions.
1 change: 1 addition & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ class RawDeltaTable:
ending_version: Optional[int] = None,
starting_timestamp: Optional[str] = None,
ending_timestamp: Optional[str] = None,
enable_out_of_range: bool = False,
) -> pyarrow.RecordBatchReader: ...
def transaction_versions(self) -> Dict[str, Transaction]: ...
def __datafusion_table_provider__(self) -> Any: ...
Expand Down
2 changes: 2 additions & 0 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -689,13 +689,15 @@ def load_cdf(
starting_timestamp: Optional[str] = None,
ending_timestamp: Optional[str] = None,
columns: Optional[List[str]] = None,
enable_out_of_range: bool = False,
) -> pyarrow.RecordBatchReader:
return self._table.load_cdf(
columns=columns,
starting_version=starting_version,
ending_version=ending_version,
starting_timestamp=starting_timestamp,
ending_timestamp=ending_timestamp,
enable_out_of_range=enable_out_of_range,
)

@property
Expand Down
7 changes: 6 additions & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ impl RawDeltaTable {
Ok(())
}

#[pyo3(signature = (starting_version = 0, ending_version = None, starting_timestamp = None, ending_timestamp = None, columns = None))]
#[pyo3(signature = (starting_version = 0, ending_version = None, starting_timestamp = None, ending_timestamp = None, columns = None, enable_out_of_range = false))]
pub fn load_cdf(
&mut self,
py: Python,
Expand All @@ -684,6 +684,7 @@ impl RawDeltaTable {
starting_timestamp: Option<String>,
ending_timestamp: Option<String>,
columns: Option<Vec<String>>,
enable_out_of_range: bool,
) -> PyResult<PyArrowType<ArrowArrayStreamReader>> {
let ctx = SessionContext::new();
let mut cdf_read = CdfLoadBuilder::new(
Expand All @@ -708,6 +709,10 @@ impl RawDeltaTable {
cdf_read = cdf_read.with_ending_timestamp(ending_ts);
}

if enable_out_of_range {
cdf_read = cdf_read.with_out_of_range();
}

if let Some(columns) = columns {
cdf_read = cdf_read.with_columns(columns);
}
Expand Down
39 changes: 38 additions & 1 deletion python/tests/test_cdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pyarrow.dataset as ds
import pyarrow.parquet as pq

from deltalake import DeltaTable, write_deltalake
from deltalake import DeltaTable, _internal, write_deltalake


def test_read_cdf_partitioned():
Expand Down Expand Up @@ -677,3 +677,40 @@ def test_write_overwrite_partitioned_cdf(tmp_path, sample_data: pa.Table):
).sort_by(sort_values).select(expected_data.column_names) == pa.concat_tables(
[first_batch, expected_data]
).sort_by(sort_values)


def test_read_cdf_version_out_of_range():
    """Loading CDF from starting version 4 without the flag must raise DeltaError.

    The test expects the error text (case-insensitively) to contain
    "invalid table version".
    """
    dt = DeltaTable("../crates/test/tests/data/cdf-table/")

    try:
        # The result is irrelevant; the read itself is expected to fail.
        dt.load_cdf(4).read_all().to_pydict()
    except _internal.DeltaError as e:
        assert "invalid table version" in str(e).lower()
    else:
        # Explicit raise instead of `assert False`, which is removed under -O.
        raise AssertionError("Should not get here")


def test_read_cdf_version_out_of_range_with_flag():
    """With enable_out_of_range=True, reading CDF from version 4 yields no rows."""
    table = DeltaTable("../crates/test/tests/data/cdf-table/")
    reader = table.load_cdf(4, enable_out_of_range=True)
    change_data = reader.read_all()

    # An empty result (rather than an error) is the expected outcome here.
    row_count = len(change_data)
    assert row_count == 0


def test_read_timestamp_cdf_out_of_range():
    """Loading CDF from a 2033 starting timestamp without the flag must raise DeltaError.

    The test expects the error text (case-insensitively) to contain
    "is greater than latest commit timestamp".
    """
    dt = DeltaTable("../crates/test/tests/data/cdf-table/")
    start = "2033-12-22T17:10:21.675Z"

    try:
        # The result is irrelevant; the read itself is expected to fail.
        dt.load_cdf(starting_timestamp=start).read_all().to_pydict()
    except _internal.DeltaError as e:
        assert "is greater than latest commit timestamp" in str(e).lower()
    else:
        # Explicit raise instead of `assert False`, which is removed under -O.
        raise AssertionError("Should not get here")


def test_read_timestamp_cdf_out_of_range_with_flag():
    """With enable_out_of_range=True, a 2033 starting timestamp yields no rows."""
    table = DeltaTable("../crates/test/tests/data/cdf-table/")
    future_start = "2033-12-22T17:10:21.675Z"

    reader = table.load_cdf(
        starting_timestamp=future_start,
        enable_out_of_range=True,
    )
    change_data = reader.read_all()

    # An empty result (rather than an error) is the expected outcome here.
    row_count = len(change_data)
    assert row_count == 0

0 comments on commit f4ef67f

Please sign in to comment.