Implement query builder
Signed-off-by: Peter Ke <[email protected]>
PeterKeDer authored and rtyler committed Nov 29, 2024
1 parent 1083c8c commit 3335a03
Showing 9 changed files with 175 additions and 4 deletions.
7 changes: 5 additions & 2 deletions crates/core/src/delta_datafusion/mod.rs
@@ -826,9 +826,12 @@ impl TableProvider for DeltaTableProvider {
 
     fn supports_filters_pushdown(
         &self,
-        _filter: &[&Expr],
+        filter: &[&Expr],
     ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
-        Ok(vec![TableProviderFilterPushDown::Inexact])
+        Ok(filter
+            .iter()
+            .map(|_| TableProviderFilterPushDown::Inexact)
+            .collect())
    }
 
    fn statistics(&self) -> Option<Statistics> {
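Why this change matters: DataFusion expects supports_filters_pushdown to return one pushdown decision per filter expression, but the old implementation always returned a single-element vector, which breaks planning as soon as a query carries more than one predicate. Every filter is now reported as Inexact, meaning the Delta scan may use it to prune files while DataFusion still re-applies it on top, so results stay exact. A minimal sketch of what this enables from Python, using the QueryBuilder added below (the table path is hypothetical, not part of this commit):

# Sketch only: both predicates are pushed down as Inexact and then
# re-checked by DataFusion, so the result is correct either way.
from deltalake import DeltaTable, QueryBuilder

dt = DeltaTable("./my_table")  # hypothetical local table
batches = (
    QueryBuilder()
    .register("tbl", dt)
    .execute("SELECT * FROM tbl WHERE year >= 2021 AND month = 12")
)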
1 change: 1 addition & 0 deletions python/deltalake/__init__.py
@@ -2,6 +2,7 @@
 from ._internal import __version__ as __version__
 from ._internal import rust_core_version as rust_core_version
 from .data_catalog import DataCatalog as DataCatalog
+from .query import QueryBuilder as QueryBuilder
 from .schema import DataType as DataType
 from .schema import Field as Field
 from .schema import Schema as Schema
5 changes: 5 additions & 0 deletions python/deltalake/_internal.pyi
@@ -874,6 +874,11 @@ class DeltaFileSystemHandler:
     ) -> ObjectOutputStream:
         """Open an output stream for sequential writing."""
 
+class PyQueryBuilder:
+    def __init__(self) -> None: ...
+    def register(self, table_name: str, delta_table: RawDeltaTable) -> None: ...
+    def execute(self, sql: str) -> List[pyarrow.RecordBatch]: ...
+
 class DeltaDataChecker:
     def __init__(self, invariants: List[Tuple[str, str]]) -> None: ...
     def check_batch(self, batch: pyarrow.RecordBatch) -> None: ...
31 changes: 31 additions & 0 deletions python/deltalake/query.py
@@ -0,0 +1,31 @@
+from __future__ import annotations
+
+import warnings
+from typing import List
+
+import pyarrow
+
+from deltalake._internal import PyQueryBuilder
+from deltalake.table import DeltaTable
+from deltalake.warnings import ExperimentalWarning
+
+
+class QueryBuilder:
+    def __init__(self) -> None:
+        warnings.warn(
+            "QueryBuilder is experimental and subject to change",
+            category=ExperimentalWarning,
+        )
+        self._query_builder = PyQueryBuilder()
+
+    def register(self, table_name: str, delta_table: DeltaTable) -> QueryBuilder:
+        """Add a table to the query builder."""
+        self._query_builder.register(
+            table_name=table_name,
+            delta_table=delta_table._table,
+        )
+        return self
+
+    def execute(self, sql: str) -> List[pyarrow.RecordBatch]:
+        """Execute the query and return a list of record batches."""
+        return self._query_builder.execute(sql)
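This file gives the feature its public face: register one or more Delta tables under SQL names, then run a query and get pyarrow.RecordBatch objects back. A short usage sketch, mirroring the tests added at the end of this commit (the table path is hypothetical):

import pyarrow as pa
from deltalake import DeltaTable, QueryBuilder

dt = DeltaTable("./my_table")  # hypothetical local table
batches = (
    QueryBuilder()
    .register("tbl", dt)  # expose the Delta table to SQL as "tbl"
    .execute("SELECT * FROM tbl LIMIT 10")
)
result = pa.Table.from_batches(batches)

Note that register returns self, so registrations chain fluently before execute.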
2 changes: 2 additions & 0 deletions python/deltalake/warnings.py
@@ -0,0 +1,2 @@
+class ExperimentalWarning(Warning):
+    pass
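Because QueryBuilder.__init__ emits this warning unconditionally, callers who have accepted the experimental status can silence it with the standard library's warnings filter; a minimal sketch:

import warnings
from deltalake.warnings import ExperimentalWarning

# Suppress the experimental-API warning for this process only.
warnings.filterwarnings("ignore", category=ExperimentalWarning)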
8 changes: 8 additions & 0 deletions python/src/error.rs
@@ -1,4 +1,5 @@
 use arrow_schema::ArrowError;
+use deltalake::datafusion::error::DataFusionError;
 use deltalake::protocol::ProtocolError;
 use deltalake::{errors::DeltaTableError, ObjectStoreError};
 use pyo3::exceptions::{
@@ -79,6 +80,10 @@ fn checkpoint_to_py(err: ProtocolError) -> PyErr {
     }
 }
 
+fn datafusion_to_py(err: DataFusionError) -> PyErr {
+    DeltaError::new_err(err.to_string())
+}
+
 #[derive(thiserror::Error, Debug)]
 pub enum PythonError {
     #[error("Error in delta table")]
@@ -89,6 +94,8 @@ pub enum PythonError {
     Arrow(#[from] ArrowError),
     #[error("Error in checkpoint")]
     Protocol(#[from] ProtocolError),
+    #[error("Error in data fusion")]
+    DataFusion(#[from] DataFusionError),
 }
 
 impl From<PythonError> for pyo3::PyErr {
@@ -98,6 +105,7 @@ impl From<PythonError> for pyo3::PyErr {
             PythonError::ObjectStore(err) => object_store_to_py(err),
             PythonError::Arrow(err) => arrow_to_py(err),
             PythonError::Protocol(err) => checkpoint_to_py(err),
+            PythonError::DataFusion(err) => datafusion_to_py(err),
         }
     }
 }
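With the new DataFusion variant, planning or execution failures inside execute surface as the bindings' base DeltaError rather than an unmapped Rust error. A hedged sketch of handling one from Python (assuming DeltaError is importable from deltalake.exceptions, as the other mapped errors are; the path is hypothetical):

from deltalake import DeltaTable, QueryBuilder
from deltalake.exceptions import DeltaError  # assumption: exported here

qb = QueryBuilder().register("tbl", DeltaTable("./my_table"))
try:
    qb.execute("SELECT no_such_column FROM tbl")
except DeltaError as err:
    print(f"query failed: {err}")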
13 changes: 11 additions & 2 deletions python/src/lib.rs
@@ -2,6 +2,7 @@ mod error;
 mod features;
 mod filesystem;
 mod merge;
+mod query;
 mod schema;
 mod utils;
 
@@ -20,12 +21,18 @@ use delta_kernel::expressions::Scalar;
 use delta_kernel::schema::StructField;
 use deltalake::arrow::compute::concat_batches;
 use deltalake::arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
+use deltalake::arrow::pyarrow::ToPyArrow;
 use deltalake::arrow::record_batch::{RecordBatch, RecordBatchIterator};
 use deltalake::arrow::{self, datatypes::Schema as ArrowSchema};
 use deltalake::checkpoints::{cleanup_metadata, create_checkpoint};
+use deltalake::datafusion::datasource::provider_as_source;
+use deltalake::datafusion::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE};
 use deltalake::datafusion::physical_plan::ExecutionPlan;
-use deltalake::datafusion::prelude::SessionContext;
-use deltalake::delta_datafusion::DeltaDataChecker;
+use deltalake::datafusion::prelude::{DataFrame, SessionContext};
+use deltalake::delta_datafusion::{
+    DataFusionMixins, DeltaDataChecker, DeltaScanConfigBuilder, DeltaSessionConfig,
+    DeltaTableProvider,
+};
 use deltalake::errors::DeltaTableError;
 use deltalake::kernel::{
     scalars::ScalarExt, Action, Add, Invariant, LogicalFile, Remove, StructType, Transaction,
@@ -69,6 +76,7 @@ use crate::error::PythonError;
 use crate::features::TableFeatures;
 use crate::filesystem::FsConfig;
 use crate::merge::PyMergeBuilder;
+use crate::query::PyQueryBuilder;
 use crate::schema::{schema_to_pyobject, Field};
 use crate::utils::rt;
 
@@ -2095,6 +2103,7 @@ fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> {
     )?)?;
     m.add_class::<RawDeltaTable>()?;
     m.add_class::<PyMergeBuilder>()?;
+    m.add_class::<PyQueryBuilder>()?;
     m.add_class::<RawDeltaTableMetaData>()?;
     m.add_class::<PyDeltaDataChecker>()?;
     m.add_class::<PyTransaction>()?;
58 changes: 58 additions & 0 deletions python/src/query.rs
@@ -0,0 +1,58 @@
+use std::sync::Arc;
+
+use deltalake::{
+    arrow::pyarrow::ToPyArrow,
+    datafusion::prelude::SessionContext,
+    delta_datafusion::{DeltaScanConfigBuilder, DeltaSessionConfig, DeltaTableProvider},
+};
+use pyo3::prelude::*;
+
+use crate::{error::PythonError, utils::rt, RawDeltaTable};
+
+#[pyclass(module = "deltalake._internal")]
+pub(crate) struct PyQueryBuilder {
+    _ctx: SessionContext,
+}
+
+#[pymethods]
+impl PyQueryBuilder {
+    #[new]
+    pub fn new() -> Self {
+        let config = DeltaSessionConfig::default().into();
+        let _ctx = SessionContext::new_with_config(config);
+
+        PyQueryBuilder { _ctx }
+    }
+
+    pub fn register(&self, table_name: &str, delta_table: &RawDeltaTable) -> PyResult<()> {
+        let snapshot = delta_table._table.snapshot().map_err(PythonError::from)?;
+        let log_store = delta_table._table.log_store();
+
+        let scan_config = DeltaScanConfigBuilder::default()
+            .build(snapshot)
+            .map_err(PythonError::from)?;
+
+        let provider = Arc::new(
+            DeltaTableProvider::try_new(snapshot.clone(), log_store, scan_config)
+                .map_err(PythonError::from)?,
+        );
+
+        self._ctx
+            .register_table(table_name, provider)
+            .map_err(PythonError::from)?;
+
+        Ok(())
+    }
+
+    pub fn execute(&self, py: Python, sql: &str) -> PyResult<PyObject> {
+        let batches = py.allow_threads(|| {
+            rt().block_on(async {
+                let df = self._ctx.sql(sql).await?;
+                df.collect().await
+            })
+            .map_err(PythonError::from)
+        })?;
+
+        batches.to_pyarrow(py)
+    }
+}
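Note the py.allow_threads wrapper in execute: the GIL is released while the tokio runtime drives DataFusion, so a long-running query does not block other Python threads. A sketch of overlapping two queries (this assumes the shared builder may be called from multiple threads, which the commit does not state; paths are hypothetical):

from concurrent.futures import ThreadPoolExecutor
from deltalake import DeltaTable, QueryBuilder

qb = QueryBuilder().register("tbl", DeltaTable("./my_table"))
with ThreadPoolExecutor(max_workers=2) as pool:
    # execute() drops the GIL during collection, so these can overlap.
    counts = pool.submit(qb.execute, "SELECT COUNT(*) FROM tbl")
    sample = pool.submit(qb.execute, "SELECT * FROM tbl LIMIT 5")
    print(counts.result(), sample.result())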
54 changes: 54 additions & 0 deletions python/tests/test_table_read.py
@@ -9,6 +9,7 @@
 
 from deltalake._util import encode_partition_value
 from deltalake.exceptions import DeltaProtocolError
+from deltalake.query import QueryBuilder
 from deltalake.table import ProtocolVersions
 from deltalake.writer import write_deltalake
 
@@ -946,3 +947,56 @@ def test_is_deltatable_with_storage_opts():
         "DELTA_DYNAMO_TABLE_NAME": "custom_table_name",
     }
     assert DeltaTable.is_deltatable(table_path, storage_options=storage_options)
+
+
+def test_read_query_builder():
+    table_path = "../crates/test/tests/data/delta-0.8.0-partitioned"
+    dt = DeltaTable(table_path)
+    expected = {
+        "value": ["4", "5", "6", "7"],
+        "year": ["2021", "2021", "2021", "2021"],
+        "month": ["4", "12", "12", "12"],
+        "day": ["5", "4", "20", "20"],
+    }
+    actual = pa.Table.from_batches(
+        QueryBuilder()
+        .register("tbl", dt)
+        .execute("SELECT * FROM tbl WHERE year >= 2021 ORDER BY value")
+    ).to_pydict()
+    assert expected == actual
+
+
+def test_read_query_builder_join_multiple_tables(tmp_path):
+    table_path = "../crates/test/tests/data/delta-0.8.0-date"
+    dt1 = DeltaTable(table_path)
+
+    write_deltalake(
+        tmp_path,
+        pa.table(
+            {
+                "date": ["2021-01-01", "2021-01-02", "2021-01-03", "2021-12-31"],
+                "value": ["a", "b", "c", "d"],
+            }
+        ),
+    )
+    dt2 = DeltaTable(tmp_path)
+
+    expected = {
+        "date": ["2021-01-01", "2021-01-02", "2021-01-03"],
+        "dayOfYear": [1, 2, 3],
+        "value": ["a", "b", "c"],
+    }
+    actual = pa.Table.from_batches(
+        QueryBuilder()
+        .register("tbl1", dt1)
+        .register("tbl2", dt2)
+        .execute(
+            """
+            SELECT tbl2.date, tbl1.dayOfYear, tbl2.value
+            FROM tbl1
+            INNER JOIN tbl2 ON tbl1.date = tbl2.date
+            ORDER BY tbl1.date
+            """
+        )
+    ).to_pydict()
+    assert expected == actual
