diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index 480bc5c521..beb7bbb591 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -1,4 +1,7 @@
-* @houqp @xianwill @wjones127 @fvaleye @roeap @rtyler @mosyp
+crates/ @wjones127 @roeap @rtyler
+delta-inspect/ @wjones127 @rtyler
proofs/ @houqp
-python/ @wjones127 @fvaleye @rtyler @roeap @houqp
+python/ @wjones127 @fvaleye @roeap
tlaplus/ @houqp
+.github/ @wjones127 @rtyler
+docs/ @MrPowers
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index ba2915cdc8..80dec2eaef 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -102,7 +102,7 @@ jobs:
AWS_ACCESS_KEY_ID: deltalake
AWS_SECRET_ACCESS_KEY: weloverust
AWS_ENDPOINT_URL: http://localhost:4566
- AWS_STORAGE_ALLOW_HTTP: "1"
+ AWS_ALLOW_HTTP: "1"
AZURE_USE_EMULATOR: "1"
AZURE_STORAGE_ALLOW_HTTP: "1"
AZURITE_BLOB_STORAGE_URL: "http://localhost:10000"
@@ -164,5 +164,5 @@ jobs:
- uses: Swatinem/rust-cache@v2
- name: Run tests
- working-directory: rust
+ working-directory: crates/deltalake-core
run: cargo test --no-default-features --features=parquet2
diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml
new file mode 100644
index 0000000000..1e828c5144
--- /dev/null
+++ b/.github/workflows/docs.yml
@@ -0,0 +1,64 @@
+name: Build documentation
+
+on:
+ pull_request:
+ paths:
+ - python/**
+ - docs/**
+ - mkdocs.yml
+ - .github/workflows/docs.yml
+jobs:
+ markdown-link-check:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v3
+ - uses: gaurav-nelson/github-action-markdown-link-check@v1
+ with:
+ config-file: docs/mlc-config.json
+ folder-path: docs
+
+ lint:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v3
+ - uses: psf/black@stable
+ with:
+ src: docs/src/python
+
+ build:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v3
+
+ - name: Install Rust
+ uses: actions-rs/toolchain@v1
+ with:
+ toolchain: stable
+ override: true
+ components: rustfmt, clippy
+
+ - uses: Swatinem/rust-cache@v2
+
+ - name: Set up Python
+ uses: actions/setup-python@v3
+ with:
+ python-version: '3.10'
+
+ - name: Build and install deltalake
+ run: |
+ cd python
+ pip install virtualenv
+ virtualenv venv
+ source venv/bin/activate
+ make develop
+ cd ..
+
+ - name: Install dependencies
+ run: |
+ source python/venv/bin/activate
+ pip install -r docs/requirements.txt
+
+ - name: Build documentation
+ run: |
+ source python/venv/bin/activate
+ mkdocs build
\ No newline at end of file
diff --git a/.github/workflows/python_release.yml b/.github/workflows/python_release.yml
index 85af4ea95a..6793d129a0 100644
--- a/.github/workflows/python_release.yml
+++ b/.github/workflows/python_release.yml
@@ -114,8 +114,6 @@ jobs:
target: aarch64-unknown-linux-gnu
command: publish
args: --skip-existing -m python/Cargo.toml --no-sdist ${{ env.FEATURES_FLAG }}
- # for openssl build
- before-script-linux: yum install -y perl-IPC-Cmd
release-docs:
needs:
diff --git a/.gitignore b/.gitignore
index 5fe8f6cf0a..ca0576b47c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,3 +27,6 @@ Cargo.lock
!/delta-inspect/Cargo.lock
!/proofs/Cargo.lock
+justfile
+site
+__pycache__
\ No newline at end of file
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6f62ca2dcd..922a49f47e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,86 @@
# Changelog
+## [rust-v0.16.0](https://github.com/delta-io/delta-rs/tree/rust-v0.16.0) (2023-09-27)
+
+[Full Changelog](https://github.com/delta-io/delta-rs/compare/rust-v0.15.0...rust-v0.16.0)
+
+**Implemented enhancements:**
+
+- Expose Optimize option min\_commit\_interval in Python [\#1640](https://github.com/delta-io/delta-rs/issues/1640)
+- Expose create\_checkpoint\_for [\#1513](https://github.com/delta-io/delta-rs/issues/1513)
+- integration tests regularly fail for HDFS [\#1428](https://github.com/delta-io/delta-rs/issues/1428)
+- Add Support for Microsoft OneLake [\#1418](https://github.com/delta-io/delta-rs/issues/1418)
+- add support for atomic rename in R2 [\#1356](https://github.com/delta-io/delta-rs/issues/1356)
+
+**Fixed bugs:**
+
+- Writing with large arrow types \(e.g. large\_utf8\), writes wrong partition encoding [\#1669](https://github.com/delta-io/delta-rs/issues/1669)
+- \[python\] Different stringification of partition values in reader and writer [\#1653](https://github.com/delta-io/delta-rs/issues/1653)
+- Unable to interface with data written from Spark Databricks [\#1651](https://github.com/delta-io/delta-rs/issues/1651)
+- `get_last_checkpoint` does some unnecessary listing [\#1643](https://github.com/delta-io/delta-rs/issues/1643)
+- `PartitionWriter`'s `buffer_len` doesn't include incomplete row groups [\#1637](https://github.com/delta-io/delta-rs/issues/1637)
+- Slack community invite link has expired [\#1636](https://github.com/delta-io/delta-rs/issues/1636)
+- delta-rs does not appear to support tables with liquid clustering [\#1626](https://github.com/delta-io/delta-rs/issues/1626)
+- Internal Parquet panic when using a Map type. [\#1619](https://github.com/delta-io/delta-rs/issues/1619)
+- partition\_by with "$" on local filesystem [\#1591](https://github.com/delta-io/delta-rs/issues/1591)
+- ProtocolChanged error when performing append write [\#1585](https://github.com/delta-io/delta-rs/issues/1585)
+- Unable to `cargo update` using git tag or rev on Rust 1.70 [\#1580](https://github.com/delta-io/delta-rs/issues/1580)
+- NoMetadata error when reading delta table [\#1562](https://github.com/delta-io/delta-rs/issues/1562)
+- Cannot read delta table: `Delta protocol violation` [\#1557](https://github.com/delta-io/delta-rs/issues/1557)
+- Update the CODEOWNERS to capture the current reviewers and contributors [\#1553](https://github.com/delta-io/delta-rs/issues/1553)
+- \[Python\] Incorrect file URIs when partition values contain escape character [\#1533](https://github.com/delta-io/delta-rs/issues/1533)
+- add documentation how to Query Delta natively from datafusion [\#1485](https://github.com/delta-io/delta-rs/issues/1485)
+- Python: write\_deltalake to ADLS Gen2 issue [\#1456](https://github.com/delta-io/delta-rs/issues/1456)
+- Partition values that have been url encoded cannot be read when using deltalake [\#1446](https://github.com/delta-io/delta-rs/issues/1446)
+- Error optimizing large table [\#1419](https://github.com/delta-io/delta-rs/issues/1419)
+- Cannot read partitions with special characters \(including space\) with pyarrow \>= 11 [\#1393](https://github.com/delta-io/delta-rs/issues/1393)
+- ImportError: deltalake/\_internal.abi3.so: cannot allocate memory in static TLS block [\#1380](https://github.com/delta-io/delta-rs/issues/1380)
+- Invalid JSON in log record missing field `schemaString` for DLT tables [\#1302](https://github.com/delta-io/delta-rs/issues/1302)
+- Special characters in partition path not handled locally [\#1299](https://github.com/delta-io/delta-rs/issues/1299)
+
+**Merged pull requests:**
+
+- chore: bump rust crate version [\#1675](https://github.com/delta-io/delta-rs/pull/1675) ([rtyler](https://github.com/rtyler))
+- fix: change partitioning schema from large to normal string for pyarrow\<12 [\#1671](https://github.com/delta-io/delta-rs/pull/1671) ([ion-elgreco](https://github.com/ion-elgreco))
+- feat: allow to set large dtypes for the schema check in `write_deltalake` [\#1668](https://github.com/delta-io/delta-rs/pull/1668) ([ion-elgreco](https://github.com/ion-elgreco))
+- docs: small consistency update in guide and readme [\#1666](https://github.com/delta-io/delta-rs/pull/1666) ([ion-elgreco](https://github.com/ion-elgreco))
+- fix: exception string in writer.py [\#1665](https://github.com/delta-io/delta-rs/pull/1665) ([sebdiem](https://github.com/sebdiem))
+- chore: increment python library version [\#1664](https://github.com/delta-io/delta-rs/pull/1664) ([wjones127](https://github.com/wjones127))
+- docs: fix some typos [\#1662](https://github.com/delta-io/delta-rs/pull/1662) ([ion-elgreco](https://github.com/ion-elgreco))
+- fix: more consistent handling of partition values and file paths [\#1661](https://github.com/delta-io/delta-rs/pull/1661) ([roeap](https://github.com/roeap))
+- docs: add docstring to protocol method [\#1660](https://github.com/delta-io/delta-rs/pull/1660) ([MrPowers](https://github.com/MrPowers))
+- docs: make docs.rs build docs with all features enabled [\#1658](https://github.com/delta-io/delta-rs/pull/1658) ([simonvandel](https://github.com/simonvandel))
+- fix: enable offset listing for s3 [\#1654](https://github.com/delta-io/delta-rs/pull/1654) ([eeroel](https://github.com/eeroel))
+- chore: fix the incorrect Slack link in our readme [\#1649](https://github.com/delta-io/delta-rs/pull/1649) ([rtyler](https://github.com/rtyler))
+- fix: compensate for invalid log files created by Delta Live Tables [\#1647](https://github.com/delta-io/delta-rs/pull/1647) ([rtyler](https://github.com/rtyler))
+- chore: proposed updated CODEOWNERS to allow better review notifications [\#1646](https://github.com/delta-io/delta-rs/pull/1646) ([rtyler](https://github.com/rtyler))
+- feat: expose min\_commit\_interval to `optimize.compact` and `optimize.z_order` [\#1645](https://github.com/delta-io/delta-rs/pull/1645) ([ion-elgreco](https://github.com/ion-elgreco))
+- fix: avoid excess listing of log files [\#1644](https://github.com/delta-io/delta-rs/pull/1644) ([eeroel](https://github.com/eeroel))
+- fix: introduce support for Microsoft OneLake [\#1642](https://github.com/delta-io/delta-rs/pull/1642) ([rtyler](https://github.com/rtyler))
+- fix: explicitly require chrono 0.4.31 or greater [\#1641](https://github.com/delta-io/delta-rs/pull/1641) ([rtyler](https://github.com/rtyler))
+- fix: include in-progress row group when calculating in-memory buffer length [\#1638](https://github.com/delta-io/delta-rs/pull/1638) ([BnMcG](https://github.com/BnMcG))
+- chore: relax chrono pin to 0.4 [\#1635](https://github.com/delta-io/delta-rs/pull/1635) ([houqp](https://github.com/houqp))
+- chore: update datafusion to 31, arrow to 46 and object\_store to 0.7 [\#1634](https://github.com/delta-io/delta-rs/pull/1634) ([houqp](https://github.com/houqp))
+- docs: update Readme [\#1633](https://github.com/delta-io/delta-rs/pull/1633) ([dennyglee](https://github.com/dennyglee))
+- chore: pin the chrono dependency [\#1631](https://github.com/delta-io/delta-rs/pull/1631) ([rtyler](https://github.com/rtyler))
+- feat: pass known file sizes to filesystem in Python [\#1630](https://github.com/delta-io/delta-rs/pull/1630) ([eeroel](https://github.com/eeroel))
+- feat: implement parsing for the new `domainMetadata` actions in the commit log [\#1629](https://github.com/delta-io/delta-rs/pull/1629) ([rtyler](https://github.com/rtyler))
+- ci: fix python release [\#1624](https://github.com/delta-io/delta-rs/pull/1624) ([wjones127](https://github.com/wjones127))
+- ci: extend azure timeout [\#1622](https://github.com/delta-io/delta-rs/pull/1622) ([wjones127](https://github.com/wjones127))
+- feat: allow multiple incremental commits in optimize [\#1621](https://github.com/delta-io/delta-rs/pull/1621) ([kvap](https://github.com/kvap))
+- fix: change map nullable value to false [\#1620](https://github.com/delta-io/delta-rs/pull/1620) ([cmackenzie1](https://github.com/cmackenzie1))
+- Introduce the changelog for the last couple releases [\#1617](https://github.com/delta-io/delta-rs/pull/1617) ([rtyler](https://github.com/rtyler))
+- chore: bump python version to 0.10.2 [\#1616](https://github.com/delta-io/delta-rs/pull/1616) ([wjones127](https://github.com/wjones127))
+- perf: avoid holding GIL in DeltaFileSystemHandler [\#1615](https://github.com/delta-io/delta-rs/pull/1615) ([wjones127](https://github.com/wjones127))
+- fix: don't re-encode paths [\#1613](https://github.com/delta-io/delta-rs/pull/1613) ([wjones127](https://github.com/wjones127))
+- feat: use url parsing from object store [\#1592](https://github.com/delta-io/delta-rs/pull/1592) ([roeap](https://github.com/roeap))
+- feat: buffered reading of transaction logs [\#1549](https://github.com/delta-io/delta-rs/pull/1549) ([eeroel](https://github.com/eeroel))
+- feat: merge operation [\#1522](https://github.com/delta-io/delta-rs/pull/1522) ([Blajda](https://github.com/Blajda))
+- feat: expose create\_checkpoint\_for to the public [\#1514](https://github.com/delta-io/delta-rs/pull/1514) ([haruband](https://github.com/haruband))
+- docs: update Readme [\#1440](https://github.com/delta-io/delta-rs/pull/1440) ([roeap](https://github.com/roeap))
+- refactor: re-organize top level modules [\#1434](https://github.com/delta-io/delta-rs/pull/1434) ([roeap](https://github.com/roeap))
+- feat: integrate unity catalog with datafusion [\#1338](https://github.com/delta-io/delta-rs/pull/1338) ([roeap](https://github.com/roeap))
+
## [rust-v0.15.0](https://github.com/delta-io/delta-rs/tree/rust-v0.15.0) (2023-09-06)
[Full Changelog](https://github.com/delta-io/delta-rs/compare/rust-v0.14.0...rust-v0.15.0)
diff --git a/Cargo.toml b/Cargo.toml
index 3015ce4bdd..0b3862bd1f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,10 @@
[workspace]
-members = ["rust", "python"]
-exclude = ["proofs", "delta-inspect"]
+members = [
+ "crates/*",
+ "delta-inspect",
+ "python",
+]
+exclude = ["proofs"]
resolver = "2"
[profile.release-with-debug]
@@ -15,23 +19,23 @@ debug = "line-tables-only"
[workspace.dependencies]
# arrow
-arrow = { version = "46" }
-arrow-array = { version = "46" }
-arrow-buffer = { version = "46" }
-arrow-cast = { version = "46" }
-arrow-ord = { version = "46" }
-arrow-row = { version = "46" }
-arrow-schema = { version = "46" }
-arrow-select = { version = "46" }
-parquet = { version = "46" }
+arrow = { version = "47" }
+arrow-array = { version = "47" }
+arrow-buffer = { version = "47" }
+arrow-cast = { version = "47" }
+arrow-ord = { version = "47" }
+arrow-row = { version = "47" }
+arrow-schema = { version = "47" }
+arrow-select = { version = "47" }
+parquet = { version = "47" }
# datafusion
-datafusion = { version = "31" }
-datafusion-expr = { version = "31" }
-datafusion-common = { version = "31" }
-datafusion-proto = { version = "31" }
-datafusion-sql = { version = "31" }
-datafusion-physical-expr = { version = "31" }
+datafusion = { version = "32" }
+datafusion-expr = { version = "32" }
+datafusion-common = { version = "32" }
+datafusion-proto = { version = "32" }
+datafusion-sql = { version = "32" }
+datafusion-physical-expr = { version = "32" }
# serde
serde = { version = "1", features = ["derive"] }
diff --git a/README.md b/README.md
index 09d8fa4753..b3dd824b77 100644
--- a/README.md
+++ b/README.md
@@ -32,7 +32,7 @@
-
+
@@ -57,7 +57,7 @@ API that lets you query, inspect, and operate your Delta Lake with ease.
- [Quick Start](#quick-start)
- [Get Involved](#get-involved)
-- [Integartions](#integrations)
+- [Integrations](#integrations)
- [Features](#features)
## Quick Start
@@ -66,8 +66,7 @@ The `deltalake` library aims to adopt patterns from other libraries in data proc
so getting started should look familiar.
```py3
-from deltalake import DeltaTable
-from deltalake.write import write_deltalake
+from deltalake import DeltaTable, write_deltalake
import pandas as pd
# write some data into a delta table
@@ -103,10 +102,10 @@ You can also try Delta Lake docker at [DockerHub](https://go.delta.io/dockerhub)
## Get Involved
-We encourage you to reach out, and are [commited](https://github.com/delta-io/delta-rs/blob/main/CODE_OF_CONDUCT.md)
+We encourage you to reach out, and are [committed](https://github.com/delta-io/delta-rs/blob/main/CODE_OF_CONDUCT.md)
to provide a welcoming community.
-- [Join us in our Slack workspace](https://go.delta.io/slack)
+- [Join us in our Slack workspace](https://join.slack.com/t/delta-users/shared_invite/zt-23h0xwez7-wDTm43ZVEW2ZcbKn6Bc8Fg)
- [Report an issue](https://github.com/delta-io/delta-rs/issues/new?template=bug_report.md)
- Looking to contribute? See our [good first issues](https://github.com/delta-io/delta-rs/contribute).
@@ -139,28 +138,28 @@ of features outlined in the Delta [protocol][protocol] is also [tracked](#protoc
| S3 - R2 | ![done] | ![done] | requires lock for concurrent writes |
| Azure Blob | ![done] | ![done] | |
| Azure ADLS Gen2 | ![done] | ![done] | |
-| Micorosft OneLake | [![open]][onelake-rs] | [![open]][onelake-rs] | |
+| Microsoft OneLake | ![done] | ![done] | |
| Google Cloud Storage | ![done] | ![done] | |
### Supported Operations
-| Operation | Rust | Python | Description |
-| --------------------- | :-----------------: | :-----------------: | ------------------------------------- |
-| Create | ![done] | ![done] | Create a new table |
-| Read | ![done] | ![done] | Read data from a table |
-| Vacuum | ![done] | ![done] | Remove unused files and log entries |
-| Delete - partitions | | ![done] | Delete a table partition |
-| Delete - predicates | ![done] | | Delete data based on a predicate |
-| Optimize - compaction | ![done] | ![done] | Harmonize the size of data file |
-| Optimize - Z-order | ![done] | ![done] | Place similar data into the same file |
-| Merge | [![open]][merge-rs] | [![open]][merge-py] | |
-| FS check | ![done] | | Remove corrupted files from table |
+| Operation | Rust | Python | Description |
+| --------------------- | :----------------------: | :-----------------: | ------------------------------------------- |
+| Create | ![done] | ![done] | Create a new table |
+| Read | ![done] | ![done] | Read data from a table |
+| Vacuum | ![done] | ![done] | Remove unused files and log entries |
+| Delete - partitions | | ![done] | Delete a table partition |
+| Delete - predicates | ![done] | ![done] | Delete data based on a predicate |
+| Optimize - compaction | ![done] | ![done] | Harmonize the size of data file |
+| Optimize - Z-order | ![done] | ![done] | Place similar data into the same file |
+| Merge | [![semi-done]][merge-rs] | [![open]][merge-py] | Merge two tables (limited to full re-write) |
+| FS check | ![done] | | Remove corrupted files from table |
### Protocol Support Level
| Writer Version | Requirement | Status |
| -------------- | --------------------------------------------- | :------------------: |
-| Version 2 | Append Only Tables | [![open]][roadmap] |
+| Version 2 | Append Only Tables | ![done]
| Version 2 | Column Invariants | ![done] |
| Version 3 | Enforce `delta.checkpoint.writeStatsAsJson` | [![open]][writer-rs] |
| Version 3 | Enforce `delta.checkpoint.writeStatsAsStruct` | [![open]][writer-rs] |
@@ -173,13 +172,14 @@ of features outlined in the Delta [protocol][protocol] is also [tracked](#protoc
| Reader Version | Requirement | Status |
| -------------- | ----------------------------------- | ------ |
-| Version 2 | Collumn Mapping | |
+| Version 2 | Column Mapping | |
| Version 3 | Table Features (requires reader V7) | |
[datafusion]: https://github.com/apache/arrow-datafusion
[ballista]: https://github.com/apache/arrow-ballista
[polars]: https://github.com/pola-rs/polars
[open]: https://cdn.jsdelivr.net/gh/Readme-Workflows/Readme-Icons@main/icons/octicons/IssueNeutral.svg
+[semi-done]: https://cdn.jsdelivr.net/gh/Readme-Workflows/Readme-Icons@main/icons/octicons/ApprovedChangesGrey.svg
[done]: https://cdn.jsdelivr.net/gh/Readme-Workflows/Readme-Icons@main/icons/octicons/ApprovedChanges.svg
[roadmap]: https://github.com/delta-io/delta-rs/issues/1128
[merge-py]: https://github.com/delta-io/delta-rs/issues/1357
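For the Rust crate, the same quick start is a thin wrapper around `open_table`. A minimal sketch (illustrative only: it assumes a tokio runtime and points at one of the test tables bundled in this repository):

```rust
// Open an existing Delta table with the `deltalake` crate and inspect it.
// The table path is an example; substitute any local or remote table URI.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let table = deltalake::open_table("./rust/tests/data/simple_table").await?;
    println!("version: {}", table.version());
    println!("files: {:?}", table.get_files());
    Ok(())
}
```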
diff --git a/crates/README.md b/crates/README.md
new file mode 100644
index 0000000000..d5b17317ca
--- /dev/null
+++ b/crates/README.md
@@ -0,0 +1,3 @@
+# Delta Lake Rust crates
+
+This directory contains all of the crates published by the [delta-rs](https://github.com/delta-io/delta-rs) project. These crates were originally split based on the proposal in [#1713](https://github.com/delta-io/delta-rs/discussions/1713).
diff --git a/rust/.gitignore b/crates/deltalake-core/.gitignore
similarity index 100%
rename from rust/.gitignore
rename to crates/deltalake-core/.gitignore
diff --git a/rust/.ignore b/crates/deltalake-core/.ignore
similarity index 100%
rename from rust/.ignore
rename to crates/deltalake-core/.ignore
diff --git a/rust/Cargo.toml b/crates/deltalake-core/Cargo.toml
similarity index 90%
rename from rust/Cargo.toml
rename to crates/deltalake-core/Cargo.toml
index 72ad44bbac..e645b6bfd0 100644
--- a/rust/Cargo.toml
+++ b/crates/deltalake-core/Cargo.toml
@@ -1,6 +1,6 @@
[package]
-name = "deltalake"
-version = "0.15.0"
+name = "deltalake-core"
+version = "0.17.0"
rust-version = "1.64"
authors = ["Qingping Hou "]
homepage = "https://github.com/delta-io/delta.rs"
@@ -12,6 +12,11 @@ repository = "https://github.com/delta-io/delta.rs"
readme = "README.md"
edition = "2021"
+[package.metadata.docs.rs]
+# We cannot use all_features because TLS features are mutually exclusive.
+# We cannot use hdfs feature because it requires Java to be installed.
+features = ["azure", "datafusion", "gcs", "glue", "hdfs", "json", "python", "s3", "unity-experimental"]
+
[dependencies]
# arrow
arrow = { workspace = true, optional = true }
@@ -20,7 +25,7 @@ arrow-buffer = { workspace = true, optional = true }
arrow-cast = { workspace = true, optional = true }
arrow-ord = { workspace = true, optional = true }
arrow-row = { workspace = true, optional = true }
-arrow-schema = { workspace = true, optional = true }
+arrow-schema = { workspace = true, optional = true, features = ["serde"] }
arrow-select = { workspace = true, optional = true }
parquet = { workspace = true, features = [
"async",
@@ -102,10 +107,10 @@ reqwest = { version = "0.11.18", default-features = false, features = [
# Datafusion
dashmap = { version = "5", optional = true }
-sqlparser = { version = "0.37", optional = true }
+sqlparser = { version = "0.38", optional = true }
# NOTE dependencies only for integration tests
-fs_extra = { version = "1.2.0", optional = true }
+fs_extra = { version = "1.3.0", optional = true }
tempdir = { version = "0", optional = true }
dynamodb_lock = { version = "0", default-features = false, optional = true }
@@ -177,11 +182,3 @@ unity-experimental = ["reqwest", "tracing", "hyper"]
[[bench]]
name = "read_checkpoint"
harness = false
-
-[[example]]
-name = "basic_operations"
-required-features = ["datafusion"]
-
-[[example]]
-name = "recordbatch-writer"
-required-features = ["arrow"]
diff --git a/rust/README.md b/crates/deltalake-core/README.md
similarity index 87%
rename from rust/README.md
rename to crates/deltalake-core/README.md
index 659de48566..b251148c69 100644
--- a/rust/README.md
+++ b/crates/deltalake-core/README.md
@@ -16,8 +16,11 @@ println!("{}", table.get_files());
### CLI
+Navigate into the `delta-inspect` directory first and run the following command.
+Please note that the test data is under `rust` instead of `delta-inspect`.
+
```bash
-❯ cargo run --bin delta-inspect files ./tests/data/delta-0.2.0
+❯ cargo run --bin delta-inspect files ../rust/tests/data/delta-0.2.0
part-00000-cb6b150b-30b8-4662-ad28-ff32ddab96d2-c000.snappy.parquet
part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet
part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet
@@ -33,7 +36,7 @@ DeltaTable(./tests/data/delta-0.2.0)
The examples folder shows how to use Rust API to manipulate Delta tables.
-Examples can be run using the `cargo run --example` command. For example:
+Navigate into the `rust` directory first; examples can then be run using the `cargo run --example` command. For example:
```bash
cargo run --example read_delta_table
diff --git a/rust/benches/read_checkpoint.rs b/crates/deltalake-core/benches/read_checkpoint.rs
similarity index 91%
rename from rust/benches/read_checkpoint.rs
rename to crates/deltalake-core/benches/read_checkpoint.rs
index 9824f15eb0..2ecbee661b 100644
--- a/rust/benches/read_checkpoint.rs
+++ b/crates/deltalake-core/benches/read_checkpoint.rs
@@ -1,6 +1,6 @@
use criterion::{criterion_group, criterion_main, Criterion};
-use deltalake::delta::DeltaTableConfig;
-use deltalake::table_state::DeltaTableState;
+use deltalake::table::state::DeltaTableState;
+use deltalake::DeltaTableConfig;
use std::fs::File;
use std::io::Read;
diff --git a/rust/src/data_catalog/client/backoff.rs b/crates/deltalake-core/src/data_catalog/client/backoff.rs
similarity index 100%
rename from rust/src/data_catalog/client/backoff.rs
rename to crates/deltalake-core/src/data_catalog/client/backoff.rs
diff --git a/rust/src/data_catalog/client/mock_server.rs b/crates/deltalake-core/src/data_catalog/client/mock_server.rs
similarity index 100%
rename from rust/src/data_catalog/client/mock_server.rs
rename to crates/deltalake-core/src/data_catalog/client/mock_server.rs
diff --git a/rust/src/data_catalog/client/mod.rs b/crates/deltalake-core/src/data_catalog/client/mod.rs
similarity index 100%
rename from rust/src/data_catalog/client/mod.rs
rename to crates/deltalake-core/src/data_catalog/client/mod.rs
diff --git a/rust/src/data_catalog/client/pagination.rs b/crates/deltalake-core/src/data_catalog/client/pagination.rs
similarity index 100%
rename from rust/src/data_catalog/client/pagination.rs
rename to crates/deltalake-core/src/data_catalog/client/pagination.rs
diff --git a/rust/src/data_catalog/client/retry.rs b/crates/deltalake-core/src/data_catalog/client/retry.rs
similarity index 100%
rename from rust/src/data_catalog/client/retry.rs
rename to crates/deltalake-core/src/data_catalog/client/retry.rs
diff --git a/rust/src/data_catalog/client/token.rs b/crates/deltalake-core/src/data_catalog/client/token.rs
similarity index 100%
rename from rust/src/data_catalog/client/token.rs
rename to crates/deltalake-core/src/data_catalog/client/token.rs
diff --git a/rust/src/data_catalog/glue/mod.rs b/crates/deltalake-core/src/data_catalog/glue/mod.rs
similarity index 100%
rename from rust/src/data_catalog/glue/mod.rs
rename to crates/deltalake-core/src/data_catalog/glue/mod.rs
diff --git a/rust/src/data_catalog/mod.rs b/crates/deltalake-core/src/data_catalog/mod.rs
similarity index 100%
rename from rust/src/data_catalog/mod.rs
rename to crates/deltalake-core/src/data_catalog/mod.rs
diff --git a/rust/src/data_catalog/storage/mod.rs b/crates/deltalake-core/src/data_catalog/storage/mod.rs
similarity index 96%
rename from rust/src/data_catalog/storage/mod.rs
rename to crates/deltalake-core/src/data_catalog/storage/mod.rs
index 726afee102..f645d370c1 100644
--- a/rust/src/data_catalog/storage/mod.rs
+++ b/crates/deltalake-core/src/data_catalog/storage/mod.rs
@@ -13,8 +13,9 @@ use futures::TryStreamExt;
use object_store::ObjectStore;
use crate::errors::DeltaResult;
+use crate::open_table_with_storage_options;
use crate::storage::config::{configure_store, StorageOptions};
-use crate::{ensure_table_uri, open_table_with_storage_options};
+use crate::table::builder::ensure_table_uri;
const DELTA_LOG_FOLDER: &str = "_delta_log";
@@ -46,9 +47,9 @@ impl ListingSchemaProvider {
storage_options: Option<HashMap<String, String>>,
) -> DeltaResult<Self> {
let uri = ensure_table_uri(root_uri)?;
- let storage_options = storage_options.unwrap_or_default().into();
+ let mut storage_options = storage_options.unwrap_or_default().into();
// We already parsed the url, so unwrapping is safe.
- let store = configure_store(&uri, &storage_options)?;
+ let store = configure_store(&uri, &mut storage_options)?;
Ok(Self {
authority: uri.to_string(),
store,
diff --git a/rust/src/data_catalog/unity/credential.rs b/crates/deltalake-core/src/data_catalog/unity/credential.rs
similarity index 100%
rename from rust/src/data_catalog/unity/credential.rs
rename to crates/deltalake-core/src/data_catalog/unity/credential.rs
diff --git a/rust/src/data_catalog/unity/datafusion.rs b/crates/deltalake-core/src/data_catalog/unity/datafusion.rs
similarity index 100%
rename from rust/src/data_catalog/unity/datafusion.rs
rename to crates/deltalake-core/src/data_catalog/unity/datafusion.rs
diff --git a/rust/src/data_catalog/unity/mod.rs b/crates/deltalake-core/src/data_catalog/unity/mod.rs
similarity index 96%
rename from rust/src/data_catalog/unity/mod.rs
rename to crates/deltalake-core/src/data_catalog/unity/mod.rs
index 3110f07020..e9de725923 100644
--- a/rust/src/data_catalog/unity/mod.rs
+++ b/crates/deltalake-core/src/data_catalog/unity/mod.rs
@@ -84,16 +84,27 @@ pub enum UnityCatalogConfigKey {
/// - `unity_workspace_url`
/// - `databricks_workspace_url`
/// - `workspace_url`
+ #[deprecated(since = "0.17.0", note = "Please use the DATABRICKS_HOST env variable")]
WorkspaceUrl,
+ /// Host of the Databricks workspace
+ Host,
+
/// Access token to authorize API requests
///
/// Supported keys:
/// - `unity_access_token`
/// - `databricks_access_token`
/// - `access_token`
+ #[deprecated(
+ since = "0.17.0",
+ note = "Please use the DATABRICKS_TOKEN env variable"
+ )]
AccessToken,
+ /// Token to use for Databricks Unity
+ Token,
+
/// Service principal client id for authorizing requests
///
/// Supported keys:
@@ -167,6 +178,7 @@ pub enum UnityCatalogConfigKey {
impl FromStr for UnityCatalogConfigKey {
type Err = DataCatalogError;
+ #[allow(deprecated)]
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"access_token" | "unity_access_token" | "databricks_access_token" => {
@@ -187,6 +199,7 @@ impl FromStr for UnityCatalogConfigKey {
"federated_token_file"
| "unity_federated_token_file"
| "databricks_federated_token_file" => Ok(UnityCatalogConfigKey::FederatedTokenFile),
+ "host" => Ok(UnityCatalogConfigKey::Host),
"msi_endpoint" | "unity_msi_endpoint" | "databricks_msi_endpoint" => {
Ok(UnityCatalogConfigKey::MsiEndpoint)
}
@@ -196,6 +209,7 @@ impl FromStr for UnityCatalogConfigKey {
"object_id" | "unity_object_id" | "databricks_object_id" => {
Ok(UnityCatalogConfigKey::ObjectId)
}
+ "token" => Ok(UnityCatalogConfigKey::Token),
"use_azure_cli" | "unity_use_azure_cli" | "databricks_use_azure_cli" => {
Ok(UnityCatalogConfigKey::UseAzureCli)
}
@@ -207,6 +221,7 @@ impl FromStr for UnityCatalogConfigKey {
}
}
+#[allow(deprecated)]
impl AsRef<str> for UnityCatalogConfigKey {
fn as_ref(&self) -> &str {
match self {
@@ -216,10 +231,12 @@ impl AsRef<str> for UnityCatalogConfigKey {
UnityCatalogConfigKey::ClientId => "unity_client_id",
UnityCatalogConfigKey::ClientSecret => "unity_client_secret",
UnityCatalogConfigKey::FederatedTokenFile => "unity_federated_token_file",
+ UnityCatalogConfigKey::Host => "databricks_host",
UnityCatalogConfigKey::MsiEndpoint => "unity_msi_endpoint",
UnityCatalogConfigKey::MsiResourceId => "unity_msi_resource_id",
UnityCatalogConfigKey::ObjectId => "unity_object_id",
UnityCatalogConfigKey::UseAzureCli => "unity_use_azure_cli",
+ UnityCatalogConfigKey::Token => "databricks_token",
UnityCatalogConfigKey::WorkspaceUrl => "unity_workspace_url",
}
}
@@ -268,6 +285,7 @@ pub struct UnityCatalogBuilder {
client_options: super::client::ClientOptions,
}
+#[allow(deprecated)]
impl UnityCatalogBuilder {
/// Create a new [`UnityCatalogBuilder`] with default values.
pub fn new() -> Self {
@@ -281,19 +299,21 @@ impl UnityCatalogBuilder {
value: impl Into<String>,
) -> DataCatalogResult<Self> {
match UnityCatalogConfigKey::from_str(key.as_ref())? {
- UnityCatalogConfigKey::WorkspaceUrl => self.workspace_url = Some(value.into()),
UnityCatalogConfigKey::AccessToken => self.bearer_token = Some(value.into()),
UnityCatalogConfigKey::ClientId => self.client_id = Some(value.into()),
UnityCatalogConfigKey::ClientSecret => self.client_secret = Some(value.into()),
UnityCatalogConfigKey::AuthorityId => self.authority_id = Some(value.into()),
UnityCatalogConfigKey::AuthorityHost => self.authority_host = Some(value.into()),
+ UnityCatalogConfigKey::Host => self.workspace_url = Some(value.into()),
UnityCatalogConfigKey::MsiEndpoint => self.msi_endpoint = Some(value.into()),
UnityCatalogConfigKey::ObjectId => self.object_id = Some(value.into()),
UnityCatalogConfigKey::MsiResourceId => self.msi_resource_id = Some(value.into()),
UnityCatalogConfigKey::FederatedTokenFile => {
self.federated_token_file = Some(value.into())
}
+ UnityCatalogConfigKey::Token => self.bearer_token = Some(value.into()),
UnityCatalogConfigKey::UseAzureCli => self.use_azure_cli = str_is_truthy(&value.into()),
+ UnityCatalogConfigKey::WorkspaceUrl => self.workspace_url = Some(value.into()),
};
Ok(self)
}
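To make the key changes above concrete, a short sketch of how the shorthand keys resolve (assuming the `unity-experimental` feature is enabled and `UnityCatalogConfigKey` is publicly reachable at this path):

```rust
// Resolve Unity catalog configuration keys by name: "host" and "token" map to the
// new variants, while long-form keys still parse into the deprecated ones.
use std::str::FromStr;

use deltalake_core::data_catalog::unity::UnityCatalogConfigKey;

fn main() {
    let host = UnityCatalogConfigKey::from_str("host").unwrap();
    let token = UnityCatalogConfigKey::from_str("token").unwrap();
    assert_eq!(host.as_ref(), "databricks_host");
    assert_eq!(token.as_ref(), "databricks_token");

    // "databricks_workspace_url" still resolves, but to the deprecated WorkspaceUrl
    // variant, which now points users at the DATABRICKS_HOST environment variable.
    let legacy = UnityCatalogConfigKey::from_str("databricks_workspace_url").unwrap();
    assert_eq!(legacy.as_ref(), "unity_workspace_url");
}
```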
diff --git a/rust/src/data_catalog/unity/models.rs b/crates/deltalake-core/src/data_catalog/unity/models.rs
similarity index 100%
rename from rust/src/data_catalog/unity/models.rs
rename to crates/deltalake-core/src/data_catalog/unity/models.rs
diff --git a/crates/deltalake-core/src/delta_datafusion/expr.rs b/crates/deltalake-core/src/delta_datafusion/expr.rs
new file mode 100644
index 0000000000..815b01831f
--- /dev/null
+++ b/crates/deltalake-core/src/delta_datafusion/expr.rs
@@ -0,0 +1,582 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// This product includes software from the Datafusion project (Apache 2.0)
+// https://github.com/apache/arrow-datafusion
+// Display functions and required macros were pulled from https://github.com/apache/arrow-datafusion/blob/ddb95497e2792015d5a5998eec79aac8d37df1eb/datafusion/expr/src/expr.rs
+
+//! Utility functions for Datafusion's Expressions
+
+use std::{
+ fmt::{self, Display, Formatter, Write},
+ sync::Arc,
+};
+
+use arrow_schema::DataType;
+use datafusion::execution::context::SessionState;
+use datafusion_common::Result as DFResult;
+use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference};
+use datafusion_expr::{
+ expr::{InList, ScalarUDF},
+ AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, TableSource,
+};
+use datafusion_sql::planner::{ContextProvider, SqlToRel};
+use sqlparser::ast::escape_quoted_string;
+use sqlparser::dialect::GenericDialect;
+use sqlparser::parser::Parser;
+use sqlparser::tokenizer::Tokenizer;
+
+use crate::{DeltaResult, DeltaTableError};
+
+pub(crate) struct DeltaContextProvider<'a> {
+ state: &'a SessionState,
+}
+
+impl<'a> ContextProvider for DeltaContextProvider<'a> {
+    fn get_table_provider(&self, _name: TableReference) -> DFResult<Arc<dyn TableSource>> {
+ unimplemented!()
+ }
+
+    fn get_function_meta(&self, name: &str) -> Option<Arc<datafusion_expr::ScalarUDF>> {
+ self.state.scalar_functions().get(name).cloned()
+ }
+
+    fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
+ self.state.aggregate_functions().get(name).cloned()
+ }
+
+    fn get_variable_type(&self, _var: &[String]) -> Option<DataType> {
+ unimplemented!()
+ }
+
+ fn options(&self) -> &ConfigOptions {
+ self.state.config_options()
+ }
+
+    fn get_window_meta(&self, name: &str) -> Option<Arc<datafusion_expr::WindowUDF>> {
+ self.state.window_functions().get(name).cloned()
+ }
+}
+
+/// Parse a string predicate into an `Expr`
+pub(crate) fn parse_predicate_expression(
+ schema: &DFSchema,
+    expr: impl AsRef<str>,
+ df_state: &SessionState,
+) -> DeltaResult<Expr> {
+ let dialect = &GenericDialect {};
+ let mut tokenizer = Tokenizer::new(dialect, expr.as_ref());
+ let tokens = tokenizer
+ .tokenize()
+ .map_err(|err| DeltaTableError::GenericError {
+ source: Box::new(err),
+ })?;
+ let sql = Parser::new(dialect)
+ .with_tokens(tokens)
+ .parse_expr()
+ .map_err(|err| DeltaTableError::GenericError {
+ source: Box::new(err),
+ })?;
+
+ let context_provider = DeltaContextProvider { state: df_state };
+ let sql_to_rel = SqlToRel::new(&context_provider);
+
+ Ok(sql_to_rel.sql_to_expr(sql, schema, &mut Default::default())?)
+}
+
+struct SqlFormat<'a> {
+ expr: &'a Expr,
+}
+
+macro_rules! expr_vec_fmt {
+ ( $ARRAY:expr ) => {{
+ $ARRAY
+ .iter()
+ .map(|e| format!("{}", SqlFormat { expr: e }))
+            .collect::<Vec<String>>()
+ .join(", ")
+ }};
+}
+
+struct BinaryExprFormat<'a> {
+ expr: &'a BinaryExpr,
+}
+
+impl<'a> Display for BinaryExprFormat<'a> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ // Put parentheses around child binary expressions so that we can see the difference
+ // between `(a OR b) AND c` and `a OR (b AND c)`. We only insert parentheses when needed,
+ // based on operator precedence. For example, `(a AND b) OR c` and `a AND b OR c` are
+ // equivalent and the parentheses are not necessary.
+
+ fn write_child(f: &mut Formatter<'_>, expr: &Expr, precedence: u8) -> fmt::Result {
+ match expr {
+ Expr::BinaryExpr(child) => {
+ let p = child.op.precedence();
+ if p == 0 || p < precedence {
+ write!(f, "({})", BinaryExprFormat { expr: child })?;
+ } else {
+ write!(f, "{}", BinaryExprFormat { expr: child })?;
+ }
+ }
+ _ => write!(f, "{}", SqlFormat { expr })?,
+ }
+ Ok(())
+ }
+
+ let precedence = self.expr.op.precedence();
+ write_child(f, self.expr.left.as_ref(), precedence)?;
+ write!(f, " {} ", self.expr.op)?;
+ write_child(f, self.expr.right.as_ref(), precedence)
+ }
+}
+
+impl<'a> Display for SqlFormat<'a> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self.expr {
+ Expr::Column(c) => write!(f, "{c}"),
+ Expr::Literal(v) => write!(f, "{}", ScalarValueFormat { scalar: v }),
+ Expr::Case(case) => {
+ write!(f, "CASE ")?;
+ if let Some(e) = &case.expr {
+ write!(f, "{} ", SqlFormat { expr: e })?;
+ }
+ for (w, t) in &case.when_then_expr {
+ write!(
+ f,
+ "WHEN {} THEN {} ",
+ SqlFormat { expr: w },
+ SqlFormat { expr: t }
+ )?;
+ }
+ if let Some(e) = &case.else_expr {
+ write!(f, "ELSE {} ", SqlFormat { expr: e })?;
+ }
+ write!(f, "END")
+ }
+ Expr::Not(expr) => write!(f, "NOT {}", SqlFormat { expr }),
+ Expr::Negative(expr) => write!(f, "(- {})", SqlFormat { expr }),
+ Expr::IsNull(expr) => write!(f, "{} IS NULL", SqlFormat { expr }),
+ Expr::IsNotNull(expr) => write!(f, "{} IS NOT NULL", SqlFormat { expr }),
+ Expr::IsTrue(expr) => write!(f, "{} IS TRUE", SqlFormat { expr }),
+ Expr::IsFalse(expr) => write!(f, "{} IS FALSE", SqlFormat { expr }),
+ Expr::IsUnknown(expr) => write!(f, "{} IS UNKNOWN", SqlFormat { expr }),
+ Expr::IsNotTrue(expr) => write!(f, "{} IS NOT TRUE", SqlFormat { expr }),
+ Expr::IsNotFalse(expr) => write!(f, "{} IS NOT FALSE", SqlFormat { expr }),
+ Expr::IsNotUnknown(expr) => write!(f, "{} IS NOT UNKNOWN", SqlFormat { expr }),
+ Expr::BinaryExpr(expr) => write!(f, "{}", BinaryExprFormat { expr }),
+ Expr::ScalarFunction(func) => fmt_function(f, &func.fun.to_string(), false, &func.args),
+ Expr::ScalarUDF(ScalarUDF { fun, args }) => fmt_function(f, &fun.name, false, args),
+ Expr::Cast(Cast { expr, data_type }) => {
+ write!(f, "arrow_cast({}, '{}')", SqlFormat { expr }, data_type)
+ }
+ Expr::Between(Between {
+ expr,
+ negated,
+ low,
+ high,
+ }) => {
+ if *negated {
+ write!(
+ f,
+ "{} NOT BETWEEN {} AND {}",
+ SqlFormat { expr },
+ SqlFormat { expr: low },
+ SqlFormat { expr: high }
+ )
+ } else {
+ write!(
+ f,
+ "{} BETWEEN {} AND {}",
+ SqlFormat { expr },
+ SqlFormat { expr: low },
+ SqlFormat { expr: high }
+ )
+ }
+ }
+ Expr::Like(Like {
+ negated,
+ expr,
+ pattern,
+ escape_char,
+ case_insensitive,
+ }) => {
+ write!(f, "{}", SqlFormat { expr })?;
+ let op_name = if *case_insensitive { "ILIKE" } else { "LIKE" };
+ if *negated {
+ write!(f, " NOT")?;
+ }
+ if let Some(char) = escape_char {
+ write!(
+ f,
+ " {op_name} {} ESCAPE '{char}'",
+ SqlFormat { expr: pattern }
+ )
+ } else {
+ write!(f, " {op_name} {}", SqlFormat { expr: pattern })
+ }
+ }
+ Expr::SimilarTo(Like {
+ negated,
+ expr,
+ pattern,
+ escape_char,
+ case_insensitive: _,
+ }) => {
+ write!(f, "{expr}")?;
+ if *negated {
+ write!(f, " NOT")?;
+ }
+ if let Some(char) = escape_char {
+ write!(f, " SIMILAR TO {pattern} ESCAPE '{char}'")
+ } else {
+ write!(f, " SIMILAR TO {pattern}")
+ }
+ }
+ Expr::InList(InList {
+ expr,
+ list,
+ negated,
+ }) => {
+ if *negated {
+ write!(f, "{expr} NOT IN ({})", expr_vec_fmt!(list))
+ } else {
+ write!(f, "{expr} IN ({})", expr_vec_fmt!(list))
+ }
+ }
+ _ => Err(fmt::Error),
+ }
+ }
+}
+
+/// Format an `Expr` to a parsable SQL expression
+pub fn fmt_expr_to_sql(expr: &Expr) -> Result<String, DeltaTableError> {
+ let mut s = String::new();
+ write!(&mut s, "{}", SqlFormat { expr }).map_err(|_| {
+ DeltaTableError::Generic("Unable to convert expression to string".to_owned())
+ })?;
+ Ok(s)
+}
+
+fn fmt_function(f: &mut fmt::Formatter, fun: &str, distinct: bool, args: &[Expr]) -> fmt::Result {
+    let args: Vec<String> = args
+ .iter()
+ .map(|arg| format!("{}", SqlFormat { expr: arg }))
+ .collect();
+
+ let distinct_str = match distinct {
+ true => "DISTINCT ",
+ false => "",
+ };
+ write!(f, "{}({}{})", fun, distinct_str, args.join(", "))
+}
+
+macro_rules! format_option {
+ ($F:expr, $EXPR:expr) => {{
+ match $EXPR {
+ Some(e) => write!($F, "{e}"),
+ None => write!($F, "NULL"),
+ }
+ }};
+}
+
+struct ScalarValueFormat<'a> {
+ scalar: &'a ScalarValue,
+}
+
+impl<'a> fmt::Display for ScalarValueFormat<'a> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match self.scalar {
+ ScalarValue::Boolean(e) => format_option!(f, e)?,
+ ScalarValue::Float32(e) => format_option!(f, e)?,
+ ScalarValue::Float64(e) => format_option!(f, e)?,
+ ScalarValue::Int8(e) => format_option!(f, e)?,
+ ScalarValue::Int16(e) => format_option!(f, e)?,
+ ScalarValue::Int32(e) => format_option!(f, e)?,
+ ScalarValue::Int64(e) => format_option!(f, e)?,
+ ScalarValue::UInt8(e) => format_option!(f, e)?,
+ ScalarValue::UInt16(e) => format_option!(f, e)?,
+ ScalarValue::UInt32(e) => format_option!(f, e)?,
+ ScalarValue::UInt64(e) => format_option!(f, e)?,
+ ScalarValue::Utf8(e) | ScalarValue::LargeUtf8(e) => match e {
+ Some(e) => write!(f, "'{}'", escape_quoted_string(e, '\''))?,
+ None => write!(f, "NULL")?,
+ },
+ ScalarValue::Binary(e)
+ | ScalarValue::FixedSizeBinary(_, e)
+ | ScalarValue::LargeBinary(e) => match e {
+ Some(l) => write!(
+ f,
+ "decode('{}', 'hex')",
+ l.iter()
+ .map(|v| format!("{v:02x}"))
+                        .collect::<Vec<String>>()
+ .join("")
+ )?,
+ None => write!(f, "NULL")?,
+ },
+ ScalarValue::Null => write!(f, "NULL")?,
+ _ => return Err(fmt::Error),
+ };
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use std::collections::HashMap;
+
+ use arrow_schema::DataType;
+ use datafusion::prelude::SessionContext;
+ use datafusion_common::{DFSchema, ScalarValue};
+ use datafusion_expr::{col, decode, lit, substring, Cast, Expr, ExprSchemable};
+
+ use crate::{DeltaOps, DeltaTable, Schema, SchemaDataType, SchemaField};
+
+ use super::fmt_expr_to_sql;
+
+ struct ParseTest {
+ expr: Expr,
+ expected: String,
+        override_expected_expr: Option<Expr>,
+ }
+
+ macro_rules! simple {
+ ( $EXPR:expr, $STR:expr ) => {{
+ ParseTest {
+ expr: $EXPR,
+ expected: $STR,
+ override_expected_expr: None,
+ }
+ }};
+ }
+
+ async fn setup_table() -> DeltaTable {
+ let schema = Schema::new(vec![
+ SchemaField::new(
+ "id".to_string(),
+ SchemaDataType::primitive("string".to_string()),
+ true,
+ HashMap::new(),
+ ),
+ SchemaField::new(
+ "value".to_string(),
+ SchemaDataType::primitive("integer".to_string()),
+ true,
+ HashMap::new(),
+ ),
+ SchemaField::new(
+ "value2".to_string(),
+ SchemaDataType::primitive("integer".to_string()),
+ true,
+ HashMap::new(),
+ ),
+ SchemaField::new(
+ "modified".to_string(),
+ SchemaDataType::primitive("string".to_string()),
+ true,
+ HashMap::new(),
+ ),
+ SchemaField::new(
+ "active".to_string(),
+ SchemaDataType::primitive("boolean".to_string()),
+ true,
+ HashMap::new(),
+ ),
+ SchemaField::new(
+ "money".to_string(),
+ SchemaDataType::primitive("decimal(12,2)".to_string()),
+ true,
+ HashMap::new(),
+ ),
+ SchemaField::new(
+ "_date".to_string(),
+ SchemaDataType::primitive("date".to_string()),
+ true,
+ HashMap::new(),
+ ),
+ SchemaField::new(
+ "_timestamp".to_string(),
+ SchemaDataType::primitive("timestamp".to_string()),
+ true,
+ HashMap::new(),
+ ),
+ SchemaField::new(
+ "_binary".to_string(),
+ SchemaDataType::primitive("binary".to_string()),
+ true,
+ HashMap::new(),
+ ),
+ ]);
+
+ let table = DeltaOps::new_in_memory()
+ .create()
+ .with_columns(schema.get_fields().clone())
+ .await
+ .unwrap();
+ assert_eq!(table.version(), 0);
+ table
+ }
+
+ #[tokio::test]
+ async fn test_expr_sql() {
+ let table = setup_table().await;
+
+ // String expression that we output must be parsable for conflict resolution.
+ let tests = vec![
+ simple!(
+ Expr::Cast(Cast {
+ expr: Box::new(lit(1_i64)),
+ data_type: DataType::Int32
+ }),
+ "arrow_cast(1, 'Int32')".to_string()
+ ),
+ simple!(col("value").eq(lit(3_i64)), "value = 3".to_string()),
+ simple!(col("active").is_true(), "active IS TRUE".to_string()),
+ simple!(col("active"), "active".to_string()),
+ simple!(col("active").eq(lit(true)), "active = true".to_string()),
+ simple!(col("active").is_null(), "active IS NULL".to_string()),
+ simple!(
+ col("modified").eq(lit("2021-02-03")),
+ "modified = '2021-02-03'".to_string()
+ ),
+ simple!(
+ col("modified").eq(lit("'validate ' escapi\\ng'")),
+ "modified = '''validate '' escapi\\ng'''".to_string()
+ ),
+ simple!(col("money").gt(lit(0.10)), "money > 0.1".to_string()),
+ ParseTest {
+ expr: col("_binary").eq(lit(ScalarValue::Binary(Some(vec![0xAA, 0x00, 0xFF])))),
+ expected: "_binary = decode('aa00ff', 'hex')".to_string(),
+ override_expected_expr: Some(col("_binary").eq(decode(lit("aa00ff"), lit("hex")))),
+ },
+ simple!(
+ col("value").between(lit(20_i64), lit(30_i64)),
+ "value BETWEEN 20 AND 30".to_string()
+ ),
+ simple!(
+ col("value").not_between(lit(20_i64), lit(30_i64)),
+ "value NOT BETWEEN 20 AND 30".to_string()
+ ),
+ simple!(
+ col("modified").like(lit("abc%")),
+ "modified LIKE 'abc%'".to_string()
+ ),
+ simple!(
+ col("modified").not_like(lit("abc%")),
+ "modified NOT LIKE 'abc%'".to_string()
+ ),
+ simple!(
+ (((col("value") * lit(2_i64) + col("value2")) / lit(3_i64)) - col("value"))
+ .gt(lit(0_i64)),
+ "(value * 2 + value2) / 3 - value > 0".to_string()
+ ),
+ simple!(
+ col("modified").in_list(vec![lit("a"), lit("c")], false),
+ "modified IN ('a', 'c')".to_string()
+ ),
+ simple!(
+ col("modified").in_list(vec![lit("a"), lit("c")], true),
+ "modified NOT IN ('a', 'c')".to_string()
+ ),
+ // Validate order of operations is maintained
+ simple!(
+ col("modified")
+ .eq(lit("value"))
+ .and(col("value").eq(lit(1_i64)))
+ .or(col("modified")
+ .eq(lit("value2"))
+ .and(col("value").gt(lit(1_i64)))),
+ "modified = 'value' AND value = 1 OR modified = 'value2' AND value > 1".to_string()
+ ),
+ simple!(
+ col("modified")
+ .eq(lit("value"))
+ .or(col("value").eq(lit(1_i64)))
+ .and(
+ col("modified")
+ .eq(lit("value2"))
+ .or(col("value").gt(lit(1_i64))),
+ ),
+ "(modified = 'value' OR value = 1) AND (modified = 'value2' OR value > 1)"
+ .to_string()
+ ),
+ // Validate functions are correctly parsed
+ simple!(
+ substring(col("modified"), lit(0_i64), lit(4_i64)).eq(lit("2021")),
+ "substr(modified, 0, 4) = '2021'".to_string()
+ ),
+ simple!(
+ col("value")
+                .cast_to::<DFSchema>(
+ &arrow_schema::DataType::Utf8,
+ &table
+ .state
+ .input_schema()
+ .unwrap()
+ .as_ref()
+ .to_owned()
+ .try_into()
+ .unwrap()
+ )
+ .unwrap()
+ .eq(lit("1")),
+ "arrow_cast(value, 'Utf8') = '1'".to_string()
+ ),
+ ];
+
+ let session = SessionContext::new();
+
+ for test in tests {
+ let actual = fmt_expr_to_sql(&test.expr).unwrap();
+ assert_eq!(test.expected, actual);
+
+ let actual_expr = table
+ .state
+ .parse_predicate_expression(actual, &session.state())
+ .unwrap();
+
+ match test.override_expected_expr {
+ None => assert_eq!(test.expr, actual_expr),
+ Some(expr) => assert_eq!(expr, actual_expr),
+ }
+ }
+
+ let unsupported_types = vec![
+ /* TODO: Determine proper way to display decimal values in an sql expression*/
+ simple!(
+ col("money").gt(lit(ScalarValue::Decimal128(Some(100), 12, 2))),
+ "money > 0.1".to_string()
+ ),
+ simple!(
+ col("_timestamp").gt(lit(ScalarValue::TimestampMillisecond(Some(100), None))),
+ "".to_string()
+ ),
+ simple!(
+ col("_timestamp").gt(lit(ScalarValue::TimestampMillisecond(
+ Some(100),
+ Some("UTC".into())
+ ))),
+ "".to_string()
+ ),
+ ];
+
+ for test in unsupported_types {
+ assert!(fmt_expr_to_sql(&test.expr).is_err());
+ }
+ }
+}
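A brief usage sketch for the formatter introduced in this module (the import path assumes the `datafusion` feature and the `delta_datafusion::expr` module added above):

```rust
// Render a DataFusion expression as the parsable SQL string form produced by
// `fmt_expr_to_sql`; delta-rs uses this form for conflict resolution.
use datafusion_expr::{col, lit};
use deltalake_core::delta_datafusion::expr::fmt_expr_to_sql;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let predicate = col("value")
        .gt(lit(10_i64))
        .and(col("modified").eq(lit("2021-02-03")));
    // Prints something like: value > 10 AND modified = '2021-02-03'
    println!("{}", fmt_expr_to_sql(&predicate)?);
    Ok(())
}
```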
diff --git a/rust/src/delta_datafusion.rs b/crates/deltalake-core/src/delta_datafusion/mod.rs
similarity index 71%
rename from rust/src/delta_datafusion.rs
rename to crates/deltalake-core/src/delta_datafusion/mod.rs
index e69c7abbfe..7fbe362afc 100644
--- a/rust/src/delta_datafusion.rs
+++ b/crates/deltalake-core/src/delta_datafusion/mod.rs
@@ -8,7 +8,7 @@
//!
//! async {
//! let mut ctx = SessionContext::new();
-//! let table = deltalake::open_table("./tests/data/simple_table")
+//! let table = deltalake_core::open_table("./tests/data/simple_table")
//! .await
//! .unwrap();
//! ctx.register_table("demo", Arc::new(table)).unwrap();
@@ -21,7 +21,7 @@
//! ```
use std::any::Any;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::fmt::{self, Debug};
use std::sync::Arc;
@@ -32,12 +32,15 @@ use arrow::datatypes::DataType;
use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef, TimeUnit};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
-use arrow_array::StringArray;
+use arrow_array::types::UInt16Type;
+use arrow_array::{DictionaryArray, StringArray};
use arrow_schema::Field;
use async_trait::async_trait;
use chrono::{NaiveDateTime, TimeZone, Utc};
use datafusion::datasource::file_format::{parquet::ParquetFormat, FileFormat};
-use datafusion::datasource::physical_plan::FileScanConfig;
+use datafusion::datasource::physical_plan::{
+ wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
+};
use datafusion::datasource::provider::TableProviderFactory;
use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType};
use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
@@ -54,9 +57,7 @@ use datafusion::physical_plan::{
};
use datafusion_common::scalar::ScalarValue;
use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion};
-use datafusion_common::{
- Column, DFSchema, DataFusionError, Result as DataFusionResult, ToDFSchema,
-};
+use datafusion_common::{Column, DataFusionError, Result as DataFusionResult, ToDFSchema};
use datafusion_expr::expr::{ScalarFunction, ScalarUDF};
use datafusion_expr::logical_plan::CreateExternalTable;
use datafusion_expr::{col, Expr, Extension, LogicalPlan, TableProviderFilterPushDown, Volatility};
@@ -65,17 +66,20 @@ use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use object_store::ObjectMeta;
+use serde::{Deserialize, Serialize};
use url::Url;
-use crate::action::{self, Add};
-use crate::builder::ensure_table_uri;
use crate::errors::{DeltaResult, DeltaTableError};
+use crate::protocol::{self, Add};
use crate::storage::ObjectStoreRef;
-use crate::table_state::DeltaTableState;
+use crate::table::builder::ensure_table_uri;
+use crate::table::state::DeltaTableState;
use crate::{open_table, open_table_with_storage_options, DeltaTable, Invariant, SchemaDataType};
const PATH_COLUMN: &str = "__delta_rs_path";
+pub mod expr;
+
impl From for DataFusionError {
fn from(err: DeltaTableError) -> Self {
match err {
@@ -124,7 +128,7 @@ impl DeltaTableState {
|acc, action| {
let new_stats = action
.get_stats()
- .unwrap_or_else(|_| Some(action::Stats::default()))?;
+ .unwrap_or_else(|_| Some(protocol::Stats::default()))?;
Some(Statistics {
num_rows: acc
.num_rows
@@ -360,66 +364,298 @@ pub(crate) fn register_store(store: ObjectStoreRef, env: Arc<RuntimeEnv>) {
env.register_object_store(url, store);
}
-/// Create a Parquet scan limited to a set of files
-#[allow(clippy::too_many_arguments)]
-pub(crate) async fn parquet_scan_from_actions(
+pub(crate) fn logical_schema(
snapshot: &DeltaTableState,
+ scan_config: &DeltaScanConfig,
+) -> DeltaResult<SchemaRef> {
+ let input_schema = snapshot.input_schema()?;
+ let mut fields = Vec::new();
+ for field in input_schema.fields.iter() {
+ fields.push(field.to_owned());
+ }
+
+ if let Some(file_column_name) = &scan_config.file_column_name {
+ fields.push(Arc::new(Field::new(
+ file_column_name,
+ arrow_schema::DataType::Utf8,
+ true,
+ )));
+ }
+
+ Ok(Arc::new(ArrowSchema::new(fields)))
+}
+
+#[derive(Debug, Clone, Default)]
+/// Used to specify if additional metadata columns are exposed to the user
+pub struct DeltaScanConfigBuilder {
+    /// Include the source path for each record. The name of this column is determined by `file_column_name`.
+    include_file_column: bool,
+    /// Column name that contains the source path.
+    ///
+    /// If `include_file_column` is true and the name is None, the name will be auto-generated;
+    /// otherwise the user-provided name will be used.
+    file_column_name: Option<String>,
+}
+
+impl DeltaScanConfigBuilder {
+ /// Construct a new instance of `DeltaScanConfigBuilder`
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+    /// Indicate that a column containing a record's file path is included.
+    /// The column name is generated and can be determined once this Config is built.
+ pub fn with_file_column(mut self, include: bool) -> Self {
+ self.include_file_column = include;
+ self.file_column_name = None;
+ self
+ }
+
+    /// Indicate that a column containing a record's file path is included and that the column name is user defined.
+    pub fn with_file_column_name<S: ToString>(mut self, name: &S) -> Self {
+ self.file_column_name = Some(name.to_string());
+ self.include_file_column = true;
+ self
+ }
+
+ /// Build a DeltaScanConfig and ensure no column name conflicts occur during downstream processing
+    pub fn build(&self, snapshot: &DeltaTableState) -> DeltaResult<DeltaScanConfig> {
+ let input_schema = snapshot.input_schema()?;
+ let mut file_column_name = None;
+ let mut column_names: HashSet<&String> = HashSet::new();
+ for field in input_schema.fields.iter() {
+ column_names.insert(field.name());
+ }
+
+ if self.include_file_column {
+ match &self.file_column_name {
+ Some(name) => {
+ if column_names.contains(name) {
+ return Err(DeltaTableError::Generic(format!(
+                        "Unable to add file path column since column with name {} exists",
+ name
+ )));
+ }
+
+ file_column_name = Some(name.to_owned())
+ }
+ None => {
+ let prefix = PATH_COLUMN;
+ let mut idx = 0;
+ let mut name = prefix.to_owned();
+
+ while column_names.contains(&name) {
+ idx += 1;
+ name = format!("{}_{}", prefix, idx);
+ }
+
+ file_column_name = Some(name);
+ }
+ }
+ }
+
+ Ok(DeltaScanConfig { file_column_name })
+ }
+}
+
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
+/// Include additional metadata columns during a [`DeltaScan`]
+pub struct DeltaScanConfig {
+    /// Include the source path for each record
+    pub file_column_name: Option<String>,
+}
+
+#[derive(Debug)]
+pub(crate) struct DeltaScanBuilder<'a> {
+ snapshot: &'a DeltaTableState,
object_store: ObjectStoreRef,
- actions: &[Add],
- schema: &ArrowSchema,
-    filters: Option<Arc<dyn PhysicalExpr>>,
-    state: &SessionState,
-    projection: Option<&Vec<usize>>,
+    filter: Option<Expr>,
+    state: &'a SessionState,
+    projection: Option<&'a Vec<usize>>,
    limit: Option<usize>,
-) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
- // TODO we group files together by their partition values. If the table is partitioned
- // and partitions are somewhat evenly distributed, probably not the worst choice ...
- // However we may want to do some additional balancing in case we are far off from the above.
-    let mut file_groups: HashMap<Vec<ScalarValue>, Vec<PartitionedFile>> = HashMap::new();
- for action in actions {
- let part = partitioned_file_from_action(action, schema);
- file_groups
- .entry(part.partition_values.clone())
- .or_default()
- .push(part);
- }
-
- let table_partition_cols = snapshot
- .current_metadata()
- .ok_or(DeltaTableError::NoMetadata)?
- .partition_columns
- .clone();
- let file_schema = Arc::new(ArrowSchema::new(
- schema
- .fields()
+ files: Option<&'a [Add]>,
+ config: DeltaScanConfig,
+    schema: Option<SchemaRef>,
+}
+
+impl<'a> DeltaScanBuilder<'a> {
+ pub fn new(
+ snapshot: &'a DeltaTableState,
+ object_store: ObjectStoreRef,
+ state: &'a SessionState,
+ ) -> Self {
+ DeltaScanBuilder {
+ snapshot,
+ object_store,
+ filter: None,
+ state,
+ files: None,
+ projection: None,
+ limit: None,
+ config: DeltaScanConfig::default(),
+ schema: None,
+ }
+ }
+
+    pub fn with_filter(mut self, filter: Option<Expr>) -> Self {
+ self.filter = filter;
+ self
+ }
+
+ pub fn with_files(mut self, files: &'a [Add]) -> Self {
+ self.files = Some(files);
+ self
+ }
+
+    pub fn with_projection(mut self, projection: Option<&'a Vec<usize>>) -> Self {
+ self.projection = projection;
+ self
+ }
+
+ pub fn with_limit(mut self, limit: Option<usize>) -> Self {
+ self.limit = limit;
+ self
+ }
+
+ pub fn with_scan_config(mut self, config: DeltaScanConfig) -> Self {
+ self.config = config;
+ self
+ }
+
+ pub fn with_schema(mut self, schema: SchemaRef) -> Self {
+ self.schema = Some(schema);
+ self
+ }
+
+ pub async fn build(self) -> DeltaResult<DeltaScan> {
+ let config = self.config;
+ let schema = match self.schema {
+ Some(schema) => schema,
+ None => {
+ self.snapshot
+ .physical_arrow_schema(self.object_store.clone())
+ .await?
+ }
+ };
+ let logical_schema = logical_schema(self.snapshot, &config)?;
+
+ let logical_schema = if let Some(used_columns) = self.projection {
+ let mut fields = vec![];
+ for idx in used_columns {
+ fields.push(logical_schema.field(*idx).to_owned());
+ }
+ Arc::new(ArrowSchema::new(fields))
+ } else {
+ logical_schema
+ };
+
+ let logical_filter = self
+ .filter
+ .map(|expr| logical_expr_to_physical_expr(&expr, &logical_schema));
+
+ // Perform Pruning of files to scan
+ let files = match self.files {
+ Some(files) => files.to_owned(),
+ None => {
+ if let Some(predicate) = &logical_filter {
+ let pruning_predicate =
+ PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?;
+ let files_to_prune = pruning_predicate.prune(self.snapshot)?;
+ self.snapshot
+ .files()
+ .iter()
+ .zip(files_to_prune.into_iter())
+ .filter_map(
+ |(action, keep)| {
+ if keep {
+ Some(action.to_owned())
+ } else {
+ None
+ }
+ },
+ )
+ .collect()
+ } else {
+ self.snapshot.files().to_owned()
+ }
+ }
+ };
+
+ // TODO we group files together by their partition values. If the table is partitioned
+ // and partitions are somewhat evenly distributed, probably not the worst choice ...
+ // However we may want to do some additional balancing in case we are far off from the above.
+ let mut file_groups: HashMap<Vec<ScalarValue>, Vec<PartitionedFile>> = HashMap::new();
+
+ let table_partition_cols = &self
+ .snapshot
+ .current_metadata()
+ .ok_or(DeltaTableError::NoMetadata)?
+ .partition_columns;
+
+ for action in files.iter() {
+ let mut part = partitioned_file_from_action(action, table_partition_cols, &schema);
+
+ if config.file_column_name.is_some() {
+ part.partition_values
+ .push(wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+ action.path.clone(),
+ ))));
+ }
+
+ file_groups
+ .entry(part.partition_values.clone())
+ .or_default()
+ .push(part);
+ }
+
+ let file_schema = Arc::new(ArrowSchema::new(
+ schema
+ .fields()
+ .iter()
+ .filter(|f| !table_partition_cols.contains(f.name()))
+ .cloned()
+ .collect::<Vec<_>>(),
+ ));
+
+ let mut table_partition_cols = table_partition_cols
.iter()
- .filter(|f| !table_partition_cols.contains(f.name()))
- .cloned()
- .collect::<Vec<_>>(),
- ));
+ .map(|c| Ok((c.to_owned(), schema.field_with_name(c)?.data_type().clone())))
+ .collect::<Result<Vec<_>, ArrowError>>()?;
- let table_partition_cols = table_partition_cols
- .iter()
- .map(|c| Ok((c.to_owned(), schema.field_with_name(c)?.data_type().clone())))
- .collect::<Result<Vec<_>, ArrowError>>()?;
+ if let Some(file_column_name) = &config.file_column_name {
+ table_partition_cols.push((
+ file_column_name.clone(),
+ wrap_partition_type_in_dict(DataType::Utf8),
+ ));
+ }
- ParquetFormat::new()
- .create_physical_plan(
- state,
- FileScanConfig {
- object_store_url: object_store.object_store_url(),
- file_schema,
- file_groups: file_groups.into_values().collect(),
- statistics: snapshot.datafusion_table_statistics(),
- projection: projection.cloned(),
- limit,
- table_partition_cols,
- output_ordering: vec![],
- infinite_source: false,
- },
- filters.as_ref(),
- )
- .await
+ let scan = ParquetFormat::new()
+ .create_physical_plan(
+ self.state,
+ FileScanConfig {
+ object_store_url: self.object_store.object_store_url(),
+ file_schema,
+ file_groups: file_groups.into_values().collect(),
+ statistics: self.snapshot.datafusion_table_statistics(),
+ projection: self.projection.cloned(),
+ limit: self.limit,
+ table_partition_cols,
+ output_ordering: vec![],
+ infinite_source: false,
+ },
+ logical_filter.as_ref(),
+ )
+ .await?;
+
+ Ok(DeltaScan {
+ table_uri: ensure_table_uri(self.object_store.root_uri())?
+ .as_str()
+ .into(),
+ parquet_scan: scan,
+ config,
+ logical_schema,
+ })
+ }
}
#[async_trait]
@@ -451,53 +687,96 @@ impl TableProvider for DeltaTable {
filters: &[Expr],
 limit: Option<usize>,
 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
- let schema = self
- .state
- .physical_arrow_schema(self.object_store())
+ register_store(self.object_store(), session.runtime_env().clone());
+ let filter_expr = conjunction(filters.iter().cloned());
+
+ let scan = DeltaScanBuilder::new(&self.state, self.object_store(), session)
+ .with_projection(projection)
+ .with_limit(limit)
+ .with_filter(filter_expr)
+ .build()
.await?;
- register_store(self.object_store(), session.runtime_env().clone());
+ Ok(Arc::new(scan))
+ }
- let filter_expr = conjunction(filters.iter().cloned())
- .map(|expr| logical_expr_to_physical_expr(&expr, &schema));
+ fn supports_filter_pushdown(
+ &self,
+ _filter: &Expr,
+ ) -> DataFusionResult<TableProviderFilterPushDown> {
+ Ok(TableProviderFilterPushDown::Inexact)
+ }
- let actions = if let Some(predicate) = &filter_expr {
- let pruning_predicate = PruningPredicate::try_new(predicate.clone(), schema.clone())?;
- let files_to_prune = pruning_predicate.prune(&self.state)?;
- self.get_state()
- .files()
- .iter()
- .zip(files_to_prune)
- .filter_map(
- |(action, keep)| {
- if keep {
- Some(action.to_owned())
- } else {
- None
- }
- },
- )
- .collect()
- } else {
- self.get_state().files().to_owned()
- };
+ fn statistics(&self) -> Option<Statistics> {
+ Some(self.state.datafusion_table_statistics())
+ }
+}
- let parquet_scan = parquet_scan_from_actions(
- &self.state,
- self.object_store(),
- &actions,
- &schema,
- filter_expr,
- session,
- projection,
- limit,
- )
- .await?;
+/// A Delta table provider that enables additional metadata columns to be included during the scan
+pub struct DeltaTableProvider {
+ snapshot: DeltaTableState,
+ store: ObjectStoreRef,
+ config: DeltaScanConfig,
+ schema: Arc<ArrowSchema>,
+}
+
+impl DeltaTableProvider {
+ /// Build a DeltaTableProvider
+ pub fn try_new(
+ snapshot: DeltaTableState,
+ store: ObjectStoreRef,
+ config: DeltaScanConfig,
+ ) -> DeltaResult<Self> {
+ Ok(DeltaTableProvider {
+ schema: logical_schema(&snapshot, &config)?,
+ snapshot,
+ store,
+ config,
+ })
+ }
+}
- Ok(Arc::new(DeltaScan {
- table_uri: ensure_table_uri(self.table_uri())?.as_str().into(),
- parquet_scan,
- }))
+#[async_trait]
+impl TableProvider for DeltaTableProvider {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> Arc<ArrowSchema> {
+ self.schema.clone()
+ }
+
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+
+ fn get_table_definition(&self) -> Option<&str> {
+ None
+ }
+
+ fn get_logical_plan(&self) -> Option<&LogicalPlan> {
+ None
+ }
+
+ async fn scan(
+ &self,
+ session: &SessionState,
+ projection: Option<&Vec<usize>>,
+ filters: &[Expr],
+ limit: Option<usize>,
+ ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
+ register_store(self.store.clone(), session.runtime_env().clone());
+ let filter_expr = conjunction(filters.iter().cloned());
+
+ let scan = DeltaScanBuilder::new(&self.snapshot, self.store.clone(), session)
+ .with_projection(projection)
+ .with_limit(limit)
+ .with_filter(filter_expr)
+ .with_scan_config(self.config.clone())
+ .build()
+ .await?;
+
+ Ok(Arc::new(scan))
}
fn supports_filter_pushdown(
@@ -508,7 +787,7 @@ impl TableProvider for DeltaTable {
}
 fn statistics(&self) -> Option<Statistics> {
- Some(self.state.datafusion_table_statistics())
+ Some(self.snapshot.datafusion_table_statistics())
}
}
@@ -518,8 +797,19 @@ impl TableProvider for DeltaTable {
pub struct DeltaScan {
/// The URL of the ObjectStore root
pub table_uri: String,
+ /// Configuration for this scan, including the optional column that records each row's source file
+ pub config: DeltaScanConfig,
/// The parquet scan to wrap
 pub parquet_scan: Arc<dyn ExecutionPlan>,
+ /// The schema of the table to be used when evaluating expressions
+ pub logical_schema: Arc<ArrowSchema>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct DeltaScanWire {
+ pub table_uri: String,
+ pub config: DeltaScanConfig,
+ pub logical_schema: Arc<ArrowSchema>,
}
impl DisplayAs for DeltaScan {
@@ -631,21 +921,31 @@ pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarVa
 fn partitioned_file_from_action(
 action: &Add,
+ partition_columns: &[String],
 schema: &ArrowSchema,
 ) -> PartitionedFile {
- let partition_values = schema
- .fields()
+ let partition_values = partition_columns
.iter()
- .filter_map(|f| {
- action.partition_values.get(f.name()).map(|val| match val {
- Some(value) => to_correct_scalar_value(
- &serde_json::Value::String(value.to_string()),
- f.data_type(),
- )
- .unwrap_or(ScalarValue::Null),
- None => get_null_of_arrow_type(f.data_type()).unwrap_or(ScalarValue::Null),
- })
+ .map(|part| {
+ action
+ .partition_values
+ .get(part)
+ .map(|val| {
+ schema
+ .field_with_name(part)
+ .map(|field| match val {
+ Some(value) => to_correct_scalar_value(
+ &serde_json::Value::String(value.to_string()),
+ field.data_type(),
+ )
+ .unwrap_or(ScalarValue::Null),
+ None => get_null_of_arrow_type(field.data_type())
+ .unwrap_or(ScalarValue::Null),
+ })
+ .unwrap_or(ScalarValue::Null)
+ })
+ .unwrap_or(ScalarValue::Null)
})
 .collect::<Vec<ScalarValue>>();
@@ -920,11 +1220,13 @@ impl PhysicalExtensionCodec for DeltaPhysicalCodec {
 inputs: &[Arc<dyn ExecutionPlan>],
_registry: &dyn FunctionRegistry,
 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
- let table_uri: String = serde_json::from_reader(buf)
+ let wire: DeltaScanWire = serde_json::from_reader(buf)
.map_err(|_| DataFusionError::Internal("Unable to decode DeltaScan".to_string()))?;
let delta_scan = DeltaScan {
- table_uri,
+ table_uri: wire.table_uri,
parquet_scan: (*inputs)[0].clone(),
+ config: wire.config,
+ logical_schema: wire.logical_schema,
};
Ok(Arc::new(delta_scan))
}
@@ -938,7 +1240,13 @@ impl PhysicalExtensionCodec for DeltaPhysicalCodec {
.as_any()
 .downcast_ref::<DeltaScan>()
.ok_or_else(|| DataFusionError::Internal("Not a delta scan!".to_string()))?;
- serde_json::to_writer(buf, delta_scan.table_uri.as_str())
+
+ let wire = DeltaScanWire {
+ table_uri: delta_scan.table_uri.to_owned(),
+ config: delta_scan.config.clone(),
+ logical_schema: delta_scan.logical_schema.clone(),
+ };
+ serde_json::to_writer(buf, &wire)
.map_err(|_| DataFusionError::Internal("Unable to encode delta scan!".to_string()))?;
Ok(())
}
@@ -1099,27 +1407,44 @@ pub struct FindFiles {
fn join_batches_with_add_actions(
batches: Vec,
 mut actions: HashMap<String, Add>,
+ path_column: &str,
+ dict_array: bool,
 ) -> DeltaResult<Vec<Add>> {
 // Given RecordBatches that contain `__delta_rs_path`, perform a hash join
// with actions to obtain original add actions
let mut files = Vec::with_capacity(batches.iter().map(|batch| batch.num_rows()).sum());
for batch in batches {
- let array = batch
- .column_by_name(PATH_COLUMN)
- .ok_or_else(|| {
- DeltaTableError::Generic(format!("Unable to find column {}", PATH_COLUMN))
- })?
- .as_any()
- .downcast_ref::<StringArray>()
- .ok_or(DeltaTableError::Generic(format!(
- "Unable to downcast column {}",
- PATH_COLUMN
- )))?;
- for path in array {
+ let array = batch.column_by_name(path_column).ok_or_else(|| {
+ DeltaTableError::Generic(format!("Unable to find column {}", path_column))
+ })?;
+
+ let iter: Box<dyn Iterator<Item = Option<&str>>> =
+ if dict_array {
+ let array = array
+ .as_any()
+ .downcast_ref::<DictionaryArray<UInt16Type>>()
+ .ok_or(DeltaTableError::Generic(format!(
+ "Unable to downcast column {}",
+ path_column
+ )))?
+ .downcast_dict::<StringArray>()
+ .ok_or(DeltaTableError::Generic(format!(
+ "Unable to downcast column {}",
+ path_column
+ )))?;
+ Box::new(array.into_iter())
+ } else {
+ let array = array.as_any().downcast_ref::<StringArray>().ok_or(
+ DeltaTableError::Generic(format!("Unable to downcast column {}", path_column)),
+ )?;
+ Box::new(array.into_iter())
+ };
+
+ for path in iter {
let path = path.ok_or(DeltaTableError::Generic(format!(
"{} cannot be null",
- PATH_COLUMN
+ path_column
)))?;
match actions.remove(path) {
@@ -1139,89 +1464,43 @@ fn join_batches_with_add_actions(
pub(crate) async fn find_files_scan<'a>(
snapshot: &DeltaTableState,
store: ObjectStoreRef,
- schema: Arc<ArrowSchema>,
- file_schema: Arc<ArrowSchema>,
- candidates: Vec<&'a Add>,
state: &SessionState,
- expression: &Expr,
+ expression: Expr,
 ) -> DeltaResult<Vec<Add>> {
- let mut candidate_map: HashMap<String, Add> = HashMap::new();
-
- let table_partition_cols = snapshot
- .current_metadata()
- .ok_or(DeltaTableError::NoMetadata)?
- .partition_columns
- .clone();
-
- let mut file_groups: HashMap<Vec<ScalarValue>, Vec<PartitionedFile>> = HashMap::new();
- for action in candidates {
- let mut part = partitioned_file_from_action(action, &schema);
- part.partition_values
- .push(ScalarValue::Utf8(Some(action.path.clone())));
-
- file_groups
- .entry(part.partition_values.clone())
- .or_default()
- .push(part);
-
- candidate_map.insert(action.path.to_owned(), action.to_owned());
- }
-
- let mut table_partition_cols = table_partition_cols
+ let candidate_map: HashMap<String, Add> = snapshot
+ .files()
.iter()
- .map(|c| Ok((c.to_owned(), schema.field_with_name(c)?.data_type().clone())))
- .collect::<Result<Vec<_>, ArrowError>>()?;
- // Append a column called __delta_rs_path to track the file path
- table_partition_cols.push((PATH_COLUMN.to_owned(), DataType::Utf8));
+ .map(|add| (add.path.clone(), add.to_owned()))
+ .collect();
- let input_schema = snapshot.input_schema()?;
-
- let mut fields = Vec::new();
- for field in input_schema.fields.iter() {
- fields.push(field.to_owned());
+ let scan_config = DeltaScanConfigBuilder {
+ include_file_column: true,
+ ..Default::default()
}
- fields.push(Arc::new(Field::new(
- PATH_COLUMN,
- arrow_schema::DataType::Boolean,
- true,
- )));
- let input_schema = Arc::new(ArrowSchema::new(fields));
+ .build(snapshot)?;
+
+ let logical_schema = logical_schema(snapshot, &scan_config)?;
// Identify which columns we need to project
let mut used_columns = expression
.to_columns()?
.into_iter()
- .map(|column| input_schema.index_of(&column.name))
- .collect::<Result<Vec<usize>, ArrowError>>()
- .unwrap();
+ .map(|column| logical_schema.index_of(&column.name))
+ .collect::<Result<Vec<usize>, ArrowError>>()?;
// Add path column
- used_columns.push(input_schema.index_of(PATH_COLUMN)?);
+ used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?);
- // Project the logical schema so column indices align between the parquet scan and the expression
- let mut fields = vec![];
- for idx in &used_columns {
- fields.push(input_schema.field(*idx).to_owned());
- }
- let input_schema = Arc::new(ArrowSchema::new(fields));
- let input_dfschema = input_schema.as_ref().clone().try_into()?;
-
- let parquet_scan = ParquetFormat::new()
- .create_physical_plan(
- state,
- FileScanConfig {
- object_store_url: store.object_store_url(),
- file_schema,
- file_groups: file_groups.into_values().collect(),
- statistics: snapshot.datafusion_table_statistics(),
- projection: Some(used_columns),
- limit: None,
- table_partition_cols,
- infinite_source: false,
- output_ordering: vec![],
- },
- None,
- )
+ let scan = DeltaScanBuilder::new(snapshot, store.clone(), state)
+ .with_filter(Some(expression.clone()))
+ .with_projection(Some(&used_columns))
+ .with_scan_config(scan_config)
+ .build()
.await?;
+ let scan = Arc::new(scan);
+
+ let config = &scan.config;
+ let input_schema = scan.logical_schema.as_ref().to_owned();
+ let input_dfschema = input_schema.clone().try_into()?;
let predicate_expr = create_physical_expr(
&Expr::IsTrue(Box::new(expression.clone())),
@@ -1231,13 +1510,18 @@ pub(crate) async fn find_files_scan<'a>(
)?;
 let filter: Arc<dyn ExecutionPlan> =
- Arc::new(FilterExec::try_new(predicate_expr, parquet_scan.clone())?);
+ Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);
 let limit: Arc<dyn ExecutionPlan> = Arc::new(LocalLimitExec::new(filter, 1));
let task_ctx = Arc::new(TaskContext::from(state));
let path_batches = datafusion::physical_plan::collect(limit, task_ctx).await?;
- join_batches_with_add_actions(path_batches, candidate_map)
+ join_batches_with_add_actions(
+ path_batches,
+ candidate_map,
+ config.file_column_name.as_ref().unwrap(),
+ true,
+ )
}
pub(crate) async fn scan_memory_table(
@@ -1291,42 +1575,22 @@ pub(crate) async fn scan_memory_table(
.map(|action| (action.path.clone(), action))
 .collect::<HashMap<String, Add>>();
- join_batches_with_add_actions(batches, map)
+ join_batches_with_add_actions(batches, map, PATH_COLUMN, false)
}
/// Finds files in a snapshot that match the provided predicate.
pub async fn find_files<'a>(
snapshot: &DeltaTableState,
object_store: ObjectStoreRef,
- schema: Arc<ArrowSchema>,
state: &SessionState,
predicate: Option,
 ) -> DeltaResult<FindFiles> {
let current_metadata = snapshot
.current_metadata()
.ok_or(DeltaTableError::NoMetadata)?;
- let table_partition_cols = current_metadata.partition_columns.clone();
match &predicate {
Some(predicate) => {
- let file_schema = Arc::new(ArrowSchema::new(
- schema
- .fields()
- .iter()
- .filter(|f| !table_partition_cols.contains(f.name()))
- .cloned()
- .collect::<Vec<_>>(),
- ));
-
- let input_schema = snapshot.input_schema()?;
- let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;
- let expr = create_physical_expr(
- predicate,
- &input_dfschema,
- &input_schema,
- state.execution_props(),
- )?;
-
// Validate the Predicate and determine if it only contains partition columns
let mut expr_properties = FindFilesExprProperties {
partition_only: true,
@@ -1344,26 +1608,9 @@ pub async fn find_files<'a>(
partition_scan: true,
})
} else {
- let pruning_predicate = PruningPredicate::try_new(expr, schema.clone())?;
- let files_to_prune = pruning_predicate.prune(snapshot)?;
- let files: Vec<&Add> = snapshot
- .files()
- .iter()
- .zip(files_to_prune.into_iter())
- .filter_map(|(action, keep)| if keep { Some(action) } else { None })
- .collect();
-
- // Create a new delta scan plan with only files that have a record
- let candidates = find_files_scan(
- snapshot,
- object_store.clone(),
- schema.clone(),
- file_schema.clone(),
- files,
- state,
- predicate,
- )
- .await?;
+ let candidates =
+ find_files_scan(snapshot, object_store.clone(), state, predicate.to_owned())
+ .await?;
Ok(FindFiles {
candidates,
@@ -1380,9 +1627,11 @@ pub async fn find_files<'a>(
#[cfg(test)]
mod tests {
+ use crate::writer::test_utils::get_delta_schema;
use arrow::array::StructArray;
use arrow::datatypes::{DataType, Field, Schema};
use chrono::{TimeZone, Utc};
+ use datafusion::assert_batches_sorted_eq;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion_proto::physical_plan::AsExecutionPlan;
use datafusion_proto::protobuf;
@@ -1541,7 +1790,7 @@ mod tests {
let mut partition_values = std::collections::HashMap::new();
partition_values.insert("month".to_string(), Some("1".to_string()));
partition_values.insert("year".to_string(), Some("2015".to_string()));
- let action = action::Add {
+ let action = protocol::Add {
path: "year=2015/month=1/part-00000-4dcb50d3-d017-450c-9df7-a7257dbd3c5d-c000.snappy.parquet".to_string(),
size: 10644,
partition_values,
@@ -1558,7 +1807,8 @@ mod tests {
Field::new("month", ArrowDataType::Int64, true),
]);
- let file = partitioned_file_from_action(&action, &schema);
+ let part_columns = vec!["year".to_string(), "month".to_string()];
+ let file = partitioned_file_from_action(&action, &part_columns, &schema);
let ref_file = PartitionedFile {
object_meta: object_store::ObjectMeta {
location: Path::from("year=2015/month=1/part-00000-4dcb50d3-d017-450c-9df7-a7257dbd3c5d-c000.snappy.parquet".to_string()),
@@ -1648,7 +1898,9 @@ mod tests {
]));
let exec_plan = Arc::from(DeltaScan {
table_uri: "s3://my_bucket/this/is/some/path".to_string(),
- parquet_scan: Arc::from(EmptyExec::new(false, schema)),
+ parquet_scan: Arc::from(EmptyExec::new(false, schema.clone())),
+ config: DeltaScanConfig::default(),
+ logical_schema: schema.clone(),
});
let proto: protobuf::PhysicalPlanNode =
protobuf::PhysicalPlanNode::try_from_physical_plan(exec_plan.clone(), &codec)
@@ -1660,4 +1912,92 @@ mod tests {
.expect("from proto");
assert_eq!(format!("{exec_plan:?}"), format!("{result_exec_plan:?}"));
}
+
+ #[tokio::test]
+ async fn delta_table_provider_with_config() {
+ let table = crate::open_table("tests/data/delta-2.2.0-partitioned-types")
+ .await
+ .unwrap();
+ let config = DeltaScanConfigBuilder::new()
+ .with_file_column_name(&"file_source")
+ .build(&table.state)
+ .unwrap();
+
+ let provider = DeltaTableProvider::try_new(table.state, table.storage, config).unwrap();
+ let ctx = SessionContext::new();
+ ctx.register_table("test", Arc::new(provider)).unwrap();
+
+ let df = ctx.sql("select * from test").await.unwrap();
+ let actual = df.collect().await.unwrap();
+ let expected = vec![
+ "+----+----+----+-------------------------------------------------------------------------------+",
+ "| c3 | c1 | c2 | file_source |",
+ "+----+----+----+-------------------------------------------------------------------------------+",
+ "| 4 | 6 | a | c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet |",
+ "| 5 | 4 | c | c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet |",
+ "| 6 | 5 | b | c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet |",
+ "+----+----+----+-------------------------------------------------------------------------------+",
+ ];
+ assert_batches_sorted_eq!(&expected, &actual);
+ }
+
+ #[tokio::test]
+ async fn delta_scan_mixed_partition_order() {
+ // Tests issue (1787) where partition columns were incorrect when they
+ // have a different order in the metadata and table schema
+ let schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("modified", DataType::Utf8, true),
+ Field::new("id", DataType::Utf8, true),
+ Field::new("value", DataType::Int32, true),
+ ]));
+
+ let table = crate::DeltaOps::new_in_memory()
+ .create()
+ .with_columns(get_delta_schema().get_fields().clone())
+ .with_partition_columns(["modified", "id"])
+ .await
+ .unwrap();
+ assert_eq!(table.version(), 0);
+
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(arrow::array::StringArray::from(vec![
+ "2021-02-01",
+ "2021-02-01",
+ "2021-02-02",
+ "2021-02-02",
+ ])),
+ Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
+ Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
+ ],
+ )
+ .unwrap();
+ // write some data
+ let table = crate::DeltaOps(table)
+ .write(vec![batch.clone()])
+ .with_save_mode(crate::protocol::SaveMode::Append)
+ .await
+ .unwrap();
+
+ let config = DeltaScanConfigBuilder::new().build(&table.state).unwrap();
+
+ let provider = DeltaTableProvider::try_new(table.state, table.storage, config).unwrap();
+ let ctx = SessionContext::new();
+ ctx.register_table("test", Arc::new(provider)).unwrap();
+
+ let df = ctx.sql("select * from test").await.unwrap();
+ let actual = df.collect().await.unwrap();
+ let expected = vec![
+ "+-------+------------+----+",
+ "| value | modified | id |",
+ "+-------+------------+----+",
+ "| 1 | 2021-02-01 | A |",
+ "| 10 | 2021-02-01 | B |",
+ "| 100 | 2021-02-02 | D |",
+ "| 20 | 2021-02-02 | C |",
+ "+-------+------------+----+",
+ ];
+ assert_batches_sorted_eq!(&expected, &actual);
+ }
}
diff --git a/rust/src/errors.rs b/crates/deltalake-core/src/errors.rs
similarity index 99%
rename from rust/src/errors.rs
rename to crates/deltalake-core/src/errors.rs
index fed0e823f8..24989b2814 100644
--- a/rust/src/errors.rs
+++ b/crates/deltalake-core/src/errors.rs
@@ -1,8 +1,8 @@
//! Exceptions for the deltalake crate
use object_store::Error as ObjectStoreError;
-use crate::action::ProtocolError;
use crate::operations::transaction::TransactionError;
+use crate::protocol::ProtocolError;
/// A result returned by delta-rs
 pub type DeltaResult<T> = Result<T, DeltaTableError>;
diff --git a/rust/src/lib.rs b/crates/deltalake-core/src/lib.rs
similarity index 83%
rename from rust/src/lib.rs
rename to crates/deltalake-core/src/lib.rs
index 2d03651cb9..fa7f65963f 100644
--- a/rust/src/lib.rs
+++ b/crates/deltalake-core/src/lib.rs
@@ -6,7 +6,7 @@
//!
//! ```rust
//! async {
-//! let table = deltalake::open_table("./tests/data/simple_table").await.unwrap();
+//! let table = deltalake_core::open_table("./tests/data/simple_table").await.unwrap();
//! let files = table.get_files();
//! };
//! ```
@@ -15,10 +15,10 @@
//!
//! ```rust
//! async {
-//! let table = deltalake::open_table_with_version("./tests/data/simple_table", 0).await.unwrap();
-//! let files = table.get_files_by_partitions(&[deltalake::PartitionFilter {
-//! key: "month",
-//! value: deltalake::PartitionValue::Equal("12"),
+//! let table = deltalake_core::open_table_with_version("./tests/data/simple_table", 0).await.unwrap();
+//! let files = table.get_files_by_partitions(&[deltalake_core::PartitionFilter {
+//! key: "month".to_string(),
+//! value: deltalake_core::PartitionValue::Equal("12".to_string()),
//! }]);
//! };
//! ```
@@ -27,7 +27,7 @@
//!
//! ```rust
//! async {
-//! let table = deltalake::open_table_with_ds(
+//! let table = deltalake_core::open_table_with_ds(
//! "./tests/data/simple_table",
//! "2020-05-02T23:47:31-07:00",
//! ).await.unwrap();
@@ -56,7 +56,7 @@
//!
//! async {
//! let mut ctx = SessionContext::new();
-//! let table = deltalake::open_table("./tests/data/simple_table")
+//! let table = deltalake_core::open_table("./tests/data/simple_table")
//! .await
//! .unwrap();
//! ctx.register_table("demo", Arc::new(table)).unwrap();
@@ -82,42 +82,34 @@ compile_error!(
"Features s3 and s3-native-tls are mutually exclusive and cannot be enabled together"
);
-pub mod action;
-pub mod builder;
pub mod data_catalog;
-pub mod delta;
-pub mod delta_config;
pub mod errors;
pub mod operations;
-pub mod partitions;
+pub mod protocol;
pub mod schema;
pub mod storage;
-pub mod table_state;
-pub mod time_utils;
+pub mod table;
-#[cfg(feature = "arrow")]
-pub mod table_state_arrow;
-
-#[cfg(all(feature = "arrow", feature = "parquet"))]
-pub mod delta_arrow;
#[cfg(feature = "datafusion")]
pub mod delta_datafusion;
#[cfg(all(feature = "arrow", feature = "parquet"))]
pub mod writer;
-pub use self::builder::*;
+use std::collections::HashMap;
+
pub use self::data_catalog::{get_data_catalog, DataCatalog, DataCatalogError};
-pub use self::delta::*;
-pub use self::delta_config::*;
-pub use self::partitions::*;
+pub use self::errors::*;
+pub use self::schema::partitions::*;
pub use self::schema::*;
-pub use errors::*;
+pub use self::table::builder::{
+ DeltaTableBuilder, DeltaTableConfig, DeltaTableLoadOptions, DeltaVersion,
+};
+pub use self::table::config::DeltaConfigKey;
+pub use self::table::DeltaTable;
pub use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, ObjectStore};
pub use operations::DeltaOps;
// convenience exports for consumers to avoid aligning crate versions
-#[cfg(all(feature = "arrow", feature = "parquet"))]
-pub use action::checkpoints;
#[cfg(feature = "arrow")]
pub use arrow;
#[cfg(feature = "datafusion")]
@@ -126,15 +118,70 @@ pub use datafusion;
pub use parquet;
#[cfg(feature = "parquet2")]
pub use parquet2;
+#[cfg(all(feature = "arrow", feature = "parquet"))]
+pub use protocol::checkpoints;
// needed only for integration tests
// TODO can / should we move this into the test crate?
#[cfg(feature = "integration_test")]
pub mod test_utils;
+/// Creates and loads a DeltaTable from the given path with current metadata.
+/// Infers the storage backend to use from the scheme in the given table path.
+pub async fn open_table(table_uri: impl AsRef<str>) -> Result<DeltaTable, DeltaTableError> {
+ let table = DeltaTableBuilder::from_uri(table_uri).load().await?;
+ Ok(table)
+}
+
+/// Same as `open_table`, but also accepts storage options to aid in building the table for a deduced
+/// `StorageService`.
+pub async fn open_table_with_storage_options(
+ table_uri: impl AsRef<str>,
+ storage_options: HashMap<String, String>,
+) -> Result<DeltaTable, DeltaTableError> {
+ let table = DeltaTableBuilder::from_uri(table_uri)
+ .with_storage_options(storage_options)
+ .load()
+ .await?;
+ Ok(table)
+}
+
+/// Creates a DeltaTable from the given path and loads it with the metadata from the given version.
+/// Infers the storage backend to use from the scheme in the given table path.
+pub async fn open_table_with_version(
+ table_uri: impl AsRef<str>,
+ version: i64,
+) -> Result<DeltaTable, DeltaTableError> {
+ let table = DeltaTableBuilder::from_uri(table_uri)
+ .with_version(version)
+ .load()
+ .await?;
+ Ok(table)
+}
+
+/// Creates a DeltaTable from the given path.
+/// Loads metadata from the version appropriate for the given ISO-8601/RFC-3339 timestamp.
+/// Infers the storage backend to use from the scheme in the given table path.
+pub async fn open_table_with_ds(
+ table_uri: impl AsRef<str>,
+ ds: impl AsRef<str>,
+) -> Result<DeltaTable, DeltaTableError> {
+ let table = DeltaTableBuilder::from_uri(table_uri)
+ .with_datestring(ds)?
+ .load()
+ .await?;
+ Ok(table)
+}
+
+/// Returns the Rust crate version; can be used in language bindings to expose the Rust core version.
+pub fn crate_version() -> &'static str {
+ env!("CARGO_PKG_VERSION")
+}
+
#[cfg(test)]
mod tests {
use super::*;
+ use crate::table::PeekCommit;
use std::collections::HashMap;
#[tokio::test]
@@ -153,7 +200,7 @@ mod tests {
);
let tombstones = table.get_state().all_tombstones();
assert_eq!(tombstones.len(), 4);
- assert!(tombstones.contains(&crate::action::Remove {
+ assert!(tombstones.contains(&crate::protocol::Remove {
path: "part-00000-512e1537-8aaa-4193-b8b4-bef3de0de409-c000.snappy.parquet".to_string(),
deletion_timestamp: Some(1564524298213),
data_change: false,
@@ -255,7 +302,7 @@ mod tests {
);
let tombstones = table.get_state().all_tombstones();
assert_eq!(tombstones.len(), 1);
- assert!(tombstones.contains(&crate::action::Remove {
+ assert!(tombstones.contains(&crate::protocol::Remove {
path: "part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet".to_string(),
deletion_timestamp: Some(1615043776198),
data_change: true,
@@ -301,12 +348,12 @@ mod tests {
let filters = vec![
crate::PartitionFilter {
- key: "month",
- value: crate::PartitionValue::Equal("2"),
+ key: "month".to_string(),
+ value: crate::PartitionValue::Equal("2".to_string()),
},
crate::PartitionFilter {
- key: "year",
- value: crate::PartitionValue::Equal("2020"),
+ key: "year".to_string(),
+ value: crate::PartitionValue::Equal("2020".to_string()),
},
];
@@ -336,8 +383,8 @@ mod tests {
);
let filters = vec![crate::PartitionFilter {
- key: "month",
- value: crate::PartitionValue::NotEqual("2"),
+ key: "month".to_string(),
+ value: crate::PartitionValue::NotEqual("2".to_string()),
}];
assert_eq!(
table.get_files_by_partitions(&filters).unwrap(),
@@ -350,8 +397,8 @@ mod tests {
);
let filters = vec![crate::PartitionFilter {
- key: "month",
- value: crate::PartitionValue::In(vec!["2", "12"]),
+ key: "month".to_string(),
+ value: crate::PartitionValue::In(vec!["2".to_string(), "12".to_string()]),
}];
assert_eq!(
table.get_files_by_partitions(&filters).unwrap(),
@@ -364,8 +411,8 @@ mod tests {
);
let filters = vec![crate::PartitionFilter {
- key: "month",
- value: crate::PartitionValue::NotIn(vec!["2", "12"]),
+ key: "month".to_string(),
+ value: crate::PartitionValue::NotIn(vec!["2".to_string(), "12".to_string()]),
}];
assert_eq!(
table.get_files_by_partitions(&filters).unwrap(),
@@ -383,8 +430,8 @@ mod tests {
.unwrap();
let filters = vec![crate::PartitionFilter {
- key: "k",
- value: crate::PartitionValue::Equal("A"),
+ key: "k".to_string(),
+ value: crate::PartitionValue::Equal("A".to_string()),
}];
assert_eq!(
table.get_files_by_partitions(&filters).unwrap(),
@@ -394,8 +441,8 @@ mod tests {
);
let filters = vec![crate::PartitionFilter {
- key: "k",
- value: crate::PartitionValue::Equal(""),
+ key: "k".to_string(),
+ value: crate::PartitionValue::Equal("".to_string()),
}];
assert_eq!(
table.get_files_by_partitions(&filters).unwrap(),
@@ -426,8 +473,8 @@ mod tests {
);
let filters = vec![crate::PartitionFilter {
- key: "x",
- value: crate::PartitionValue::Equal("A/A"),
+ key: "x".to_string(),
+ value: crate::PartitionValue::Equal("A/A".to_string()),
}];
assert_eq!(
table.get_files_by_partitions(&filters).unwrap(),
@@ -445,8 +492,8 @@ mod tests {
.unwrap();
let filters = vec![crate::PartitionFilter {
- key: "x",
- value: crate::PartitionValue::LessThanOrEqual("9"),
+ key: "x".to_string(),
+ value: crate::PartitionValue::LessThanOrEqual("9".to_string()),
}];
assert_eq!(
table.get_files_by_partitions(&filters).unwrap(),
@@ -456,8 +503,8 @@ mod tests {
);
let filters = vec![crate::PartitionFilter {
- key: "y",
- value: crate::PartitionValue::LessThan("10.0"),
+ key: "y".to_string(),
+ value: crate::PartitionValue::LessThan("10.0".to_string()),
}];
assert_eq!(
table.get_files_by_partitions(&filters).unwrap(),
@@ -475,7 +522,10 @@ mod tests {
let table_from_struct_stats = crate::open_table(table_uri).await.unwrap();
let table_from_json_stats = crate::open_table_with_version(table_uri, 1).await.unwrap();
- fn get_stats_for_file(table: &crate::DeltaTable, file_name: &str) -> crate::action::Stats {
+ fn get_stats_for_file(
+ table: &crate::DeltaTable,
+ file_name: &str,
+ ) -> crate::protocol::Stats {
table
.get_file_uris()
.zip(table.get_stats())
diff --git a/rust/src/operations/create.rs b/crates/deltalake-core/src/operations/create.rs
similarity index 92%
rename from rust/src/operations/create.rs
rename to crates/deltalake-core/src/operations/create.rs
index 697ab3ef1d..8a78f2266b 100644
--- a/rust/src/operations/create.rs
+++ b/crates/deltalake-core/src/operations/create.rs
@@ -9,13 +9,14 @@ use serde_json::{Map, Value};
use super::transaction::commit;
use super::{MAX_SUPPORTED_READER_VERSION, MAX_SUPPORTED_WRITER_VERSION};
-use crate::action::{Action, DeltaOperation, MetaData, Protocol, SaveMode};
-use crate::builder::ensure_table_uri;
-use crate::delta_config::DeltaConfigKey;
use crate::errors::{DeltaResult, DeltaTableError};
+use crate::protocol::{Action, DeltaOperation, MetaData, Protocol, SaveMode};
use crate::schema::{SchemaDataType, SchemaField, SchemaTypeStruct};
use crate::storage::DeltaObjectStore;
-use crate::{DeltaTable, DeltaTableBuilder, DeltaTableMetaData};
+use crate::table::builder::ensure_table_uri;
+use crate::table::config::DeltaConfigKey;
+use crate::table::DeltaTableMetaData;
+use crate::{DeltaTable, DeltaTableBuilder};
#[derive(thiserror::Error, Debug)]
enum CreateError {
@@ -148,7 +149,7 @@ impl CreateBuilder {
///
/// Options may be passed in the HashMap or set as environment variables.
///
- /// [crate::builder::s3_storage_options] describes the available options for the AWS or S3-compliant backend.
+ /// [crate::table::builder::s3_storage_options] describes the available options for the AWS or S3-compliant backend.
/// [dynamodb_lock::DynamoDbLockClient] describes additional options for the AWS atomic rename client.
///
/// If an object store is also passed using `with_object_store()` these options will be ignored.
@@ -249,6 +250,8 @@ impl CreateBuilder {
.unwrap_or_else(|| Protocol {
min_reader_version: MAX_SUPPORTED_READER_VERSION,
min_writer_version: MAX_SUPPORTED_WRITER_VERSION,
+ writer_features: None,
+ reader_features: None,
});
let metadata = DeltaTableMetaData::new(
@@ -291,7 +294,7 @@ impl std::future::IntoFuture for CreateBuilder {
Box::pin(async move {
let mode = this.mode.clone();
let (mut table, actions, operation) = this.into_table_and_actions()?;
- if table.object_store().is_delta_table_location().await? {
+ let table_state = if table.object_store().is_delta_table_location().await? {
match mode {
SaveMode::ErrorIfExists => return Err(CreateError::TableAlreadyExists.into()),
SaveMode::Append => return Err(CreateError::AppendNotAllowed.into()),
@@ -300,15 +303,19 @@ impl std::future::IntoFuture for CreateBuilder {
return Ok(table);
}
SaveMode::Overwrite => {
- todo!("Overwriting on create not yet implemented. Use 'write' operation instead.")
+ table.load().await?;
+ &table.state
}
}
- }
+ } else {
+ &table.state
+ };
+
let version = commit(
table.object_store().as_ref(),
&actions,
operation,
- &table.state,
+ table_state,
None,
)
.await?;
@@ -322,8 +329,8 @@ impl std::future::IntoFuture for CreateBuilder {
#[cfg(all(test, feature = "parquet"))]
mod tests {
use super::*;
- use crate::delta_config::DeltaConfigKey;
use crate::operations::DeltaOps;
+ use crate::table::config::DeltaConfigKey;
use crate::writer::test_utils::get_delta_schema;
use tempdir::TempDir;
@@ -394,6 +401,8 @@ mod tests {
let protocol = Protocol {
min_reader_version: 0,
min_writer_version: 0,
+ writer_features: None,
+ reader_features: None,
};
let table = CreateBuilder::new()
.with_location("memory://")
@@ -455,12 +464,12 @@ mod tests {
assert_eq!(table.get_metadata().unwrap().id, first_id);
// Check table is overwritten
- // let table = CreateBuilder::new()
- // .with_object_store(object_store.clone())
- // .with_columns(schema.get_fields().clone())
- // .with_save_mode(SaveMode::Overwrite)
- // .await
- // .unwrap();
- // assert_ne!(table.get_metadata().unwrap().id, first_id)
+ let table = CreateBuilder::new()
+ .with_object_store(object_store.clone())
+ .with_columns(schema.get_fields().clone())
+ .with_save_mode(SaveMode::Overwrite)
+ .await
+ .unwrap();
+ assert_ne!(table.get_metadata().unwrap().id, first_id)
}
}
diff --git a/rust/src/operations/delete.rs b/crates/deltalake-core/src/operations/delete.rs
similarity index 91%
rename from rust/src/operations/delete.rs
rename to crates/deltalake-core/src/operations/delete.rs
index 39b98cce24..913658f279 100644
--- a/rust/src/operations/delete.rs
+++ b/crates/deltalake-core/src/operations/delete.rs
@@ -20,7 +20,8 @@
use std::sync::Arc;
use std::time::{Instant, SystemTime, UNIX_EPOCH};
-use crate::action::{Action, Add, Remove};
+use crate::delta_datafusion::expr::fmt_expr_to_sql;
+use crate::protocol::{Action, Add, Remove};
use datafusion::execution::context::{SessionContext, SessionState};
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::filter::FilterExec;
@@ -30,16 +31,17 @@ use datafusion_common::scalar::ScalarValue;
use datafusion_common::DFSchema;
use futures::future::BoxFuture;
use parquet::file::properties::WriterProperties;
+use serde::Serialize;
use serde_json::Map;
use serde_json::Value;
-use crate::action::DeltaOperation;
-use crate::delta_datafusion::{find_files, parquet_scan_from_actions, register_store};
+use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::operations::transaction::commit;
use crate::operations::write::write_execution_plan;
+use crate::protocol::DeltaOperation;
use crate::storage::{DeltaObjectStore, ObjectStoreRef};
-use crate::table_state::DeltaTableState;
+use crate::table::state::DeltaTableState;
use crate::DeltaTable;
use super::datafusion_utils::Expression;
@@ -61,7 +63,7 @@ pub struct DeleteBuilder {
app_metadata: Option