Merge remote-tracking branch 'origin/main'
hntd187 committed Dec 2, 2024
2 parents 167973c + ba671c9 commit 8f52fdc
Showing 14 changed files with 345 additions and 11 deletions.
2 changes: 1 addition & 1 deletion crates/core/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "deltalake-core"
version = "0.22.1"
version = "0.22.2"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
86 changes: 82 additions & 4 deletions crates/core/src/delta_datafusion/mod.rs
@@ -826,9 +826,12 @@ impl TableProvider for DeltaTableProvider {

fn supports_filters_pushdown(
&self,
_filter: &[&Expr],
filter: &[&Expr],
) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
Ok(vec![TableProviderFilterPushDown::Inexact])
Ok(filter
.iter()
.map(|_| TableProviderFilterPushDown::Inexact)
.collect())
}

fn statistics(&self) -> Option<Statistics> {
@@ -1150,11 +1153,12 @@ pub(crate) async fn execute_plan_to_batch(
Ok(concat_batches(&plan.schema(), data.iter())?)
}

/// Responsible for checking batches of data conform to table's invariants.
#[derive(Clone)]
/// Responsible for checking batches of data conform to table's invariants, constraints and nullability.
#[derive(Clone, Default)]
pub struct DeltaDataChecker {
constraints: Vec<Constraint>,
invariants: Vec<Invariant>,
non_nullable_columns: Vec<String>,
ctx: SessionContext,
}

@@ -1164,6 +1168,7 @@ impl DeltaDataChecker {
Self {
invariants: vec![],
constraints: vec![],
non_nullable_columns: vec![],
ctx: DeltaSessionContext::default().into(),
}
}
@@ -1173,6 +1178,7 @@ impl DeltaDataChecker {
Self {
invariants,
constraints: vec![],
non_nullable_columns: vec![],
ctx: DeltaSessionContext::default().into(),
}
}
@@ -1182,6 +1188,7 @@ impl DeltaDataChecker {
Self {
constraints,
invariants: vec![],
non_nullable_columns: vec![],
ctx: DeltaSessionContext::default().into(),
}
}
@@ -1202,9 +1209,21 @@ impl DeltaDataChecker {
pub fn new(snapshot: &DeltaTableState) -> Self {
let invariants = snapshot.schema().get_invariants().unwrap_or_default();
let constraints = snapshot.table_config().get_constraints();
let non_nullable_columns = snapshot
.schema()
.fields()
.filter_map(|f| {
if !f.is_nullable() {
Some(f.name().clone())
} else {
None
}
})
.collect_vec();
Self {
invariants,
constraints,
non_nullable_columns,
ctx: DeltaSessionContext::default().into(),
}
}
@@ -1214,10 +1233,35 @@ impl DeltaDataChecker {
/// If it does not, it will return [DeltaTableError::InvalidData] with a list
/// of values that violated each invariant.
pub async fn check_batch(&self, record_batch: &RecordBatch) -> Result<(), DeltaTableError> {
self.check_nullability(record_batch)?;
self.enforce_checks(record_batch, &self.invariants).await?;
self.enforce_checks(record_batch, &self.constraints).await
}

/// Return true if all the nullability checks are valid
fn check_nullability(&self, record_batch: &RecordBatch) -> Result<bool, DeltaTableError> {
let mut violations = Vec::new();
for col in self.non_nullable_columns.iter() {
if let Some(arr) = record_batch.column_by_name(col) {
if arr.null_count() > 0 {
violations.push(format!(
"Non-nullable column violation for {col}, found {} null values",
arr.null_count()
));
}
} else {
violations.push(format!(
"Non-nullable column violation for {col}, not found in batch!"
));
}
}
if !violations.is_empty() {
Err(DeltaTableError::InvalidData { violations })
} else {
Ok(true)
}
}

async fn enforce_checks<C: DataCheck>(
&self,
record_batch: &RecordBatch,
@@ -2598,4 +2642,38 @@ mod tests {

assert_eq!(actual.len(), 0);
}

#[tokio::test]
async fn test_check_nullability() -> DeltaResult<()> {
use arrow::array::StringArray;

let data_checker = DeltaDataChecker {
non_nullable_columns: vec!["zed".to_string(), "yap".to_string()],
..Default::default()
};

let arr: Arc<dyn Array> = Arc::new(StringArray::from(vec!["s"]));
let nulls: Arc<dyn Array> = Arc::new(StringArray::new_null(1));
let batch = RecordBatch::try_from_iter(vec![("a", arr), ("zed", nulls)]).unwrap();

let result = data_checker.check_nullability(&batch);
assert!(
result.is_err(),
"The result should have errored! {result:?}"
);

let arr: Arc<dyn Array> = Arc::new(StringArray::from(vec!["s"]));
let batch = RecordBatch::try_from_iter(vec![("zed", arr)]).unwrap();
let result = data_checker.check_nullability(&batch);
assert!(
result.is_err(),
"The result should have errored! {result:?}"
);

let arr: Arc<dyn Array> = Arc::new(StringArray::from(vec!["s"]));
let batch = RecordBatch::try_from_iter(vec![("zed", arr.clone()), ("yap", arr)]).unwrap();
let _ = data_checker.check_nullability(&batch)?;

Ok(())
}
}
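
As a reading aid, here is a minimal sketch of how the new nullability check surfaces through the public `DeltaDataChecker::new` path; the table handle, column name, and batch below are illustrative assumptions, not part of this diff:

```rust
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::delta_datafusion::DeltaDataChecker;
use deltalake::errors::DeltaResult;
use deltalake::DeltaTable;

// Assumes `table` is an open DeltaTable whose schema marks `id` as non-nullable
// and `batch` is a RecordBatch that is about to be written.
async fn validate(table: &DeltaTable, batch: &RecordBatch) -> DeltaResult<()> {
    let checker = DeltaDataChecker::new(table.snapshot()?);
    // Nullability is checked first: a batch containing nulls in `id`, or missing
    // the `id` column entirely, fails with DeltaTableError::InvalidData before
    // any invariant or constraint SQL is evaluated.
    checker.check_batch(batch).await
}
```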
4 changes: 2 additions & 2 deletions crates/deltalake/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "deltalake"
version = "0.22.1"
version = "0.22.2"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
@@ -16,7 +16,7 @@ rust-version.workspace = true
features = ["azure", "datafusion", "gcs", "hdfs", "json", "python", "s3", "unity-experimental"]

[dependencies]
deltalake-core = { version = "0.22.0", path = "../core" }
deltalake-core = { version = "0.22.2", path = "../core" }
deltalake-aws = { version = "0.5.0", path = "../aws", default-features = false, optional = true }
deltalake-azure = { version = "0.5.0", path = "../azure", optional = true }
deltalake-gcp = { version = "0.6.0", path = "../gcp", optional = true }
2 changes: 1 addition & 1 deletion python/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "deltalake-python"
version = "0.22.1"
version = "0.22.2"
authors = ["Qingping Hou <[email protected]>", "Will Jones <[email protected]>"]
homepage = "https://github.com/delta-io/delta-rs"
license = "Apache-2.0"
1 change: 1 addition & 0 deletions python/deltalake/__init__.py
@@ -2,6 +2,7 @@
from ._internal import __version__ as __version__
from ._internal import rust_core_version as rust_core_version
from .data_catalog import DataCatalog as DataCatalog
from .query import QueryBuilder as QueryBuilder
from .schema import DataType as DataType
from .schema import Field as Field
from .schema import Schema as Schema
5 changes: 5 additions & 0 deletions python/deltalake/_internal.pyi
@@ -874,6 +874,11 @@ class DeltaFileSystemHandler:
) -> ObjectOutputStream:
"""Open an output stream for sequential writing."""

class PyQueryBuilder:
def __init__(self) -> None: ...
def register(self, table_name: str, delta_table: RawDeltaTable) -> None: ...
def execute(self, sql: str) -> List[pyarrow.RecordBatch]: ...

class DeltaDataChecker:
def __init__(self, invariants: List[Tuple[str, str]]) -> None: ...
def check_batch(self, batch: pyarrow.RecordBatch) -> None: ...
64 changes: 64 additions & 0 deletions python/deltalake/query.py
@@ -0,0 +1,64 @@
from __future__ import annotations

import warnings
from typing import List

import pyarrow

from deltalake._internal import PyQueryBuilder
from deltalake.table import DeltaTable
from deltalake.warnings import ExperimentalWarning


class QueryBuilder:
"""
QueryBuilder is an experimental API which exposes Apache DataFusion SQL to Python users of the deltalake library.
This API is subject to change.
>>> qb = QueryBuilder()
"""

def __init__(self) -> None:
warnings.warn(
"QueryBuilder is experimental and subject to change",
category=ExperimentalWarning,
)
self._query_builder = PyQueryBuilder()

def register(self, table_name: str, delta_table: DeltaTable) -> QueryBuilder:
"""
Add a table to the query builder instance by name. The `table_name`
will be how the referenced `DeltaTable` can be referenced in SQL
queries.
For example:
>>> tmp = getfixture('tmp_path')
>>> import pyarrow as pa
>>> from deltalake import DeltaTable, QueryBuilder
>>> dt = DeltaTable.create(table_uri=tmp, schema=pa.schema([pa.field('name', pa.string())]))
>>> qb = QueryBuilder().register('test', dt)
>>> assert qb is not None
"""
self._query_builder.register(
table_name=table_name,
delta_table=delta_table._table,
)
return self

def execute(self, sql: str) -> List[pyarrow.RecordBatch]:
"""
Execute the query and return a list of record batches
For example:
>>> tmp = getfixture('tmp_path')
>>> import pyarrow as pa
>>> from deltalake import DeltaTable, QueryBuilder
>>> dt = DeltaTable.create(table_uri=tmp, schema=pa.schema([pa.field('name', pa.string())]))
>>> qb = QueryBuilder().register('test', dt)
>>> results = qb.execute('SELECT * FROM test')
>>> assert results is not None
"""
return self._query_builder.execute(sql)
2 changes: 2 additions & 0 deletions python/deltalake/warnings.py
@@ -0,0 +1,2 @@
class ExperimentalWarning(Warning):
pass
8 changes: 8 additions & 0 deletions python/src/error.rs
@@ -1,4 +1,5 @@
use arrow_schema::ArrowError;
use deltalake::datafusion::error::DataFusionError;
use deltalake::protocol::ProtocolError;
use deltalake::{errors::DeltaTableError, ObjectStoreError};
use pyo3::exceptions::{
@@ -79,6 +80,10 @@ fn checkpoint_to_py(err: ProtocolError) -> PyErr {
}
}

fn datafusion_to_py(err: DataFusionError) -> PyErr {
DeltaError::new_err(err.to_string())
}

#[derive(thiserror::Error, Debug)]
pub enum PythonError {
#[error("Error in delta table")]
@@ -89,6 +94,8 @@ pub enum PythonError {
Arrow(#[from] ArrowError),
#[error("Error in checkpoint")]
Protocol(#[from] ProtocolError),
#[error("Error in data fusion")]
DataFusion(#[from] DataFusionError),
}

impl From<PythonError> for pyo3::PyErr {
@@ -98,6 +105,7 @@ impl From<PythonError> for pyo3::PyErr {
PythonError::ObjectStore(err) => object_store_to_py(err),
PythonError::Arrow(err) => arrow_to_py(err),
PythonError::Protocol(err) => checkpoint_to_py(err),
PythonError::DataFusion(err) => datafusion_to_py(err),
}
}
}
2 changes: 1 addition & 1 deletion python/src/filesystem.rs
@@ -13,7 +13,7 @@ use std::sync::Arc;

const DEFAULT_MAX_BUFFER_SIZE: usize = 5 * 1024 * 1024;

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub(crate) struct FsConfig {
pub(crate) root_url: String,
pub(crate) options: HashMap<String, String>,
13 changes: 11 additions & 2 deletions python/src/lib.rs
@@ -2,6 +2,7 @@ mod error;
mod features;
mod filesystem;
mod merge;
mod query;
mod schema;
mod utils;

@@ -20,12 +21,18 @@ use delta_kernel::expressions::Scalar;
use delta_kernel::schema::StructField;
use deltalake::arrow::compute::concat_batches;
use deltalake::arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
use deltalake::arrow::pyarrow::ToPyArrow;
use deltalake::arrow::record_batch::{RecordBatch, RecordBatchIterator};
use deltalake::arrow::{self, datatypes::Schema as ArrowSchema};
use deltalake::checkpoints::{cleanup_metadata, create_checkpoint};
use deltalake::datafusion::datasource::provider_as_source;
use deltalake::datafusion::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE};
use deltalake::datafusion::physical_plan::ExecutionPlan;
use deltalake::datafusion::prelude::SessionContext;
use deltalake::delta_datafusion::DeltaDataChecker;
use deltalake::datafusion::prelude::{DataFrame, SessionContext};
use deltalake::delta_datafusion::{
DataFusionMixins, DeltaDataChecker, DeltaScanConfigBuilder, DeltaSessionConfig,
DeltaTableProvider,
};
use deltalake::errors::DeltaTableError;
use deltalake::kernel::{
scalars::ScalarExt, Action, Add, Invariant, LogicalFile, Remove, StructType, Transaction,
@@ -69,6 +76,7 @@ use crate::error::PythonError;
use crate::features::TableFeatures;
use crate::filesystem::FsConfig;
use crate::merge::PyMergeBuilder;
use crate::query::PyQueryBuilder;
use crate::schema::{schema_to_pyobject, Field};
use crate::utils::rt;

@@ -2095,6 +2103,7 @@ fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> {
)?)?;
m.add_class::<RawDeltaTable>()?;
m.add_class::<PyMergeBuilder>()?;
m.add_class::<PyQueryBuilder>()?;
m.add_class::<RawDeltaTableMetaData>()?;
m.add_class::<PyDeltaDataChecker>()?;
m.add_class::<PyTransaction>()?;
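
The extra DataFusion imports above are what the new `PyQueryBuilder` (defined in the accompanying `python/src/query.rs`, not expanded in this view) builds on. A rough sketch of the equivalent Rust-side wiring follows; the table URI, table name, and SQL are illustrative assumptions based on the public `delta_datafusion` API, not a copy of the binding code:

```rust
use std::sync::Arc;

use deltalake::datafusion::prelude::SessionContext;
use deltalake::delta_datafusion::{
    DeltaScanConfigBuilder, DeltaSessionConfig, DeltaTableProvider,
};
use deltalake::errors::DeltaResult;

async fn run_query() -> DeltaResult<()> {
    // A DataFusion session configured with Delta-friendly defaults.
    let ctx = SessionContext::new_with_config(DeltaSessionConfig::default().into());

    // Open a table and wrap its current snapshot in a DataFusion TableProvider.
    let table = deltalake::open_table("./data/my_table").await?;
    let snapshot = table.snapshot()?.clone();
    let scan_config = DeltaScanConfigBuilder::new().build(&snapshot)?;
    let provider = Arc::new(DeltaTableProvider::try_new(
        snapshot,
        table.log_store(),
        scan_config,
    )?);

    // Register the provider under a name, then run SQL against it and collect
    // the results as Arrow record batches.
    ctx.register_table("my_table", provider)?;
    let batches = ctx.sql("SELECT * FROM my_table").await?.collect().await?;
    println!("got {} record batches", batches.len());
    Ok(())
}
```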