From 9c3311119bcf2aa64f0e0a6ab18845f0cd62afe7 Mon Sep 17 00:00:00 2001 From: Patrick Jin Date: Tue, 17 Dec 2024 18:41:23 -0800 Subject: [PATCH] rust bindings for CDF (#612) Co-authored-by: Oussama Saoudi --- .github/workflows/build-kernel-wheels.yml | 25 ++- .../.gitignore | 1 + .../Cargo.toml | 4 +- .../src/lib.rs | 155 +++++++++++++----- 4 files changed, 138 insertions(+), 47 deletions(-) create mode 100644 python/delta-kernel-rust-sharing-wrapper/.gitignore diff --git a/.github/workflows/build-kernel-wheels.yml b/.github/workflows/build-kernel-wheels.yml index fc7bf6e13..9d7cf7849 100644 --- a/.github/workflows/build-kernel-wheels.yml +++ b/.github/workflows/build-kernel-wheels.yml @@ -15,7 +15,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - os: [ubuntu-latest, macos-latest, windows-latest] + os: [ubuntu-latest, ubuntu-20.04, macos-latest, windows-latest] python-version: [3.8] arch: [x86_64, arm64] include: @@ -25,6 +25,8 @@ jobs: arch: arm64 - os: ubuntu-latest arch: x86_64 + - os: ubuntu-20.04 + arch: x86_64 - os: windows-latest arch: x86_64 @@ -69,8 +71,25 @@ jobs: maturin build --release shell: bash + - name: Build wheel (x86_64 Linux Ubuntu 20.04) + if: matrix.os == 'ubuntu-20.04' + run: | + cd python/delta-kernel-rust-sharing-wrapper + maturin build --release + shell: bash + - name: Upload wheels - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: - name: wheels + name: wheel-${{ matrix.os }}-${{ matrix.arch }} path: python/delta-kernel-rust-sharing-wrapper/target/wheels/*.whl + + merge: + runs-on: ubuntu-latest + needs: build + steps: + - name: Merge Artifacts + uses: actions/upload-artifact/merge@v4 + with: + name: all-wheels + pattern: wheel-* diff --git a/python/delta-kernel-rust-sharing-wrapper/.gitignore b/python/delta-kernel-rust-sharing-wrapper/.gitignore new file mode 100644 index 000000000..03314f77b --- /dev/null +++ b/python/delta-kernel-rust-sharing-wrapper/.gitignore @@ -0,0 +1 @@ +Cargo.lock diff --git a/python/delta-kernel-rust-sharing-wrapper/Cargo.toml b/python/delta-kernel-rust-sharing-wrapper/Cargo.toml index ae48fd0fb..a19f8eb24 100644 --- a/python/delta-kernel-rust-sharing-wrapper/Cargo.toml +++ b/python/delta-kernel-rust-sharing-wrapper/Cargo.toml @@ -2,7 +2,7 @@ name = "delta-kernel-rust-sharing-wrapper" edition = "2021" license = "Apache-2.0" -version = "0.1.0" +version = "0.2.0" [lib] name = "delta_kernel_rust_sharing_wrapper" @@ -11,7 +11,7 @@ crate-type = ["cdylib"] [dependencies] arrow = { version = "53.3.0", features = ["pyarrow"] } -delta_kernel = {version = "0.5", features = ["cloud", "default", "default-engine"]} +delta_kernel = { version = "0.6.0", features = ["cloud", "default-engine"]} openssl = { version = "0.10", features = ["vendored"] } url = "2" diff --git a/python/delta-kernel-rust-sharing-wrapper/src/lib.rs b/python/delta-kernel-rust-sharing-wrapper/src/lib.rs index 94a8dd23e..cfa6c6783 100644 --- a/python/delta-kernel-rust-sharing-wrapper/src/lib.rs +++ b/python/delta-kernel-rust-sharing-wrapper/src/lib.rs @@ -1,37 +1,44 @@ use std::sync::Arc; use arrow::compute::filter_record_batch; -use arrow::datatypes::SchemaRef; +use arrow::datatypes::SchemaRef as ArrowSchemaRef; use arrow::error::ArrowError; use arrow::pyarrow::PyArrowType; -use delta_kernel::engine::arrow_data::ArrowEngineData; +use arrow::record_batch::{RecordBatch, RecordBatchIterator, RecordBatchReader}; + use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; +use delta_kernel::scan::ScanResult; +use delta_kernel::table_changes::scan::{ + TableChangesScan as KernelTableChangesScan, + TableChangesScanBuilder as KernelTableChangesScanBuilder, +}; +use delta_kernel::Error as KernelError; +use delta_kernel::{engine::arrow_data::ArrowEngineData, schema::StructType}; +use delta_kernel::{DeltaResult, Engine}; + use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use url::Url; -use arrow::record_batch::{RecordBatch, RecordBatchIterator, RecordBatchReader}; -use delta_kernel::Engine; - use std::collections::HashMap; -struct KernelError(delta_kernel::Error); +struct PyKernelError(KernelError); -impl From for PyErr { - fn from(error: KernelError) -> Self { +impl From for PyErr { + fn from(error: PyKernelError) -> Self { PyValueError::new_err(format!("Kernel error: {}", error.0)) } } -impl From for KernelError { - fn from(delta_kernel_error: delta_kernel::Error) -> Self { +impl From for PyKernelError { + fn from(delta_kernel_error: KernelError) -> Self { Self(delta_kernel_error) } } -type DeltaPyResult = std::result::Result; +type DeltaPyResult = std::result::Result; #[pyclass] struct Table(delta_kernel::Table); @@ -40,7 +47,7 @@ struct Table(delta_kernel::Table); impl Table { #[new] fn new(location: &str) -> DeltaPyResult { - let location = Url::parse(location).map_err(delta_kernel::Error::InvalidUrl)?; + let location = Url::parse(location).map_err(KernelError::InvalidUrl)?; let table = delta_kernel::Table::new(location); Ok(Table(table)) } @@ -73,11 +80,43 @@ impl ScanBuilder { } fn build(&mut self) -> DeltaPyResult { - let scan = self.0.take().unwrap().build()?; + let scan = self + .0 + .take() + .ok_or_else(|| KernelError::generic("Can only call build() once on ScanBuilder"))? + .build()?; Ok(Scan(scan)) } } +fn try_get_schema(schema: &Arc) -> Result { + Ok(Arc::new(schema.as_ref().try_into().map_err(|e| { + KernelError::Generic(format!("Could not get result schema: {e}")) + })?)) +} + +fn try_create_record_batch_iter( + results: impl Iterator>, + result_schema: ArrowSchemaRef, +) -> RecordBatchIterator>> { + let record_batches = results.map(|res| { + let scan_res = res.and_then(|res| Ok((res.full_mask(), res.raw_data?))); + let (mask, data) = scan_res.map_err(|e| ArrowError::from_external_error(Box::new(e)))?; + let record_batch: RecordBatch = data + .into_any() + .downcast::() + .map_err(|_| ArrowError::CastError("Couldn't cast to ArrowEngineData".to_string()))? + .into(); + if let Some(mask) = mask { + let filtered_batch = filter_record_batch(&record_batch, &mask.into())?; + Ok(filtered_batch) + } else { + Ok(record_batch) + } + }); + RecordBatchIterator::new(record_batches, result_schema) +} + #[pyclass] struct Scan(delta_kernel::scan::Scan); @@ -87,50 +126,80 @@ impl Scan { &self, engine_interface: &PythonInterface, ) -> DeltaPyResult>> { - let result_schema: SchemaRef = - Arc::new(self.0.schema().as_ref().try_into().map_err(|e| { - delta_kernel::Error::Generic(format!("Could not get result schema: {e}")) - })?); - let results = self.0.execute(engine_interface.0.as_ref())?; - let record_batches: Vec<_> = results - .map(|res| { - let scan_res = res.and_then(|res| Ok((res.full_mask(), res.raw_data?))); - let (mask, data) = - scan_res.map_err(|e| ArrowError::from_external_error(Box::new(e)))?; - let record_batch: RecordBatch = data - .into_any() - .downcast::() - .map_err(|_| { - ArrowError::CastError("Couldn't cast to ArrowEngineData".to_string()) - })? - .into(); - if let Some(mask) = mask { - let filtered_batch = filter_record_batch(&record_batch, &mask.into())?; - Ok(filtered_batch) - } else { - Ok(record_batch) - } - }) - .collect(); - let record_batch_iter = RecordBatchIterator::new(record_batches, result_schema); + let result_schema: ArrowSchemaRef = try_get_schema(self.0.schema())?; + let results = self.0.execute(engine_interface.0.clone())?; + let record_batch_iter = try_create_record_batch_iter(results, result_schema); + Ok(PyArrowType(Box::new(record_batch_iter))) + } +} + +#[pyclass] +struct TableChangesScanBuilder(Option); + +#[pymethods] +impl TableChangesScanBuilder { + #[new] + #[pyo3(signature = (table, engine_interface, start_version, end_version=None))] + fn new( + table: &Table, + engine_interface: &PythonInterface, + start_version: u64, + end_version: Option, + ) -> DeltaPyResult { + let table_changes = table + .0 + .table_changes(engine_interface.0.as_ref(), start_version, end_version)?; + Ok(TableChangesScanBuilder(Some( + table_changes.into_scan_builder(), + ))) + } + + fn build(&mut self) -> DeltaPyResult { + let scan = self + .0 + .take() + .ok_or_else(|| { + KernelError::generic("Can only call build() once on TableChangesScanBuilder") + })? + .build()?; + let schema: ArrowSchemaRef = try_get_schema(scan.schema())?; + Ok(TableChangesScan { scan, schema }) + } +} + +#[pyclass] +struct TableChangesScan { + scan: KernelTableChangesScan, + schema: ArrowSchemaRef, +} + +#[pymethods] +impl TableChangesScan { + fn execute( + &self, + engine_interface: &PythonInterface, + ) -> DeltaPyResult>> { + let result_schema = self.schema.clone(); + let results = self.scan.execute(engine_interface.0.clone())?; + let record_batch_iter = try_create_record_batch_iter(results, result_schema); Ok(PyArrowType(Box::new(record_batch_iter))) } } #[pyclass] -struct PythonInterface(Box); +struct PythonInterface(Arc); #[pymethods] impl PythonInterface { #[new] fn new(location: &str) -> DeltaPyResult { - let url = Url::parse(location).map_err(delta_kernel::Error::InvalidUrl)?; + let url = Url::parse(location).map_err(KernelError::InvalidUrl)?; let client = DefaultEngine::try_new( &url, HashMap::::new(), Arc::new(TokioBackgroundExecutor::new()), )?; - Ok(PythonInterface(Box::new(client))) + Ok(PythonInterface(Arc::new(client))) } } @@ -144,5 +213,7 @@ fn delta_kernel_rust_sharing_wrapper(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; Ok(()) }