diff --git a/.github/actions/setup-rust-runtime/action.yaml b/.github/actions/setup-rust-runtime/action.yaml
index 90e09a957cd4..27cdf9b97419 100644
--- a/.github/actions/setup-rust-runtime/action.yaml
+++ b/.github/actions/setup-rust-runtime/action.yaml
@@ -37,5 +37,5 @@ runs:
echo "SCCACHE_GHA_ENABLED=true" >> $GITHUB_ENV
echo "RUST_BACKTRACE=1" >> $GITHUB_ENV
echo "RUST_MIN_STACK=3000000" >> $GITHUB_ENV
- echo "RUST_FLAGS=-C debuginfo=line-tables-only -C incremental=false" >> $GITHUB_ENV
+ echo "RUSTFLAGS=-C debuginfo=line-tables-only -C incremental=false" >> $GITHUB_ENV
diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index 375c9f2c2c5a..d384e4bc7ebf 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -488,7 +488,7 @@ jobs:
# Verify MSRV for the crates which are directly used by other projects.
msrv:
- name: Verify MSRV
+ name: Verify MSRV (Min Supported Rust Version)
runs-on: ubuntu-latest
container:
image: amd64/rust
@@ -500,7 +500,13 @@ jobs:
run: cargo install cargo-msrv
- name: Check datafusion
working-directory: datafusion/core
- run: cargo msrv verify
+ run: |
+ # If you encounter an error with any of the commands below
+ # it means some crate in your dependency tree has a higher
+ # MSRV (Min Supported Rust Version) than the one specified
+ # in the `rust-version` key of `Cargo.toml`. Check your
+ # dependencies or update the version in `Cargo.toml`
+ cargo msrv verify
- name: Check datafusion-substrait
working-directory: datafusion/substrait
run: cargo msrv verify
diff --git a/Cargo.toml b/Cargo.toml
index cc1861677476..3b1362d22426 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,7 +29,7 @@ license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/apache/arrow-datafusion"
rust-version = "1.70"
-version = "34.0.0"
+version = "35.0.0"
[workspace.dependencies]
arrow = { version = "50.0.0", features = ["prettyprint"] }
@@ -45,17 +45,17 @@ bytes = "1.4"
chrono = { version = "0.4.31", default-features = false }
ctor = "0.2.0"
dashmap = "5.4.0"
-datafusion = { path = "datafusion/core", version = "34.0.0" }
-datafusion-common = { path = "datafusion/common", version = "34.0.0" }
-datafusion-execution = { path = "datafusion/execution", version = "34.0.0" }
-datafusion-expr = { path = "datafusion/expr", version = "34.0.0" }
-datafusion-optimizer = { path = "datafusion/optimizer", version = "34.0.0" }
-datafusion-physical-expr = { path = "datafusion/physical-expr", version = "34.0.0" }
-datafusion-physical-plan = { path = "datafusion/physical-plan", version = "34.0.0" }
-datafusion-proto = { path = "datafusion/proto", version = "34.0.0" }
-datafusion-sql = { path = "datafusion/sql", version = "34.0.0" }
-datafusion-sqllogictest = { path = "datafusion/sqllogictest", version = "34.0.0" }
-datafusion-substrait = { path = "datafusion/substrait", version = "34.0.0" }
+datafusion = { path = "datafusion/core", version = "35.0.0" }
+datafusion-common = { path = "datafusion/common", version = "35.0.0" }
+datafusion-execution = { path = "datafusion/execution", version = "35.0.0" }
+datafusion-expr = { path = "datafusion/expr", version = "35.0.0" }
+datafusion-optimizer = { path = "datafusion/optimizer", version = "35.0.0" }
+datafusion-physical-expr = { path = "datafusion/physical-expr", version = "35.0.0" }
+datafusion-physical-plan = { path = "datafusion/physical-plan", version = "35.0.0" }
+datafusion-proto = { path = "datafusion/proto", version = "35.0.0" }
+datafusion-sql = { path = "datafusion/sql", version = "35.0.0" }
+datafusion-sqllogictest = { path = "datafusion/sqllogictest", version = "35.0.0" }
+datafusion-substrait = { path = "datafusion/substrait", version = "35.0.0" }
doc-comment = "0.3"
env_logger = "0.10"
futures = "0.3"
@@ -70,7 +70,7 @@ parquet = { version = "50.0.0", default-features = false, features = ["arrow", "
rand = "0.8"
rstest = "0.18.0"
serde_json = "1"
-sqlparser = { version = "0.41.0", features = ["visitor"] }
+sqlparser = { version = "0.43.0", features = ["visitor"] }
tempfile = "3"
thiserror = "1.0.44"
url = "2.2"
diff --git a/README.md b/README.md
index 81ae30ab6897..cb89aff4aec7 100644
--- a/README.md
+++ b/README.md
@@ -19,6 +19,25 @@
# DataFusion
+[![Crates.io][crates-badge]][crates-url]
+[![Apache licensed][license-badge]][license-url]
+[![Build Status][actions-badge]][actions-url]
+[![Discord chat][discord-badge]][discord-url]
+
+[crates-badge]: https://img.shields.io/crates/v/datafusion.svg
+[crates-url]: https://crates.io/crates/datafusion
+[license-badge]: https://img.shields.io/badge/license-Apache%20v2-blue.svg
+[license-url]: https://github.com/apache/arrow-datafusion/blob/main/LICENSE.txt
+[actions-badge]: https://github.com/apache/arrow-datafusion/actions/workflows/rust.yml/badge.svg
+[actions-url]: https://github.com/apache/arrow-datafusion/actions?query=branch%3Amain
+[discord-badge]: https://img.shields.io/discord/885562378132000778.svg?logo=discord&style=flat-square
+[discord-url]: https://discord.com/invite/Qw5gKqHxUM
+
+[Website](https://github.com/apache/arrow-datafusion) |
+[Guides](https://github.com/apache/arrow-datafusion/tree/main/docs) |
+[API Docs](https://docs.rs/datafusion/latest/datafusion/) |
+[Chat](https://discord.com/channels/885562378132000778/885562378132000781)
+
DataFusion is a very fast, extensible query engine for building high-quality data-centric systems in
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 94c1ebe7ee47..50b79b4b0661 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -18,7 +18,7 @@
[package]
name = "datafusion-benchmarks"
description = "DataFusion Benchmarks"
-version = "34.0.0"
+version = "35.0.0"
edition = { workspace = true }
authors = ["Apache Arrow "]
homepage = "https://github.com/apache/arrow-datafusion"
@@ -33,8 +33,8 @@ snmalloc = ["snmalloc-rs"]
[dependencies]
arrow = { workspace = true }
-datafusion = { path = "../datafusion/core", version = "34.0.0" }
-datafusion-common = { path = "../datafusion/common", version = "34.0.0" }
+datafusion = { path = "../datafusion/core", version = "35.0.0" }
+datafusion-common = { path = "../datafusion/common", version = "35.0.0" }
env_logger = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
@@ -49,4 +49,4 @@ test-utils = { path = "../test-utils/", version = "0.1.0" }
tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread", "parking_lot"] }
[dev-dependencies]
-datafusion-proto = { path = "../datafusion/proto", version = "34.0.0" }
+datafusion-proto = { path = "../datafusion/proto", version = "35.0.0" }
diff --git a/benchmarks/queries/clickbench/README.md b/benchmarks/queries/clickbench/README.md
index d5105afd4832..e03b7d519d91 100644
--- a/benchmarks/queries/clickbench/README.md
+++ b/benchmarks/queries/clickbench/README.md
@@ -11,23 +11,180 @@ ClickBench is focused on aggregation and filtering performance (though it has no
[ClickBench repository]: https://github.com/ClickHouse/ClickBench/blob/main/datafusion/queries.sql
## "Extended" Queries
-The "extended" queries are not part of the official ClickBench benchmark.
-Instead they are used to test other DataFusion features that are not
-covered by the standard benchmark
-Each description below is for the corresponding line in `extended.sql` (line 1
-is `Q0`, line 2 is `Q1`, etc.)
+The "extended" queries are not part of the official ClickBench benchmark.
+Instead they are used to test other DataFusion features that are not covered by
+the standard benchmark Each description below is for the corresponding line in
+`extended.sql` (line 1 is `Q0`, line 2 is `Q1`, etc.)
+
+### Q0: Data Exploration
+
+**Question**: "How many distinct searches, mobile phones, and mobile phone models are there in the dataset?"
+
+**Important Query Properties**: multiple `COUNT DISTINCT`s, with low and high cardinality
+distinct string columns.
+
+```sql
+SELECT COUNT(DISTINCT "SearchPhrase"), COUNT(DISTINCT "MobilePhone"), COUNT(DISTINCT "MobilePhoneModel")
+FROM hits;
+```
+
+
+### Q1: Data Exploration
+
+**Question**: "How many distinct "hit color", "browser country" and "language" are there in the dataset?"
+
+**Important Query Properties**: multiple `COUNT DISTINCT`s. All three are small strings (length either 1 or 2).
-### Q0
-Models initial Data exploration, to understand some statistics of data.
-Import Query Properties: multiple `COUNT DISTINCT` on strings
```sql
-SELECT
- COUNT(DISTINCT "SearchPhrase"), COUNT(DISTINCT "MobilePhone"), COUNT(DISTINCT "MobilePhoneModel")
+SELECT COUNT(DISTINCT "HitColor"), COUNT(DISTINCT "BrowserCountry"), COUNT(DISTINCT "BrowserLanguage")
FROM hits;
```
+### Q2: Top 10 analysis
+
+**Question**: "Find the top 10 "browser country" by number of distinct "social network"s,
+including the distinct counts of "hit color", "browser language",
+and "social action"."
+
+**Important Query Properties**: `GROUP BY` on a short string column, multiple `COUNT DISTINCT`s. There are several small strings (length either 1 or 2).
+
+```sql
+SELECT "BrowserCountry", COUNT(DISTINCT "SocialNetwork"), COUNT(DISTINCT "HitColor"), COUNT(DISTINCT "BrowserLanguage"), COUNT(DISTINCT "SocialAction")
+FROM hits
+GROUP BY 1
+ORDER BY 2 DESC
+LIMIT 10;
+```
+
+
+## Data Notes
+
+Here are some interesting statistics about the data used in the queries.
+
+The maximum length of `"SearchPhrase"` is 1113 characters:
+```sql
+❯ select min(length("SearchPhrase")) as "SearchPhrase_len_min", max(length("SearchPhrase")) "SearchPhrase_len_max" from 'hits.parquet' limit 10;
++----------------------+----------------------+
+| SearchPhrase_len_min | SearchPhrase_len_max |
++----------------------+----------------------+
+| 0 | 1113 |
++----------------------+----------------------+
+```
+
+
+Here is the schema of the data:
+```sql
+❯ describe 'hits.parquet';
++-----------------------+-----------+-------------+
+| column_name | data_type | is_nullable |
++-----------------------+-----------+-------------+
+| WatchID | Int64 | NO |
+| JavaEnable | Int16 | NO |
+| Title | Utf8 | NO |
+| GoodEvent | Int16 | NO |
+| EventTime | Int64 | NO |
+| EventDate | UInt16 | NO |
+| CounterID | Int32 | NO |
+| ClientIP | Int32 | NO |
+| RegionID | Int32 | NO |
+| UserID | Int64 | NO |
+| CounterClass | Int16 | NO |
+| OS | Int16 | NO |
+| UserAgent | Int16 | NO |
+| URL | Utf8 | NO |
+| Referer | Utf8 | NO |
+| IsRefresh | Int16 | NO |
+| RefererCategoryID | Int16 | NO |
+| RefererRegionID | Int32 | NO |
+| URLCategoryID | Int16 | NO |
+| URLRegionID | Int32 | NO |
+| ResolutionWidth | Int16 | NO |
+| ResolutionHeight | Int16 | NO |
+| ResolutionDepth | Int16 | NO |
+| FlashMajor | Int16 | NO |
+| FlashMinor | Int16 | NO |
+| FlashMinor2 | Utf8 | NO |
+| NetMajor | Int16 | NO |
+| NetMinor | Int16 | NO |
+| UserAgentMajor | Int16 | NO |
+| UserAgentMinor | Utf8 | NO |
+| CookieEnable | Int16 | NO |
+| JavascriptEnable | Int16 | NO |
+| IsMobile | Int16 | NO |
+| MobilePhone | Int16 | NO |
+| MobilePhoneModel | Utf8 | NO |
+| Params | Utf8 | NO |
+| IPNetworkID | Int32 | NO |
+| TraficSourceID | Int16 | NO |
+| SearchEngineID | Int16 | NO |
+| SearchPhrase | Utf8 | NO |
+| AdvEngineID | Int16 | NO |
+| IsArtifical | Int16 | NO |
+| WindowClientWidth | Int16 | NO |
+| WindowClientHeight | Int16 | NO |
+| ClientTimeZone | Int16 | NO |
+| ClientEventTime | Int64 | NO |
+| SilverlightVersion1 | Int16 | NO |
+| SilverlightVersion2 | Int16 | NO |
+| SilverlightVersion3 | Int32 | NO |
+| SilverlightVersion4 | Int16 | NO |
+| PageCharset | Utf8 | NO |
+| CodeVersion | Int32 | NO |
+| IsLink | Int16 | NO |
+| IsDownload | Int16 | NO |
+| IsNotBounce | Int16 | NO |
+| FUniqID | Int64 | NO |
+| OriginalURL | Utf8 | NO |
+| HID | Int32 | NO |
+| IsOldCounter | Int16 | NO |
+| IsEvent | Int16 | NO |
+| IsParameter | Int16 | NO |
+| DontCountHits | Int16 | NO |
+| WithHash | Int16 | NO |
+| HitColor | Utf8 | NO |
+| LocalEventTime | Int64 | NO |
+| Age | Int16 | NO |
+| Sex | Int16 | NO |
+| Income | Int16 | NO |
+| Interests | Int16 | NO |
+| Robotness | Int16 | NO |
+| RemoteIP | Int32 | NO |
+| WindowName | Int32 | NO |
+| OpenerName | Int32 | NO |
+| HistoryLength | Int16 | NO |
+| BrowserLanguage | Utf8 | NO |
+| BrowserCountry | Utf8 | NO |
+| SocialNetwork | Utf8 | NO |
+| SocialAction | Utf8 | NO |
+| HTTPError | Int16 | NO |
+| SendTiming | Int32 | NO |
+| DNSTiming | Int32 | NO |
+| ConnectTiming | Int32 | NO |
+| ResponseStartTiming | Int32 | NO |
+| ResponseEndTiming | Int32 | NO |
+| FetchTiming | Int32 | NO |
+| SocialSourceNetworkID | Int16 | NO |
+| SocialSourcePage | Utf8 | NO |
+| ParamPrice | Int64 | NO |
+| ParamOrderID | Utf8 | NO |
+| ParamCurrency | Utf8 | NO |
+| ParamCurrencyID | Int16 | NO |
+| OpenstatServiceName | Utf8 | NO |
+| OpenstatCampaignID | Utf8 | NO |
+| OpenstatAdID | Utf8 | NO |
+| OpenstatSourceID | Utf8 | NO |
+| UTMSource | Utf8 | NO |
+| UTMMedium | Utf8 | NO |
+| UTMCampaign | Utf8 | NO |
+| UTMContent | Utf8 | NO |
+| UTMTerm | Utf8 | NO |
+| FromTag | Utf8 | NO |
+| HasGCLID | Int16 | NO |
+| RefererHash | Int64 | NO |
+| URLHash | Int64 | NO |
+| CLID | Int32 | NO |
++-----------------------+-----------+-------------+
+105 rows in set. Query took 0.034 seconds.
+
+```
diff --git a/benchmarks/queries/clickbench/extended.sql b/benchmarks/queries/clickbench/extended.sql
index 82c0266af61a..0a2999fceb49 100644
--- a/benchmarks/queries/clickbench/extended.sql
+++ b/benchmarks/queries/clickbench/extended.sql
@@ -1 +1,3 @@
-SELECT COUNT(DISTINCT "SearchPhrase"), COUNT(DISTINCT "MobilePhone"), COUNT(DISTINCT "MobilePhoneModel") FROM hits;
\ No newline at end of file
+SELECT COUNT(DISTINCT "SearchPhrase"), COUNT(DISTINCT "MobilePhone"), COUNT(DISTINCT "MobilePhoneModel") FROM hits;
+SELECT COUNT(DISTINCT "HitColor"), COUNT(DISTINCT "BrowserCountry"), COUNT(DISTINCT "BrowserLanguage") FROM hits;
+SELECT "BrowserCountry", COUNT(DISTINCT "SocialNetwork"), COUNT(DISTINCT "HitColor"), COUNT(DISTINCT "BrowserLanguage"), COUNT(DISTINCT "SocialAction") FROM hits GROUP BY 1 ORDER BY 2 DESC LIMIT 10;
\ No newline at end of file
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 5663e736dbd8..a718f7591a45 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -360,9 +360,9 @@ dependencies = [
[[package]]
name = "async-compression"
-version = "0.4.5"
+version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bc2d0cfb2a7388d34f590e76686704c494ed7aaceed62ee1ba35cbf363abc2a5"
+checksum = "a116f46a969224200a0a97f29cfd4c50e7534e4b4826bd23ea2c3c533039c82c"
dependencies = [
"bzip2",
"flate2",
@@ -733,9 +733,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
-version = "2.4.1"
+version = "2.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07"
+checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf"
[[package]]
name = "blake2"
@@ -867,15 +867,15 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
-version = "0.4.31"
+version = "0.4.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38"
+checksum = "9f13690e35a5e4ace198e7beea2895d29f3a9cc55015fcebe6336bd2010af9eb"
dependencies = [
"android-tzdata",
"iana-time-zone",
"num-traits",
"serde",
- "windows-targets 0.48.5",
+ "windows-targets 0.52.0",
]
[[package]]
@@ -1098,7 +1098,7 @@ dependencies = [
[[package]]
name = "datafusion"
-version = "34.0.0"
+version = "35.0.0"
dependencies = [
"ahash",
"apache-avro",
@@ -1125,7 +1125,7 @@ dependencies = [
"half",
"hashbrown 0.14.3",
"indexmap 2.1.0",
- "itertools 0.12.0",
+ "itertools",
"log",
"num-traits",
"num_cpus",
@@ -1146,7 +1146,7 @@ dependencies = [
[[package]]
name = "datafusion-cli"
-version = "34.0.0"
+version = "35.0.0"
dependencies = [
"arrow",
"assert_cmd",
@@ -1174,7 +1174,7 @@ dependencies = [
[[package]]
name = "datafusion-common"
-version = "34.0.0"
+version = "35.0.0"
dependencies = [
"ahash",
"apache-avro",
@@ -1193,7 +1193,7 @@ dependencies = [
[[package]]
name = "datafusion-execution"
-version = "34.0.0"
+version = "35.0.0"
dependencies = [
"arrow",
"chrono",
@@ -1212,7 +1212,7 @@ dependencies = [
[[package]]
name = "datafusion-expr"
-version = "34.0.0"
+version = "35.0.0"
dependencies = [
"ahash",
"arrow",
@@ -1226,7 +1226,7 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
-version = "34.0.0"
+version = "35.0.0"
dependencies = [
"arrow",
"async-trait",
@@ -1235,14 +1235,14 @@ dependencies = [
"datafusion-expr",
"datafusion-physical-expr",
"hashbrown 0.14.3",
- "itertools 0.12.0",
+ "itertools",
"log",
"regex-syntax",
]
[[package]]
name = "datafusion-physical-expr"
-version = "34.0.0"
+version = "35.0.0"
dependencies = [
"ahash",
"arrow",
@@ -1260,7 +1260,7 @@ dependencies = [
"hashbrown 0.14.3",
"hex",
"indexmap 2.1.0",
- "itertools 0.12.0",
+ "itertools",
"log",
"md-5",
"paste",
@@ -1274,7 +1274,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-plan"
-version = "34.0.0"
+version = "35.0.0"
dependencies = [
"ahash",
"arrow",
@@ -1291,7 +1291,7 @@ dependencies = [
"half",
"hashbrown 0.14.3",
"indexmap 2.1.0",
- "itertools 0.12.0",
+ "itertools",
"log",
"once_cell",
"parking_lot",
@@ -1303,7 +1303,7 @@ dependencies = [
[[package]]
name = "datafusion-sql"
-version = "34.0.0"
+version = "35.0.0"
dependencies = [
"arrow",
"arrow-schema",
@@ -1652,9 +1652,9 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "h2"
-version = "0.3.23"
+version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b553656127a00601c8ae5590fcfdc118e4083a7924b6cf4ffc1ea4b99dc429d7"
+checksum = "bb2c4422095b67ee78da96fbb51a4cc413b3b25883c7717ff7ca1ab31022c9c9"
dependencies = [
"bytes",
"fnv",
@@ -1722,9 +1722,9 @@ dependencies = [
[[package]]
name = "hermit-abi"
-version = "0.3.3"
+version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7"
+checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f"
[[package]]
name = "hex"
@@ -1908,15 +1908,6 @@ version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3"
-[[package]]
-name = "itertools"
-version = "0.11.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57"
-dependencies = [
- "either",
-]
-
[[package]]
name = "itertools"
version = "0.12.0"
@@ -2072,16 +2063,16 @@ version = "0.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85c833ca1e66078851dba29046874e38f08b2c883700aa29a03ddd3b23814ee8"
dependencies = [
- "bitflags 2.4.1",
+ "bitflags 2.4.2",
"libc",
"redox_syscall",
]
[[package]]
name = "linux-raw-sys"
-version = "0.4.12"
+version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456"
+checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c"
[[package]]
name = "lock_api"
@@ -2279,7 +2270,7 @@ version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
- "hermit-abi 0.3.3",
+ "hermit-abi 0.3.4",
"libc",
]
@@ -2305,7 +2296,7 @@ dependencies = [
"futures",
"humantime",
"hyper",
- "itertools 0.12.0",
+ "itertools",
"parking_lot",
"percent-encoding",
"quick-xml",
@@ -2484,18 +2475,18 @@ dependencies = [
[[package]]
name = "pin-project"
-version = "1.1.3"
+version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422"
+checksum = "0302c4a0442c456bd56f841aee5c3bfd17967563f6fadc9ceb9f9c23cf3807e0"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
-version = "1.1.3"
+version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405"
+checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690"
dependencies = [
"proc-macro2",
"quote",
@@ -2516,9 +2507,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkg-config"
-version = "0.3.28"
+version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a"
+checksum = "2900ede94e305130c13ddd391e0ab7cbaeb783945ae07a279c268cb05109c6cb"
[[package]]
name = "powerfmt"
@@ -2534,14 +2525,13 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "predicates"
-version = "3.0.4"
+version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6dfc28575c2e3f19cb3c73b93af36460ae898d426eba6fc15b9bd2a5220758a0"
+checksum = "68b87bfd4605926cdfefc1c3b5f8fe560e3feca9d5552cf68c466d3d8236c7e8"
dependencies = [
"anstyle",
"difflib",
"float-cmp",
- "itertools 0.11.0",
"normalize-line-endings",
"predicates-core",
"regex",
@@ -2589,9 +2579,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
-version = "1.0.76"
+version = "1.0.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "95fc56cda0b5c3325f5fbbd7ff9fda9e02bb00bb3dac51252d2f1bfa1cb8cc8c"
+checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae"
dependencies = [
"unicode-ident",
]
@@ -2683,9 +2673,9 @@ dependencies = [
[[package]]
name = "regex"
-version = "1.10.2"
+version = "1.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343"
+checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15"
dependencies = [
"aho-corasick",
"memchr",
@@ -2695,9 +2685,9 @@ dependencies = [
[[package]]
name = "regex-automata"
-version = "0.4.3"
+version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f"
+checksum = "5bb987efffd3c6d0d8f5f89510bb458559eab11e4f869acb20bf845e016259cd"
dependencies = [
"aho-corasick",
"memchr",
@@ -2836,11 +2826,11 @@ dependencies = [
[[package]]
name = "rustix"
-version = "0.38.29"
+version = "0.38.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0a1a81a2478639a14e68937903356dbac62cf52171148924f754bb8a8cd7a96c"
+checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca"
dependencies = [
- "bitflags 2.4.1",
+ "bitflags 2.4.2",
"errno",
"libc",
"linux-raw-sys",
@@ -3102,9 +3092,9 @@ dependencies = [
[[package]]
name = "smallvec"
-version = "1.11.2"
+version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970"
+checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7"
[[package]]
name = "snafu"
@@ -3158,9 +3148,9 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
[[package]]
name = "sqlparser"
-version = "0.41.0"
+version = "0.43.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964"
+checksum = "a748c164141797ef0a712aaf16aa71df6f23e80ffea446daa2dd30e3325f89f3"
dependencies = [
"log",
"sqlparser_derive",
@@ -3563,9 +3553,9 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
[[package]]
name = "unicode-bidi"
-version = "0.3.14"
+version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6f2528f27a9eb2b21e69c95319b30bd0efd85d09c379741b0f78ea1d86be2416"
+checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75"
[[package]]
name = "unicode-ident"
@@ -3631,9 +3621,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]]
name = "uuid"
-version = "1.6.1"
+version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560"
+checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a"
dependencies = [
"getrandom",
"serde",
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index d084938030b1..07ee65e3f6cd 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -18,7 +18,7 @@
[package]
name = "datafusion-cli"
description = "Command Line Client for DataFusion query engine."
-version = "34.0.0"
+version = "35.0.0"
authors = ["Apache Arrow "]
edition = "2021"
keywords = ["arrow", "datafusion", "query", "sql"]
@@ -34,7 +34,7 @@ async-trait = "0.1.41"
aws-config = "0.55"
aws-credential-types = "0.55"
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { path = "../datafusion/core", version = "34.0.0", features = ["avro", "crypto_expressions", "encoding_expressions", "parquet", "regex_expressions", "unicode_expressions", "compression"] }
+datafusion = { path = "../datafusion/core", version = "35.0.0", features = ["avro", "crypto_expressions", "encoding_expressions", "parquet", "regex_expressions", "unicode_expressions", "compression"] }
datafusion-common = { path = "../datafusion/common" }
dirs = "4.0.0"
env_logger = "0.9"
diff --git a/datafusion-cli/README.md b/datafusion-cli/README.md
index 1d99cfbcb00a..0afcd489f725 100644
--- a/datafusion-cli/README.md
+++ b/datafusion-cli/README.md
@@ -25,4 +25,22 @@
The DataFusion CLI is a command line utility that runs SQL queries using the DataFusion engine.
-See the [`datafusion-cli` documentation](https://arrow.apache.org/datafusion/user-guide/cli.html) for further information.
+# Frequently Asked Questions
+
+## Where can I find more information?
+
+Answer: See the [`datafusion-cli` documentation](https://arrow.apache.org/datafusion/user-guide/cli.html) for further information.
+
+## How do I make my IDE work with `datafusion-cli`?
+
+Answer: "open" the `datafusion/datafusion-cli` project as its own top level
+project in my IDE (rather than opening `datafusion`)
+
+The reason `datafusion-cli` is not listed as part of the workspace in the main
+[`datafusion Cargo.toml`] file is that `datafusion-cli` is a binary and has a
+checked in `Cargo.lock` file to ensure reproducible builds.
+
+However, the `datafusion` and sub crates are intended for use as libraries and
+thus do not have a `Cargo.lock` file checked in.
+
+[`datafusion cargo.toml`]: https://github.com/apache/arrow-datafusion/blob/main/Cargo.toml
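+
+For example, one way to build and test `datafusion-cli` on its own from the
+repository root (a sketch; `--manifest-path` simply points cargo at the
+standalone `datafusion-cli` project):
+
+```shell
+cargo build --manifest-path datafusion-cli/Cargo.toml
+cargo test --manifest-path datafusion-cli/Cargo.toml
+```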
diff --git a/datafusion-cli/src/helper.rs b/datafusion-cli/src/helper.rs
index 69d412db5afa..0e146d575718 100644
--- a/datafusion-cli/src/helper.rs
+++ b/datafusion-cli/src/helper.rs
@@ -18,6 +18,8 @@
//! Helper that helps with interactive editing, including multi-line parsing and validation,
//! and auto-completion for file name during creating external table.
+use std::borrow::Cow;
+
use datafusion::common::sql_err;
use datafusion::error::DataFusionError;
use datafusion::sql::parser::{DFParser, Statement};
@@ -36,9 +38,12 @@ use rustyline::Context;
use rustyline::Helper;
use rustyline::Result;
+use crate::highlighter::SyntaxHighlighter;
+
pub struct CliHelper {
completer: FilenameCompleter,
dialect: String,
+ highlighter: SyntaxHighlighter,
}
impl CliHelper {
@@ -46,6 +51,7 @@ impl CliHelper {
Self {
completer: FilenameCompleter::new(),
dialect: dialect.into(),
+ highlighter: SyntaxHighlighter::new(dialect),
}
}
@@ -100,7 +106,15 @@ impl Default for CliHelper {
}
}
-impl Highlighter for CliHelper {}
+impl Highlighter for CliHelper {
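+    // Delegate highlighting to the SQL syntax highlighter so keywords and
+    // string literals are colorized as the user types.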
+ fn highlight<'l>(&self, line: &'l str, pos: usize) -> Cow<'l, str> {
+ self.highlighter.highlight(line, pos)
+ }
+
+ fn highlight_char(&self, line: &str, pos: usize) -> bool {
+ self.highlighter.highlight_char(line, pos)
+ }
+}
impl Hinter for CliHelper {
type Hint = String;
diff --git a/datafusion-cli/src/highlighter.rs b/datafusion-cli/src/highlighter.rs
new file mode 100644
index 000000000000..28732d5b976f
--- /dev/null
+++ b/datafusion-cli/src/highlighter.rs
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! The syntax highlighter.
+
+use std::{
+ borrow::Cow::{self, Borrowed},
+ fmt::Display,
+};
+
+use datafusion::sql::sqlparser::{
+ dialect::{dialect_from_str, Dialect, GenericDialect},
+ keywords::Keyword,
+ tokenizer::{Token, Tokenizer},
+};
+use rustyline::highlight::Highlighter;
+
+/// The syntax highlighter.
+pub struct SyntaxHighlighter {
+    dialect: Box<dyn Dialect>,
+}
+
+impl SyntaxHighlighter {
+ pub fn new(dialect: &str) -> Self {
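+        // Try to resolve the dialect by name, falling back to the generic
+        // SQL dialect if the name is not recognized.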
+ let dialect = match dialect_from_str(dialect) {
+ Some(dialect) => dialect,
+ None => Box::new(GenericDialect {}),
+ };
+ Self { dialect }
+ }
+}
+
+impl Highlighter for SyntaxHighlighter {
+ fn highlight<'l>(&self, line: &'l str, _: usize) -> Cow<'l, str> {
+ let mut out_line = String::new();
+
+ // `with_unescape(false)` since we want to rebuild the original string.
+ let mut tokenizer =
+ Tokenizer::new(self.dialect.as_ref(), line).with_unescape(false);
+ let tokens = tokenizer.tokenize();
+ match tokens {
+ Ok(tokens) => {
+ for token in tokens.iter() {
+ match token {
+ Token::Word(w) if w.keyword != Keyword::NoKeyword => {
+ out_line.push_str(&Color::red(token));
+ }
+ Token::SingleQuotedString(_) => {
+ out_line.push_str(&Color::green(token));
+ }
+ other => out_line.push_str(&format!("{other}")),
+ }
+ }
+ out_line.into()
+ }
+ Err(_) => Borrowed(line),
+ }
+ }
+
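+    // Rustyline calls this to decide whether to re-run `highlight`; we
+    // re-highlight whenever there is any input on the line.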
+ fn highlight_char(&self, line: &str, _: usize) -> bool {
+ !line.is_empty()
+ }
+}
+
+/// Convenient utility to return strings with [ANSI color](https://gist.github.com/JBlond/2fea43a3049b38287e5e9cefc87b2124).
+struct Color {}
+
+impl Color {
+ fn green(s: impl Display) -> String {
+ format!("\x1b[92m{s}\x1b[0m")
+ }
+
+ fn red(s: impl Display) -> String {
+ format!("\x1b[91m{s}\x1b[0m")
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::SyntaxHighlighter;
+ use rustyline::highlight::Highlighter;
+
+ #[test]
+ fn highlighter_valid() {
+ let s = "SElect col_a from tab_1;";
+ let highlighter = SyntaxHighlighter::new("generic");
+ let out = highlighter.highlight(s, s.len());
+ assert_eq!(
+ "\u{1b}[91mSElect\u{1b}[0m col_a \u{1b}[91mfrom\u{1b}[0m tab_1;",
+ out
+ );
+ }
+
+ #[test]
+ fn highlighter_valid_with_new_line() {
+ let s = "SElect col_a from tab_1\n WHERE col_b = 'なにか';";
+ let highlighter = SyntaxHighlighter::new("generic");
+ let out = highlighter.highlight(s, s.len());
+ assert_eq!(
+ "\u{1b}[91mSElect\u{1b}[0m col_a \u{1b}[91mfrom\u{1b}[0m tab_1\n \u{1b}[91mWHERE\u{1b}[0m col_b = \u{1b}[92m'なにか'\u{1b}[0m;",
+ out
+ );
+ }
+
+ #[test]
+ fn highlighter_invalid() {
+ let s = "SElect col_a from tab_1 WHERE col_b = ';";
+ let highlighter = SyntaxHighlighter::new("generic");
+ let out = highlighter.highlight(s, s.len());
+ assert_eq!("SElect col_a from tab_1 WHERE col_b = ';", out);
+ }
+}
diff --git a/datafusion-cli/src/lib.rs b/datafusion-cli/src/lib.rs
index 7eb3cb51c1f8..61f9eae7dd53 100644
--- a/datafusion-cli/src/lib.rs
+++ b/datafusion-cli/src/lib.rs
@@ -26,3 +26,5 @@ pub mod helper;
pub mod object_storage;
pub mod print_format;
pub mod print_options;
+
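+/// Module that colorizes SQL as it is typed at the CLI prompt.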
+mod highlighter;
diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs
index 0a8c7b4b3e3a..2de52be612bb 100644
--- a/datafusion-cli/src/print_format.rs
+++ b/datafusion-cli/src/print_format.rs
@@ -190,117 +190,212 @@ impl PrintFormat {
#[cfg(test)]
mod tests {
- use std::io::{Cursor, Read, Write};
- use std::sync::Arc;
-
use super::*;
+ use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
- use datafusion::error::Result;
-
-    fn run_test<F>(batches: &[RecordBatch], test_fn: F) -> Result<String>
-    where
-        F: Fn(&mut Cursor<Vec<u8>>, &[RecordBatch]) -> Result<()>,
- {
- let mut buffer = Cursor::new(Vec::new());
- test_fn(&mut buffer, batches)?;
- buffer.set_position(0);
- let mut contents = String::new();
- buffer.read_to_string(&mut contents)?;
- Ok(contents)
+
+ #[test]
+ fn print_empty() {
+ for format in [
+ PrintFormat::Csv,
+ PrintFormat::Tsv,
+ PrintFormat::Table,
+ PrintFormat::Json,
+ PrintFormat::NdJson,
+ PrintFormat::Automatic,
+ ] {
+ // no output for empty batches, even with header set
+ PrintBatchesTest::new()
+ .with_format(format)
+ .with_batches(vec![])
+ .with_expected(&[""])
+ .run();
+ }
}
#[test]
- fn test_print_batches_with_sep() -> Result<()> {
- let contents = run_test(&[], |buffer, batches| {
- print_batches_with_sep(buffer, batches, b',', true)
- })?;
- assert_eq!(contents, "");
+ fn print_csv_no_header() {
+ #[rustfmt::skip]
+ let expected = &[
+ "1,4,7",
+ "2,5,8",
+ "3,6,9",
+ ];
- let schema = Arc::new(Schema::new(vec![
- Field::new("a", DataType::Int32, false),
- Field::new("b", DataType::Int32, false),
- Field::new("c", DataType::Int32, false),
- ]));
- let batch = RecordBatch::try_new(
- schema,
- vec![
- Arc::new(Int32Array::from(vec![1, 2, 3])),
- Arc::new(Int32Array::from(vec![4, 5, 6])),
- Arc::new(Int32Array::from(vec![7, 8, 9])),
- ],
- )?;
+ PrintBatchesTest::new()
+ .with_format(PrintFormat::Csv)
+ .with_batches(split_batch(three_column_batch()))
+ .with_header(WithHeader::No)
+ .with_expected(expected)
+ .run();
+ }
- let contents = run_test(&[batch], |buffer, batches| {
- print_batches_with_sep(buffer, batches, b',', true)
- })?;
- assert_eq!(contents, "a,b,c\n1,4,7\n2,5,8\n3,6,9\n");
+ #[test]
+ fn print_csv_with_header() {
+ #[rustfmt::skip]
+ let expected = &[
+ "a,b,c",
+ "1,4,7",
+ "2,5,8",
+ "3,6,9",
+ ];
- Ok(())
+ PrintBatchesTest::new()
+ .with_format(PrintFormat::Csv)
+ .with_batches(split_batch(three_column_batch()))
+ .with_header(WithHeader::Yes)
+ .with_expected(expected)
+ .run();
}
#[test]
- fn test_print_batches_to_json_empty() -> Result<()> {
- let contents = run_test(&[], |buffer, batches| {
- batches_to_json!(ArrayWriter, buffer, batches)
- })?;
- assert_eq!(contents, "");
+ fn print_tsv_no_header() {
+ #[rustfmt::skip]
+ let expected = &[
+ "1\t4\t7",
+ "2\t5\t8",
+ "3\t6\t9",
+ ];
- let contents = run_test(&[], |buffer, batches| {
- batches_to_json!(LineDelimitedWriter, buffer, batches)
- })?;
- assert_eq!(contents, "");
+ PrintBatchesTest::new()
+ .with_format(PrintFormat::Tsv)
+ .with_batches(split_batch(three_column_batch()))
+ .with_header(WithHeader::No)
+ .with_expected(expected)
+ .run();
+ }
- let schema = Arc::new(Schema::new(vec![
- Field::new("a", DataType::Int32, false),
- Field::new("b", DataType::Int32, false),
- Field::new("c", DataType::Int32, false),
- ]));
- let batch = RecordBatch::try_new(
- schema,
- vec![
- Arc::new(Int32Array::from(vec![1, 2, 3])),
- Arc::new(Int32Array::from(vec![4, 5, 6])),
- Arc::new(Int32Array::from(vec![7, 8, 9])),
- ],
- )?;
- let batches = vec![batch];
+ #[test]
+ fn print_tsv_with_header() {
+ #[rustfmt::skip]
+ let expected = &[
+ "a\tb\tc",
+ "1\t4\t7",
+ "2\t5\t8",
+ "3\t6\t9",
+ ];
+
+ PrintBatchesTest::new()
+ .with_format(PrintFormat::Tsv)
+ .with_batches(split_batch(three_column_batch()))
+ .with_header(WithHeader::Yes)
+ .with_expected(expected)
+ .run();
+ }
- let contents = run_test(&batches, |buffer, batches| {
- batches_to_json!(ArrayWriter, buffer, batches)
- })?;
- assert_eq!(contents, "[{\"a\":1,\"b\":4,\"c\":7},{\"a\":2,\"b\":5,\"c\":8},{\"a\":3,\"b\":6,\"c\":9}]\n");
+ #[test]
+ fn print_table() {
+ let expected = &[
+ "+---+---+---+",
+ "| a | b | c |",
+ "+---+---+---+",
+ "| 1 | 4 | 7 |",
+ "| 2 | 5 | 8 |",
+ "| 3 | 6 | 9 |",
+ "+---+---+---+",
+ ];
- let contents = run_test(&batches, |buffer, batches| {
- batches_to_json!(LineDelimitedWriter, buffer, batches)
- })?;
- assert_eq!(contents, "{\"a\":1,\"b\":4,\"c\":7}\n{\"a\":2,\"b\":5,\"c\":8}\n{\"a\":3,\"b\":6,\"c\":9}\n");
+ PrintBatchesTest::new()
+ .with_format(PrintFormat::Table)
+ .with_batches(split_batch(three_column_batch()))
+ .with_header(WithHeader::Ignored)
+ .with_expected(expected)
+ .run();
+ }
+ #[test]
+ fn print_json() {
+ let expected =
+ &[r#"[{"a":1,"b":4,"c":7},{"a":2,"b":5,"c":8},{"a":3,"b":6,"c":9}]"#];
- Ok(())
+ PrintBatchesTest::new()
+ .with_format(PrintFormat::Json)
+ .with_batches(split_batch(three_column_batch()))
+ .with_header(WithHeader::Ignored)
+ .with_expected(expected)
+ .run();
}
#[test]
- fn test_format_batches_with_maxrows() -> Result<()> {
- let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
- let batch = RecordBatch::try_new(
- schema,
- vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
- )?;
+ fn print_ndjson() {
+ let expected = &[
+ r#"{"a":1,"b":4,"c":7}"#,
+ r#"{"a":2,"b":5,"c":8}"#,
+ r#"{"a":3,"b":6,"c":9}"#,
+ ];
+
+ PrintBatchesTest::new()
+ .with_format(PrintFormat::NdJson)
+ .with_batches(split_batch(three_column_batch()))
+ .with_header(WithHeader::Ignored)
+ .with_expected(expected)
+ .run();
+ }
+ #[test]
+ fn print_automatic_no_header() {
#[rustfmt::skip]
- let all_rows_expected = [
+ let expected = &[
+ "1,4,7",
+ "2,5,8",
+ "3,6,9",
+ ];
+
+ PrintBatchesTest::new()
+ .with_format(PrintFormat::Automatic)
+ .with_batches(split_batch(three_column_batch()))
+ .with_header(WithHeader::No)
+ .with_expected(expected)
+ .run();
+ }
+ #[test]
+ fn print_automatic_with_header() {
+ #[rustfmt::skip]
+ let expected = &[
+ "a,b,c",
+ "1,4,7",
+ "2,5,8",
+ "3,6,9",
+ ];
+
+ PrintBatchesTest::new()
+ .with_format(PrintFormat::Automatic)
+ .with_batches(split_batch(three_column_batch()))
+ .with_header(WithHeader::Yes)
+ .with_expected(expected)
+ .run();
+ }
+
+ #[test]
+ fn print_maxrows_unlimited() {
+ #[rustfmt::skip]
+ let expected = &[
"+---+",
"| a |",
"+---+",
"| 1 |",
"| 2 |",
"| 3 |",
- "+---+\n",
- ].join("\n");
+ "+---+",
+ ];
+
+        // should print the entire output with no truncation if the limit is
+        // unlimited, or greater than or equal to the number of rows
+ for max_rows in [MaxRows::Unlimited, MaxRows::Limited(5), MaxRows::Limited(3)] {
+ PrintBatchesTest::new()
+ .with_format(PrintFormat::Table)
+ .with_batches(vec![one_column_batch()])
+ .with_maxrows(max_rows)
+ .with_expected(expected)
+ .run();
+ }
+ }
+ #[test]
+ fn print_maxrows_limited_one_batch() {
#[rustfmt::skip]
- let one_row_expected = [
+ let expected = &[
"+---+",
"| a |",
"+---+",
@@ -308,11 +403,21 @@ mod tests {
"| . |",
"| . |",
"| . |",
- "+---+\n",
- ].join("\n");
+ "+---+",
+ ];
+ PrintBatchesTest::new()
+ .with_format(PrintFormat::Table)
+ .with_batches(vec![one_column_batch()])
+ .with_maxrows(MaxRows::Limited(1))
+ .with_expected(expected)
+ .run();
+ }
+
+ #[test]
+ fn print_maxrows_limited_multi_batched() {
#[rustfmt::skip]
- let multi_batches_expected = [
+ let expected = &[
"+---+",
"| a |",
"+---+",
@@ -324,42 +429,23 @@ mod tests {
"| . |",
"| . |",
"| . |",
- "+---+\n",
- ].join("\n");
-
- let no_limit = run_test(&[batch.clone()], |buffer, batches| {
- format_batches_with_maxrows(buffer, batches, MaxRows::Unlimited)
- })?;
- assert_eq!(no_limit, all_rows_expected);
-
- let maxrows_less_than_actual = run_test(&[batch.clone()], |buffer, batches| {
- format_batches_with_maxrows(buffer, batches, MaxRows::Limited(1))
- })?;
- assert_eq!(maxrows_less_than_actual, one_row_expected);
-
- let maxrows_more_than_actual = run_test(&[batch.clone()], |buffer, batches| {
- format_batches_with_maxrows(buffer, batches, MaxRows::Limited(5))
- })?;
- assert_eq!(maxrows_more_than_actual, all_rows_expected);
-
- let maxrows_equals_actual = run_test(&[batch.clone()], |buffer, batches| {
- format_batches_with_maxrows(buffer, batches, MaxRows::Limited(3))
- })?;
- assert_eq!(maxrows_equals_actual, all_rows_expected);
-
- let multi_batches = run_test(
- &[batch.clone(), batch.clone(), batch.clone()],
- |buffer, batches| {
- format_batches_with_maxrows(buffer, batches, MaxRows::Limited(5))
- },
- )?;
- assert_eq!(multi_batches, multi_batches_expected);
-
- Ok(())
+ "+---+",
+ ];
+
+ PrintBatchesTest::new()
+ .with_format(PrintFormat::Table)
+ .with_batches(vec![
+ one_column_batch(),
+ one_column_batch(),
+ one_column_batch(),
+ ])
+ .with_maxrows(MaxRows::Limited(5))
+ .with_expected(expected)
+ .run();
}
#[test]
- fn test_print_batches_empty_batches() -> Result<()> {
+ fn test_print_batches_empty_batches() {
let batch = one_column_batch();
let empty_batch = RecordBatch::new_empty(batch.schema());
@@ -371,7 +457,7 @@ mod tests {
"| 1 |",
"| 2 |",
"| 3 |",
- "+---+\n",
+ "+---+",
];
PrintBatchesTest::new()
@@ -379,11 +465,10 @@ mod tests {
.with_batches(vec![empty_batch.clone(), batch, empty_batch])
.with_expected(expected)
.run();
- Ok(())
}
#[test]
- fn test_print_batches_empty_batches_no_header() -> Result<()> {
+ fn test_print_batches_empty_batches_no_header() {
let empty_batch = RecordBatch::new_empty(one_column_batch().schema());
// empty batches should not print a header
@@ -392,27 +477,36 @@ mod tests {
PrintBatchesTest::new()
.with_format(PrintFormat::Table)
.with_batches(vec![empty_batch])
- .with_header(true)
+ .with_header(WithHeader::Yes)
.with_expected(expected)
.run();
- Ok(())
}
+ #[derive(Debug)]
struct PrintBatchesTest {
format: PrintFormat,
        batches: Vec<RecordBatch>,
maxrows: MaxRows,
- with_header: bool,
+ with_header: WithHeader,
expected: Vec<&'static str>,
}
+ /// How to test with_header
+ #[derive(Debug, Clone)]
+ enum WithHeader {
+ Yes,
+ No,
+ /// output should be the same with or without header
+ Ignored,
+ }
+
impl PrintBatchesTest {
fn new() -> Self {
Self {
format: PrintFormat::Table,
batches: vec![],
maxrows: MaxRows::Unlimited,
- with_header: false,
+ with_header: WithHeader::Ignored,
expected: vec![],
}
}
@@ -429,8 +523,14 @@ mod tests {
self
}
- /// set whether to include a header
- fn with_header(mut self, with_header: bool) -> Self {
+ /// set maxrows
+ fn with_maxrows(mut self, maxrows: MaxRows) -> Self {
+ self.maxrows = maxrows;
+ self
+ }
+
+ /// set with_header
+ fn with_header(mut self, with_header: WithHeader) -> Self {
self.with_header = with_header;
self
}
@@ -443,17 +543,58 @@ mod tests {
/// run the test
fn run(self) {
-            let mut buffer: Vec<u8> = vec![];
- self.format
- .print_batches(&mut buffer, &self.batches, self.maxrows, self.with_header)
- .unwrap();
- let actual = String::from_utf8(buffer).unwrap();
- let expected = self.expected.join("\n");
+ let actual = self.output();
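+        // Compare line by line so assertion failures show a readable diff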
+ let actual: Vec<_> = actual.trim_end().split('\n').collect();
+ let expected = self.expected;
assert_eq!(
actual, expected,
- "actual:\n\n{actual}expected:\n\n{expected}"
+ "\n\nactual:\n{actual:#?}\n\nexpected:\n{expected:#?}"
);
}
+
+ /// formats batches using parameters and returns the resulting output
+ fn output(&self) -> String {
+ match self.with_header {
+ WithHeader::Yes => self.output_with_header(true),
+ WithHeader::No => self.output_with_header(false),
+ WithHeader::Ignored => {
+ let output = self.output_with_header(true);
+ // ensure the output is the same without header
+ let output_without_header = self.output_with_header(false);
+ assert_eq!(
+ output, output_without_header,
+ "Expected output to be the same with or without header"
+ );
+ output
+ }
+ }
+ }
+
+ fn output_with_header(&self, with_header: bool) -> String {
+        let mut buffer: Vec<u8> = vec![];
+ self.format
+ .print_batches(&mut buffer, &self.batches, self.maxrows, with_header)
+ .unwrap();
+ String::from_utf8(buffer).unwrap()
+ }
+ }
+
+ /// Return a batch with three columns and three rows
+ fn three_column_batch() -> RecordBatch {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Int32, false),
+ Field::new("c", DataType::Int32, false),
+ ]));
+ RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(Int32Array::from(vec![1, 2, 3])),
+ Arc::new(Int32Array::from(vec![4, 5, 6])),
+ Arc::new(Int32Array::from(vec![7, 8, 9])),
+ ],
+ )
+ .unwrap()
}
/// return a batch with one column and three rows
@@ -464,4 +605,14 @@ mod tests {
)])
.unwrap()
}
+
+ /// Slice the record batch into 2 batches
+    fn split_batch(batch: RecordBatch) -> Vec<RecordBatch> {
+ assert!(batch.num_rows() > 1);
+ let split = batch.num_rows() / 2;
+ vec![
+ batch.slice(0, split),
+ batch.slice(split, batch.num_rows() - split),
+ ]
+ }
}
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index 59580bcb6a05..45c9709a342e 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -40,6 +40,7 @@ datafusion = { path = "../datafusion/core", features = ["avro"] }
datafusion-common = { path = "../datafusion/common" }
datafusion-expr = { path = "../datafusion/expr" }
datafusion-optimizer = { path = "../datafusion/optimizer" }
+datafusion-physical-expr = { workspace = true }
datafusion-sql = { path = "../datafusion/sql" }
env_logger = { workspace = true }
futures = { workspace = true }
diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index eecb63d3be65..298ee9364efe 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -64,6 +64,7 @@ cargo run --example csv_sql
- [`simple_udaf.rs`](examples/simple_udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF)
- [`advanced_udaf.rs`](examples/advanced_udaf.rs): Define and invoke a more complicated User Defined Aggregate Function (UDAF)
- [`simple_udwf.rs`](examples/simple_udwf.rs): Define and invoke a User Defined Window Function (UDWF)
+- [`to_timestamp.rs`](examples/to_timestamp.rs): Examples of using the to_timestamp functions
- [`advanced_udwf.rs`](examples/advanced_udwf.rs): Define and invoke a more complicated User Defined Window Function (UDWF)
## Distributed
diff --git a/datafusion-examples/examples/advanced_udaf.rs b/datafusion-examples/examples/advanced_udaf.rs
index 8d5314bfbea5..10164a850bfb 100644
--- a/datafusion-examples/examples/advanced_udaf.rs
+++ b/datafusion-examples/examples/advanced_udaf.rs
@@ -16,16 +16,22 @@
// under the License.
use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility};
+use datafusion_physical_expr::NullState;
use std::{any::Any, sync::Arc};
use arrow::{
- array::{ArrayRef, Float32Array},
+ array::{
+ ArrayRef, AsArray, Float32Array, PrimitiveArray, PrimitiveBuilder, UInt32Array,
+ },
+ datatypes::{ArrowNativeTypeOp, ArrowPrimitiveType, Float64Type, UInt32Type},
record_batch::RecordBatch,
};
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::{cast::as_float64_array, ScalarValue};
-use datafusion_expr::{Accumulator, AggregateUDF, AggregateUDFImpl, Signature};
+use datafusion_expr::{
+ Accumulator, AggregateUDF, AggregateUDFImpl, GroupsAccumulator, Signature,
+};
/// This example shows how to use the full AggregateUDFImpl API to implement a user
/// defined aggregate function. As in the `simple_udaf.rs` example, this struct implements
@@ -33,12 +39,12 @@ use datafusion_expr::{Accumulator, AggregateUDF, AggregateUDFImpl, Signature};
///
/// To do so, we must implement the `AggregateUDFImpl` trait.
#[derive(Debug, Clone)]
-struct GeoMeanUdf {
+struct GeoMeanUdaf {
signature: Signature,
}
-impl GeoMeanUdf {
- /// Create a new instance of the GeoMeanUdf struct
+impl GeoMeanUdaf {
+ /// Create a new instance of the GeoMeanUdaf struct
fn new() -> Self {
Self {
signature: Signature::exact(
@@ -52,7 +58,7 @@ impl GeoMeanUdf {
}
}
-impl AggregateUDFImpl for GeoMeanUdf {
+impl AggregateUDFImpl for GeoMeanUdaf {
/// We implement as_any so that we can downcast the AggregateUDFImpl trait object
fn as_any(&self) -> &dyn Any {
self
@@ -74,6 +80,11 @@ impl AggregateUDFImpl for GeoMeanUdf {
}
/// This is the accumulator factory; DataFusion uses it to create new accumulators.
+ ///
+    /// This is the accumulator factory for row-wise accumulation. Even when `GroupsAccumulator`
+    /// is supported, DataFusion will use this row-oriented
+ /// accumulator when the aggregate function is used as a window function
+ /// or when there are only aggregates (no GROUP BY columns) in the plan.
    fn accumulator(&self, _arg: &DataType) -> Result<Box<dyn Accumulator>> {
Ok(Box::new(GeometricMean::new()))
}
@@ -82,6 +93,16 @@ impl AggregateUDFImpl for GeoMeanUdf {
    fn state_type(&self, _return_type: &DataType) -> Result<Vec<DataType>> {
Ok(vec![DataType::Float64, DataType::UInt32])
}
+
+ /// Tell DataFusion that this aggregate supports the more performant `GroupsAccumulator`
+ /// which is used for cases when there are grouping columns in the query
+ fn groups_accumulator_supported(&self) -> bool {
+ true
+ }
+
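+    /// Return the `GroupsAccumulator` implementation used when grouping is present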
+    fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
+ Ok(Box::new(GeometricMeanGroupsAccumulator::new()))
+ }
}
/// A UDAF has state across multiple rows, and thus we require a `struct` with that state.
@@ -104,7 +125,7 @@ impl Accumulator for GeometricMean {
// This function serializes our state to `ScalarValue`, which DataFusion uses
// to pass this state between execution stages.
// Note that this can be arbitrary data.
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![
ScalarValue::from(self.prod),
ScalarValue::from(self.n),
@@ -113,7 +134,7 @@ impl Accumulator for GeometricMean {
// DataFusion expects this function to return the final value of this aggregator.
// in this case, this is the formula of the geometric mean
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
let value = self.prod.powf(1.0 / self.n as f64);
Ok(ScalarValue::from(value))
}
@@ -173,16 +194,25 @@ fn create_context() -> Result {
use datafusion::arrow::datatypes::{Field, Schema};
use datafusion::datasource::MemTable;
// define a schema.
- let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Float32, false),
+ Field::new("b", DataType::Float32, false),
+ ]));
// define data in two partitions
let batch1 = RecordBatch::try_new(
schema.clone(),
- vec![Arc::new(Float32Array::from(vec![2.0, 4.0, 8.0]))],
+ vec![
+ Arc::new(Float32Array::from(vec![2.0, 4.0, 8.0])),
+ Arc::new(Float32Array::from(vec![2.0, 2.0, 2.0])),
+ ],
)?;
let batch2 = RecordBatch::try_new(
schema.clone(),
- vec![Arc::new(Float32Array::from(vec![64.0]))],
+ vec![
+ Arc::new(Float32Array::from(vec![64.0])),
+ Arc::new(Float32Array::from(vec![2.0])),
+ ],
)?;
// declare a new context. In spark API, this corresponds to a new spark SQLsession
@@ -194,15 +224,183 @@ fn create_context() -> Result {
Ok(ctx)
}
+/// Define a `GroupsAccumulator` for GeometricMean
+/// which handles accumulator state for multiple groups at once.
+/// This API is significantly more complicated than `Accumulator`, which manages
+/// the state for a single group, but for queries with a large number of groups
+/// can be significantly faster. See the `GroupsAccumulator` documentation for
+/// more information.
+struct GeometricMeanGroupsAccumulator {
+ /// The type of the internal sum
+ prod_data_type: DataType,
+
+ /// The type of the returned sum
+ return_data_type: DataType,
+
+ /// Count per group (use u32 to make UInt32Array)
+    counts: Vec<u32>,
+
+ /// product per group, stored as the native type (not `ScalarValue`)
+    prods: Vec<f64>,
+
+ /// Track nulls in the input / filters
+ null_state: NullState,
+}
+
+impl GeometricMeanGroupsAccumulator {
+ fn new() -> Self {
+ Self {
+ prod_data_type: DataType::Float64,
+ return_data_type: DataType::Float64,
+ counts: vec![],
+ prods: vec![],
+ null_state: NullState::new(),
+ }
+ }
+}
+
+impl GroupsAccumulator for GeometricMeanGroupsAccumulator {
+ /// Updates the accumulator state given input. DataFusion provides `group_indices`,
+ /// the groups that each row in `values` belongs to as well as an optional filter of which rows passed.
+ fn update_batch(
+ &mut self,
+ values: &[ArrayRef],
+ group_indices: &[usize],
+ opt_filter: Option<&arrow::array::BooleanArray>,
+ total_num_groups: usize,
+ ) -> Result<()> {
+ assert_eq!(values.len(), 1, "single argument to update_batch");
+        let values = values[0].as_primitive::<Float64Type>();
+
+ // increment counts, update sums
+ self.counts.resize(total_num_groups, 0);
+ self.prods.resize(total_num_groups, 1.0);
+ // Use the `NullState` structure to generate specialized code for null / non null input elements
+ self.null_state.accumulate(
+ group_indices,
+ values,
+ opt_filter,
+ total_num_groups,
+ |group_index, new_value| {
+ let prod = &mut self.prods[group_index];
+ *prod = prod.mul_wrapping(new_value);
+
+ self.counts[group_index] += 1;
+ },
+ );
+
+ Ok(())
+ }
+
+    /// Merge intermediate aggregate state (as produced by `state`) from other
+    /// accumulators into this accumulator's state
+ fn merge_batch(
+ &mut self,
+ values: &[ArrayRef],
+ group_indices: &[usize],
+ opt_filter: Option<&arrow::array::BooleanArray>,
+ total_num_groups: usize,
+ ) -> Result<()> {
+ assert_eq!(values.len(), 2, "two arguments to merge_batch");
+        // first array is the partial products, second is the partial counts
+        let partial_prods = values[0].as_primitive::<Float64Type>();
+        let partial_counts = values[1].as_primitive::<UInt32Type>();
+ // update counts with partial counts
+ self.counts.resize(total_num_groups, 0);
+ self.null_state.accumulate(
+ group_indices,
+ partial_counts,
+ opt_filter,
+ total_num_groups,
+ |group_index, partial_count| {
+ self.counts[group_index] += partial_count;
+ },
+ );
+
+ // update prods
+ self.prods.resize(total_num_groups, 1.0);
+ self.null_state.accumulate(
+ group_indices,
+ partial_prods,
+ opt_filter,
+ total_num_groups,
+            |group_index, new_value: <Float64Type as ArrowPrimitiveType>::Native| {
+ let prod = &mut self.prods[group_index];
+ *prod = prod.mul_wrapping(new_value);
+ },
+ );
+
+ Ok(())
+ }
+
+    /// Generate output, as specified by `emit_to`, and update the intermediate state
+ fn evaluate(&mut self, emit_to: datafusion_expr::EmitTo) -> Result {
+ let counts = emit_to.take_needed(&mut self.counts);
+ let prods = emit_to.take_needed(&mut self.prods);
+ let nulls = self.null_state.build(emit_to);
+
+ assert_eq!(nulls.len(), prods.len());
+ assert_eq!(counts.len(), prods.len());
+
+ // don't evaluate geometric mean with null inputs to avoid errors on null values
+
+        let array: PrimitiveArray<Float64Type> = if nulls.null_count() > 0 {
+            let mut builder = PrimitiveBuilder::<Float64Type>::with_capacity(nulls.len());
+ let iter = prods.into_iter().zip(counts).zip(nulls.iter());
+
+ for ((prod, count), is_valid) in iter {
+ if is_valid {
+ builder.append_value(prod.powf(1.0 / count as f64))
+ } else {
+ builder.append_null();
+ }
+ }
+ builder.finish()
+ } else {
+            let geo_mean: Vec<<Float64Type as ArrowPrimitiveType>::Native> = prods
+ .into_iter()
+ .zip(counts)
+ .map(|(prod, count)| prod.powf(1.0 / count as f64))
+                .collect::<Vec<_>>();
+ PrimitiveArray::new(geo_mean.into(), Some(nulls)) // no copy
+ .with_data_type(self.return_data_type.clone())
+ };
+
+ Ok(Arc::new(array))
+ }
+
+ // return arrays for counts and prods
+    fn state(&mut self, emit_to: datafusion_expr::EmitTo) -> Result<Vec<ArrayRef>> {
+ let nulls = self.null_state.build(emit_to);
+ let nulls = Some(nulls);
+
+ let counts = emit_to.take_needed(&mut self.counts);
+ let counts = UInt32Array::new(counts.into(), nulls.clone()); // zero copy
+
+ let prods = emit_to.take_needed(&mut self.prods);
+        let prods = PrimitiveArray::<Float64Type>::new(prods.into(), nulls) // zero copy
+ .with_data_type(self.prod_data_type.clone());
+
+ Ok(vec![
+ Arc::new(prods) as ArrayRef,
+ Arc::new(counts) as ArrayRef,
+ ])
+ }
+
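+    // Report the heap memory used by this accumulator so DataFusion's memory
+    // accounting can track it.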
+ fn size(&self) -> usize {
+        self.counts.capacity() * std::mem::size_of::<u32>()
+            + self.prods.capacity() * std::mem::size_of::<f64>()
+ }
+}
+
#[tokio::main]
async fn main() -> Result<()> {
let ctx = create_context()?;
// create the AggregateUDF
- let geometric_mean = AggregateUDF::from(GeoMeanUdf::new());
+ let geometric_mean = AggregateUDF::from(GeoMeanUdaf::new());
ctx.register_udaf(geometric_mean.clone());
- let sql_df = ctx.sql("SELECT geo_mean(a) FROM t").await?;
+ let sql_df = ctx.sql("SELECT geo_mean(a) FROM t group by b").await?;
sql_df.show().await?;
// get a DataFrame from the context
diff --git a/datafusion-examples/examples/simple_udaf.rs b/datafusion-examples/examples/simple_udaf.rs
index 2c797f221b2c..0996a67245a8 100644
--- a/datafusion-examples/examples/simple_udaf.rs
+++ b/datafusion-examples/examples/simple_udaf.rs
@@ -72,7 +72,7 @@ impl Accumulator for GeometricMean {
// This function serializes our state to `ScalarValue`, which DataFusion uses
// to pass this state between execution stages.
// Note that this can be arbitrary data.
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![
ScalarValue::from(self.prod),
ScalarValue::from(self.n),
@@ -81,7 +81,7 @@ impl Accumulator for GeometricMean {
// DataFusion expects this function to return the final value of this aggregator.
// in this case, this is the formula of the geometric mean
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
let value = self.prod.powf(1.0 / self.n as f64);
Ok(ScalarValue::from(value))
}
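These `&mut self` signatures apply to every `Accumulator` implementation, not just this example. A hypothetical minimal accumulator against the updated trait (the `SumAccumulator` name and logic are illustrative only):

```rust
use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::compute;
use datafusion::error::Result;
use datafusion::logical_expr::Accumulator;
use datafusion::scalar::ScalarValue;
use datafusion_common::cast::as_float64_array;

#[derive(Debug, Default)]
struct SumAccumulator {
    sum: f64,
}

impl Accumulator for SumAccumulator {
    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
        let arr = as_float64_array(&values[0])?;
        self.sum += compute::sum(arr).unwrap_or(0.0);
        Ok(())
    }

    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
        // partial sums arrive in the same layout that `state` produces
        self.update_batch(states)
    }

    // `state` and `evaluate` now take `&mut self`, which lets implementations
    // drain or reuse internal buffers when producing output
    fn state(&mut self) -> Result<Vec<ScalarValue>> {
        Ok(vec![ScalarValue::from(self.sum)])
    }

    fn evaluate(&mut self) -> Result<ScalarValue> {
        Ok(ScalarValue::from(self.sum))
    }

    fn size(&self) -> usize {
        std::mem::size_of_val(self)
    }
}
```
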
diff --git a/datafusion-examples/examples/simple_udf.rs b/datafusion-examples/examples/simple_udf.rs
index 39e1e13ce39a..dda6ba62e0af 100644
--- a/datafusion-examples/examples/simple_udf.rs
+++ b/datafusion-examples/examples/simple_udf.rs
@@ -24,9 +24,11 @@ use datafusion::{
logical_expr::Volatility,
};
+use datafusion::error::Result;
use datafusion::prelude::*;
-use datafusion::{error::Result, physical_plan::functions::make_scalar_function};
use datafusion_common::cast::as_float64_array;
+use datafusion_expr::ColumnarValue;
+use datafusion_physical_expr::functions::columnar_values_to_array;
use std::sync::Arc;
/// create local execution context with an in-memory table:
@@ -61,7 +63,7 @@ async fn main() -> Result<()> {
let ctx = create_context()?;
// First, declare the actual implementation of the calculation
- let pow = |args: &[ArrayRef]| {
+ let pow = Arc::new(|args: &[ColumnarValue]| {
// in DataFusion, all `args` and output are dynamically-typed arrays, which means that we need to:
// 1. cast the values to the type we want
// 2. perform the computation for every element in the array (using a loop or SIMD) and construct the result
@@ -69,6 +71,8 @@ async fn main() -> Result<()> {
// this is guaranteed by DataFusion based on the function's signature.
assert_eq!(args.len(), 2);
+ let args = columnar_values_to_array(args)?;
+
// 1. cast both arguments to f64. These casts MUST be aligned with the signature or this function panics!
let base = as_float64_array(&args[0]).expect("cast failed");
let exponent = as_float64_array(&args[1]).expect("cast failed");
@@ -92,11 +96,8 @@ async fn main() -> Result<()> {
// `Ok` because no error occurred during the calculation (we should add one if exponent was [0, 1[ and the base < 0 because that panics!)
// `Arc` because arrays are immutable, thread-safe, trait objects.
- Ok(Arc::new(array) as ArrayRef)
- };
- // the function above expects an `ArrayRef`, but DataFusion may pass a scalar to a UDF.
- // thus, we use `make_scalar_function` to decorare the closure so that it can handle both Arrays and Scalar values.
- let pow = make_scalar_function(pow);
+ Ok(ColumnarValue::from(Arc::new(array) as ArrayRef))
+ });
// Next:
// * give it a name so that it shows nicely when the plan is printed
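With `make_scalar_function` no longer used, scalar UDF closures receive `&[ColumnarValue]` directly and return a `ColumnarValue`. A hypothetical second UDF written the same way (the `add_one` name and logic are illustrative; `columnar_values_to_array` is the helper imported above):

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Float64Array};
use datafusion::arrow::datatypes::DataType;
use datafusion::error::Result;
use datafusion::logical_expr::Volatility;
use datafusion::prelude::*;
use datafusion_common::cast::as_float64_array;
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr::functions::columnar_values_to_array;

fn register_add_one(ctx: &SessionContext) -> Result<()> {
    let add_one = Arc::new(|args: &[ColumnarValue]| {
        // arguments may be arrays or scalars; converting everything to arrays
        // is the simplest (if not the most efficient) way to handle both
        let args = columnar_values_to_array(args)?;
        let input = as_float64_array(&args[0])?;
        let result: Float64Array = input.iter().map(|v| v.map(|v| v + 1.0)).collect();
        Ok(ColumnarValue::from(Arc::new(result) as ArrayRef))
    });

    let udf = create_udf(
        "add_one",
        vec![DataType::Float64],
        Arc::new(DataType::Float64),
        Volatility::Immutable,
        add_one,
    );
    ctx.register_udf(udf);
    Ok(())
}
```
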
diff --git a/datafusion-examples/examples/to_timestamp.rs b/datafusion-examples/examples/to_timestamp.rs
new file mode 100644
index 000000000000..a07dbaefb75b
--- /dev/null
+++ b/datafusion-examples/examples/to_timestamp.rs
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::arrow::array::StringArray;
+use datafusion::arrow::datatypes::{DataType, Field, Schema};
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::Result;
+use datafusion::prelude::*;
+use datafusion_common::assert_contains;
+
+/// This example demonstrates how to use the to_timestamp series
+/// of functions in the DataFrame API as well as via sql.
+#[tokio::main]
+async fn main() -> Result<()> {
+ // define a schema.
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ ]));
+
+ // define data.
+ let batch = RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(StringArray::from(vec![
+ "2020-09-08T13:42:29Z",
+ "2020-09-08T13:42:29.190855-05:00",
+ "2020-08-09 12:13:29",
+ "2020-01-02",
+ ])),
+ Arc::new(StringArray::from(vec![
+ "2020-09-08T13:42:29Z",
+ "2020-09-08T13:42:29.190855-05:00",
+ "08-09-2020 13/42/29",
+ "09-27-2020 13:42:29-05:30",
+ ])),
+ ],
+ )?;
+
+ // declare a new context. In the Spark API, this corresponds to a new Spark SQL session
+ let ctx = SessionContext::new();
+
+ // declare a table in memory. In the Spark API, this corresponds to createDataFrame(...).
+ ctx.register_batch("t", batch)?;
+ let df = ctx.table("t").await?;
+
+ // use to_timestamp function to convert col 'a' to timestamp type using the default parsing
+ let df = df.with_column("a", to_timestamp(vec![col("a")]))?;
+ // use to_timestamp_seconds function to convert col 'b' to timestamp(Seconds) type using a list
+ // of chrono formats (https://docs.rs/chrono/latest/chrono/format/strftime/index.html) to try
+ let df = df.with_column(
+ "b",
+ to_timestamp_seconds(vec![
+ col("b"),
+ lit("%+"),
+ lit("%d-%m-%Y %H/%M/%S"),
+ lit("%m-%d-%Y %H:%M:%S%#z"),
+ ]),
+ )?;
+
+ let df = df.select_columns(&["a", "b"])?;
+
+ // print the results
+ df.show().await?;
+
+ // use sql to convert col 'a' to timestamp using the default parsing
+ let df = ctx.sql("select to_timestamp(a) from t").await?;
+
+ // print the results
+ df.show().await?;
+
+ // use sql to convert col 'b' to timestamp using a list of chrono formats to try
+ let df = ctx.sql("select to_timestamp(b, '%+', '%d-%m-%Y %H/%M/%S', '%m-%d-%Y %H:%M:%S%#z') from t").await?;
+
+ // print the results
+ df.show().await?;
+
+ // use sql to convert a static string to a timestamp using a list of chrono formats to try
+ // note that one of the formats ('%q') is invalid, but because DataFusion tries each
+ // format until one parses the timestamp expression successfully, no error is returned
+ let df = ctx.sql("select to_timestamp_micros('01-14-2023 01:01:30+05:30', '%q', '%d-%m-%Y %H/%M/%S', '%+', '%m-%d-%Y %H:%M:%S%#z')").await?;
+
+ // print the results
+ df.show().await?;
+
+ // casting a string to TIMESTAMP will also work for RFC3339 timestamps
+ let df = ctx
+ .sql("select to_timestamp_millis(TIMESTAMP '2022-08-03T14:38:50Z')")
+ .await?;
+
+ // print the results
+ df.show().await?;
+
+ // unix timestamps (in seconds) are also supported
+ let df = ctx.sql("select to_timestamp(1926632005)").await?;
+
+ // print the results
+ df.show().await?;
+
+ // use sql to convert a static string to a timestamp using a non-matching chrono format to try
+ let result = ctx
+ .sql("select to_timestamp_nanos('01-14-2023 01/01/30', '%d-%m-%Y %H:%M:%S')")
+ .await?
+ .collect()
+ .await;
+
+ let expected = "Error parsing timestamp from '01-14-2023 01/01/30' using format '%d-%m-%Y %H:%M:%S': input contains invalid characters";
+ assert_contains!(result.unwrap_err().to_string(), expected);
+
+ // note that using arrays for the chrono formats is not supported
+ let result = ctx
+ .sql("SELECT to_timestamp('2022-08-03T14:38:50+05:30', make_array('%s', '%q', '%d-%m-%Y %H:%M:%S%#z', '%+'))")
+ .await?
+ .collect()
+ .await;
+
+ let expected = "to_timestamp function unsupported data type at index 1: List";
+ assert_contains!(result.unwrap_err().to_string(), expected);
+
+ Ok(())
+}
diff --git a/datafusion/CHANGELOG.md b/datafusion/CHANGELOG.md
index d64bbeda877d..ae9da0e865e9 100644
--- a/datafusion/CHANGELOG.md
+++ b/datafusion/CHANGELOG.md
@@ -19,6 +19,7 @@
# Changelog
+- [35.0.0](../dev/changelog/35.0.0.md)
- [34.0.0](../dev/changelog/34.0.0.md)
- [33.0.0](../dev/changelog/33.0.0.md)
- [32.0.0](../dev/changelog/32.0.0.md)
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index e00c17930850..0d773ddb2b4c 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -408,7 +408,7 @@ config_namespace! {
/// parquet files by serializing them in parallel. Each column
/// in each row group in each output file are serialized in parallel
/// leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
- pub allow_single_file_parallelism: bool, default = false
+ pub allow_single_file_parallelism: bool, default = true
/// By default parallel parquet writer is tuned for minimum
/// memory usage in a streaming execution plan. You may see
@@ -561,6 +561,10 @@ config_namespace! {
/// will be collected into a single partition
pub hash_join_single_partition_threshold: usize, default = 1024 * 1024
+ /// The maximum estimated size in rows for one input side of a HashJoin
+ /// that will be collected into a single partition
+ pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128
+
/// The default filter selectivity used by Filter Statistics
/// when an exact selectivity cannot be determined. Valid values are
/// between 0 (no selectivity) and 100 (all rows are selected).
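The new row-based threshold sits alongside the existing byte-based one in the optimizer namespace, so it can be adjusted at runtime like any other option. A sketch, assuming the key is `datafusion.optimizer.hash_join_single_partition_threshold_rows` (mirroring the option defined above):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

// Adjust the row threshold below which one join input is collected into a
// single partition.
async fn tune_hash_join(ctx: &SessionContext) -> Result<()> {
    ctx.sql("SET datafusion.optimizer.hash_join_single_partition_threshold_rows = 50000")
        .await?
        .collect()
        .await?;
    Ok(())
}
```
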
diff --git a/datafusion/common/src/file_options/arrow_writer.rs b/datafusion/common/src/file_options/arrow_writer.rs
index a30e6d800e20..cb921535aba5 100644
--- a/datafusion/common/src/file_options/arrow_writer.rs
+++ b/datafusion/common/src/file_options/arrow_writer.rs
@@ -27,6 +27,18 @@ use super::StatementOptions;
#[derive(Clone, Debug)]
pub struct ArrowWriterOptions {}
+impl ArrowWriterOptions {
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+impl Default for ArrowWriterOptions {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
impl TryFrom<(&ConfigOptions, &StatementOptions)> for ArrowWriterOptions {
type Error = DataFusionError;
diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs
index 8dcc00ca1c29..d5a1b3ee363b 100644
--- a/datafusion/common/src/hash_utils.rs
+++ b/datafusion/common/src/hash_utils.rs
@@ -27,8 +27,9 @@ use arrow::{downcast_dictionary_array, downcast_primitive_array};
use arrow_buffer::i256;
use crate::cast::{
- as_boolean_array, as_generic_binary_array, as_large_list_array, as_list_array,
- as_primitive_array, as_string_array, as_struct_array,
+ as_boolean_array, as_fixed_size_list_array, as_generic_binary_array,
+ as_large_list_array, as_list_array, as_primitive_array, as_string_array,
+ as_struct_array,
};
use crate::error::{DataFusionError, Result, _internal_err};
@@ -267,6 +268,38 @@ where
Ok(())
}
+fn hash_fixed_list_array(
+ array: &FixedSizeListArray,
+ random_state: &RandomState,
+ hashes_buffer: &mut [u64],
+) -> Result<()> {
+ let values = array.values().clone();
+ let value_len = array.value_length();
+ // each list occupies `value_len` consecutive slots in the child values array
+ let offset_size = value_len as usize;
+ let nulls = array.nulls();
+ let mut values_hashes = vec![0u64; values.len()];
+ create_hashes(&[values], random_state, &mut values_hashes)?;
+ if let Some(nulls) = nulls {
+ for i in 0..array.len() {
+ if nulls.is_valid(i) {
+ let hash = &mut hashes_buffer[i];
+ for values_hash in &values_hashes[i * offset_size..(i + 1) * offset_size]
+ {
+ *hash = combine_hashes(*hash, *values_hash);
+ }
+ }
+ }
+ } else {
+ for i in 0..array.len() {
+ let hash = &mut hashes_buffer[i];
+ for values_hash in &values_hashes[i * offset_size..(i + 1) * offset_size] {
+ *hash = combine_hashes(*hash, *values_hash);
+ }
+ }
+ }
+ Ok(())
+}
+
/// Test version of `create_hashes` that produces the same value for
/// all hashes (to test collisions)
///
@@ -366,6 +399,10 @@ pub fn create_hashes<'a>(
let array = as_large_list_array(array)?;
hash_list_array(array, random_state, hashes_buffer)?;
}
+ DataType::FixedSizeList(_,_) => {
+ let array = as_fixed_size_list_array(array)?;
+ hash_fixed_list_array(array, random_state, hashes_buffer)?;
+ }
_ => {
// This is internal because we should have caught this before.
return _internal_err!(
@@ -546,6 +583,30 @@ mod tests {
assert_eq!(hashes[2], hashes[3]);
}
+ #[test]
+ // Tests actual values of hashes, which are different if forcing collisions
+ #[cfg(not(feature = "force_hash_collisions"))]
+ fn create_hashes_for_fixed_size_list_arrays() {
+ let data = vec![
+ Some(vec![Some(0), Some(1), Some(2)]),
+ None,
+ Some(vec![Some(3), None, Some(5)]),
+ Some(vec![Some(3), None, Some(5)]),
+ None,
+ Some(vec![Some(0), Some(1), Some(2)]),
+ ];
+ let list_array =
+ Arc::new(FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
+ data, 3,
+ )) as ArrayRef;
+ let random_state = RandomState::with_seeds(0, 0, 0, 0);
+ let mut hashes = vec![0; list_array.len()];
+ create_hashes(&[list_array], &random_state, &mut hashes).unwrap();
+ assert_eq!(hashes[0], hashes[5]);
+ assert_eq!(hashes[1], hashes[4]);
+ assert_eq!(hashes[2], hashes[3]);
+ }
+
#[test]
// Tests actual values of hashes, which are different if forcing collisions
#[cfg(not(feature = "force_hash_collisions"))]
diff --git a/datafusion/common/src/param_value.rs b/datafusion/common/src/param_value.rs
index 3fe2ba99ab83..c614098713d6 100644
--- a/datafusion/common/src/param_value.rs
+++ b/datafusion/common/src/param_value.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::error::{_internal_err, _plan_err};
+use crate::error::_plan_err;
use crate::{DataFusionError, Result, ScalarValue};
use arrow_schema::DataType;
use std::collections::HashMap;
@@ -65,11 +65,7 @@ impl ParamValues {
}
}
- pub fn get_placeholders_with_values(
- &self,
- id: &str,
- data_type: Option<&DataType>,
- ) -> Result<ScalarValue> {
+ pub fn get_placeholders_with_values(&self, id: &str) -> Result<ScalarValue> {
match self {
ParamValues::List(list) => {
if id.is_empty() {
@@ -90,14 +86,6 @@ impl ParamValues {
"No value found for placeholder with id {id}"
))
})?;
- // check if the data type of the value matches the data type of the placeholder
- if Some(&value.data_type()) != data_type {
- return _internal_err!(
- "Placeholder value type mismatch: expected {:?}, got {:?}",
- data_type,
- value.data_type()
- );
- }
Ok(value.clone())
}
ParamValues::Map(map) => {
@@ -109,14 +97,6 @@ impl ParamValues {
"No value found for placeholder with name {id}"
))
})?;
- // check if the data type of the value matches the data type of the placeholder
- if Some(&value.data_type()) != data_type {
- return _internal_err!(
- "Placeholder value type mismatch: expected {:?}, got {:?}",
- data_type,
- value.data_type()
- );
- }
Ok(value.clone())
}
}
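With the strict type check gone, a placeholder value whose type differs from the inferred placeholder type is no longer rejected here; any mismatch is dealt with later during planning. A hypothetical usage sketch (table and column names are illustrative):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion::scalar::ScalarValue;

async fn run_with_params(ctx: &SessionContext) -> Result<()> {
    let df = ctx
        .sql("SELECT * FROM t WHERE a > $1")
        .await?
        // the supplied ScalarValue no longer has to match the placeholder's
        // inferred data type exactly
        .with_param_values(vec![ScalarValue::Int32(Some(1))])?;
    df.show().await?;
    Ok(())
}
```
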
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 20d03c70960a..2f9e374bd7f4 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -34,8 +34,9 @@ use crate::cast::{
};
use crate::error::{DataFusionError, Result, _internal_err, _not_impl_err};
use crate::hash_utils::create_hashes;
-use crate::utils::{array_into_large_list_array, array_into_list_array};
-
+use crate::utils::{
+ array_into_fixed_size_list_array, array_into_large_list_array, array_into_list_array,
+};
use arrow::compute::kernels::numeric::*;
use arrow::util::display::{ArrayFormatter, FormatOptions};
use arrow::{
@@ -2223,9 +2224,11 @@ impl ScalarValue {
let list_array = as_fixed_size_list_array(array)?;
let nested_array = list_array.value(index);
// Produces a single element `ListArray` with the value at `index`.
- let arr = Arc::new(array_into_list_array(nested_array));
+ let list_size = nested_array.len();
+ let arr =
+ Arc::new(array_into_fixed_size_list_array(nested_array, list_size));
- ScalarValue::List(arr)
+ ScalarValue::FixedSizeList(arr)
}
DataType::Date32 => typed_cast!(array, index, Date32Array, Date32)?,
DataType::Date64 => typed_cast!(array, index, Date64Array, Date64)?,
@@ -2361,6 +2364,16 @@ impl ScalarValue {
ScalarValue::try_from_array(&cast_arr, 0)
}
+ /// Try to cast this value to a ScalarValue of type `data_type`
+ pub fn cast_to(&self, data_type: &DataType) -> Result<Self> {
+ let cast_options = CastOptions {
+ safe: false,
+ format_options: Default::default(),
+ };
+ let cast_arr = cast_with_options(&self.to_array()?, data_type, &cast_options)?;
+ ScalarValue::try_from_array(&cast_arr, 0)
+ }
+
fn eq_array_decimal(
array: &ArrayRef,
index: usize,
@@ -2971,6 +2984,19 @@ impl TryFrom<&DataType> for ScalarValue {
.to_owned()
.into(),
),
+ // `ScalarValue::FixedSizeList` contains a single element `FixedSizeList`.
+ DataType::FixedSizeList(field, _) => ScalarValue::FixedSizeList(
+ new_null_array(
+ &DataType::FixedSizeList(
+ Arc::new(Field::new("item", field.data_type().clone(), true)),
+ 1,
+ ),
+ 1,
+ )
+ .as_fixed_size_list()
+ .to_owned()
+ .into(),
+ ),
DataType::Struct(fields) => ScalarValue::Struct(None, fields.clone()),
DataType::Null => ScalarValue::Null,
_ => {
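A small sketch of how the new `ScalarValue::cast_to` might be used (values chosen for illustration):

```rust
use datafusion::arrow::datatypes::DataType;
use datafusion::error::Result;
use datafusion::scalar::ScalarValue;

fn demo_cast() -> Result<()> {
    let v = ScalarValue::Int32(Some(3));
    // `cast_to` round-trips through a one-element array with non-"safe" cast
    // options, so an impossible cast surfaces as an error rather than a null
    let as_f64 = v.cast_to(&DataType::Float64)?;
    assert_eq!(as_f64, ScalarValue::Float64(Some(3.0)));
    Ok(())
}
```
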
diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs
index 0a61fce15482..d21bd464f850 100644
--- a/datafusion/common/src/utils.rs
+++ b/datafusion/common/src/utils.rs
@@ -25,7 +25,9 @@ use arrow::compute;
use arrow::compute::{partition, SortColumn, SortOptions};
use arrow::datatypes::{Field, SchemaRef, UInt32Type};
use arrow::record_batch::RecordBatch;
-use arrow_array::{Array, LargeListArray, ListArray, RecordBatchOptions};
+use arrow_array::{
+ Array, FixedSizeListArray, LargeListArray, ListArray, RecordBatchOptions,
+};
use arrow_schema::DataType;
use sqlparser::ast::Ident;
use sqlparser::dialect::GenericDialect;
@@ -368,6 +370,19 @@ pub fn array_into_large_list_array(arr: ArrayRef) -> LargeListArray {
)
}
+pub fn array_into_fixed_size_list_array(
+ arr: ArrayRef,
+ list_size: usize,
+) -> FixedSizeListArray {
+ let list_size = list_size as i32;
+ FixedSizeListArray::new(
+ Arc::new(Field::new("item", arr.data_type().to_owned(), true)),
+ list_size,
+ arr,
+ None,
+ )
+}
+
/// Wrap arrays into a single element `ListArray`.
///
/// Example:
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index f5496d4c4700..69b18a326951 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -62,11 +62,11 @@ bytes = { workspace = true }
bzip2 = { version = "0.4.3", optional = true }
chrono = { workspace = true }
dashmap = { workspace = true }
-datafusion-common = { path = "../common", version = "34.0.0", features = ["object_store"], default-features = false }
+datafusion-common = { path = "../common", version = "35.0.0", features = ["object_store"], default-features = false }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
-datafusion-optimizer = { path = "../optimizer", version = "34.0.0", default-features = false }
-datafusion-physical-expr = { path = "../physical-expr", version = "34.0.0", default-features = false }
+datafusion-optimizer = { path = "../optimizer", version = "35.0.0", default-features = false }
+datafusion-physical-expr = { path = "../physical-expr", version = "35.0.0", default-features = false }
datafusion-physical-plan = { workspace = true }
datafusion-sql = { workspace = true }
flate2 = { version = "1.0.24", optional = true }
diff --git a/datafusion/core/src/catalog/mod.rs b/datafusion/core/src/catalog/mod.rs
index ce27d57da00d..da7e1f5e2193 100644
--- a/datafusion/core/src/catalog/mod.rs
+++ b/datafusion/core/src/catalog/mod.rs
@@ -29,7 +29,10 @@ use datafusion_common::{exec_err, not_impl_err, DataFusionError, Result};
use std::any::Any;
use std::sync::Arc;
-/// Represent a list of named catalogs
+/// Represents a list of named [`CatalogProvider`]s.
+///
+/// Please see the documentation on `CatalogProvider` for details of
+/// implementing a custom catalog.
pub trait CatalogList: Sync + Send {
/// Returns the catalog list as [`Any`]
/// so that it can be downcast to a specific implementation.
@@ -94,6 +97,88 @@ impl CatalogList for MemoryCatalogList {
}
/// Represents a catalog, comprising a number of named schemas.
+///
+/// # Catalog Overview
+///
+/// To plan and execute queries, DataFusion needs a "Catalog" that provides
+/// metadata such as which schemas and tables exist, their columns and data
+/// types, and how to access the data.
+///
+/// The Catalog API consists of:
+/// * [`CatalogList`]: a collection of `CatalogProvider`s
+/// * [`CatalogProvider`]: a collection of `SchemaProvider`s (sometimes called a "database" in other systems)
+/// * [`SchemaProvider`]: a collection of `TableProvider`s (often called a "schema" in other systems)
+/// * [`TableProvider`]: individual tables
+///
+/// # Implementing Catalogs
+///
+/// To implement a catalog, you implement at least one of the [`CatalogList`],
+/// [`CatalogProvider`] and [`SchemaProvider`] traits and register them
+/// appropriately with the [`SessionContext`].
+///
+/// [`SessionContext`]: crate::execution::context::SessionContext
+///
+/// DataFusion comes with a simple in-memory catalog implementation,
+/// [`MemoryCatalogProvider`], that is used by default and has no persistence.
+/// DataFusion does not include more complex Catalog implementations because
+/// catalog management is a key design choice for most data systems, and thus
+/// it is unlikely that any general-purpose catalog implementation will work
+/// well across many use cases.
+///
+/// # Implementing "Remote" catalogs
+///
+/// Sometimes catalog information is stored remotely and requires a network call
+/// to retrieve. For example, the [Delta Lake] table format stores table
+/// metadata in files on S3 that must be first downloaded to discover what
+/// schemas and tables exist.
+///
+/// [Delta Lake]: https://delta.io/
+///
+/// The [`CatalogProvider`] can support this use case, but it takes some care.
+/// The planning APIs in DataFusion are not `async` and thus network IO can not
+/// be performed "lazily" / "on demand" during query planning. The rationale for
+/// this design is that using remote procedure calls for all catalog accesses
+/// required for query planning would likely result in multiple network calls
+/// per plan, resulting in very poor planning performance.
+///
+/// To implement [`CatalogProvider`] and [`SchemaProvider`] for remote catalogs,
+/// you need to provide an in memory snapshot of the required metadata. Most
+/// systems typically either already have this information cached locally or can
+/// batch access to the remote catalog to retrieve multiple schemas and tables
+/// in a single network call.
+///
+/// Note that [`SchemaProvider::table`] is an `async` function in order to
+/// simplify implementing simple [`SchemaProvider`]s. For many table formats it
+/// is easy to list all available tables, but there is additional non-trivial
+/// access required to read table details (e.g. statistics).
+///
+/// The pattern that DataFusion itself uses to plan SQL queries is to walk over
+/// the query to [find all schema / table references in an `async` function],
+/// perform the required remote catalog accesses in parallel, and then plan
+/// the query using that snapshot.
+///
+/// [find all schema / table references in an `async` function]: crate::execution::context::SessionState::resolve_table_references
+///
+/// # Example Catalog Implementations
+///
+/// Here are some examples of how to implement custom catalogs:
+///
+/// * [`datafusion-cli`]: [`DynamicFileCatalogProvider`] catalog provider
+/// that treats files and directories on a filesystem as tables.
+///
+/// * [`catalog.rs`]: a simple directory-based catalog.
+///
+/// * [delta-rs]: [`UnityCatalogProvider`] implementation that can
+/// read from Delta Lake tables
+///
+/// [`datafusion-cli`]: https://arrow.apache.org/datafusion/user-guide/cli.html
+/// [`DynamicFileCatalogProvider`]: https://github.com/apache/arrow-datafusion/blob/31b9b48b08592b7d293f46e75707aad7dadd7cbc/datafusion-cli/src/catalog.rs#L75
+/// [`catalog.rs`]: https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/external_dependency/catalog.rs
+/// [delta-rs]: https://github.com/delta-io/delta-rs
+/// [`UnityCatalogProvider`]: https://github.com/delta-io/delta-rs/blob/951436ecec476ce65b5ed3b58b50fb0846ca7b91/crates/deltalake-core/src/data_catalog/unity/datafusion.rs#L111-L123
+///
+/// [`TableProvider`]: crate::datasource::TableProvider
+
pub trait CatalogProvider: Sync + Send {
/// Returns the catalog provider as [`Any`]
/// so that it can be downcast to a specific implementation.
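The registration flow that these docs describe, using the built-in in-memory implementations, looks roughly like the following sketch (catalog and schema names are arbitrary):

```rust
use std::sync::Arc;

use datafusion::catalog::schema::MemorySchemaProvider;
use datafusion::catalog::{CatalogProvider, MemoryCatalogProvider};
use datafusion::error::Result;
use datafusion::prelude::*;

fn register_catalog(ctx: &SessionContext) -> Result<()> {
    // a CatalogProvider is a collection of named SchemaProviders
    let catalog = MemoryCatalogProvider::new();
    catalog.register_schema("my_schema", Arc::new(MemorySchemaProvider::new()))?;

    // after registration, tables can be resolved as `my_catalog.my_schema.<table>`
    ctx.register_catalog("my_catalog", Arc::new(catalog));
    Ok(())
}
```
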
diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs
index 1bb2df914ab2..2cebad717249 100644
--- a/datafusion/core/src/catalog/schema.rs
+++ b/datafusion/core/src/catalog/schema.rs
@@ -28,20 +28,28 @@ use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
/// Represents a schema, comprising a number of named tables.
+///
+/// Please see [`CatalogProvider`] for details of implementing a custom catalog.
+///
+/// [`CatalogProvider`]: super::CatalogProvider
#[async_trait]
pub trait SchemaProvider: Sync + Send {
- /// Returns the schema provider as [`Any`](std::any::Any)
- /// so that it can be downcast to a specific implementation.
+ /// Returns this `SchemaProvider` as [`Any`] so that it can be downcast to a
+ /// specific implementation.
fn as_any(&self) -> &dyn Any;
/// Retrieves the list of available table names in this schema.
fn table_names(&self) -> Vec<String>;
- /// Retrieves a specific table from the schema by name, provided it exists.
+ /// Retrieves a specific table from the schema by name, if it exists,
+ /// otherwise returns `None`.
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>>;
- /// If supported by the implementation, adds a new table to this schema.
- /// If a table of the same name existed before, it returns "Table already exists" error.
+ /// If supported by the implementation, adds a new table named `name` to
+ /// this schema.
+ ///
+ /// If a table of the same name was already registered, returns "Table
+ /// already exists" error.
#[allow(unused_variables)]
fn register_table(
&self,
@@ -51,16 +59,16 @@ pub trait SchemaProvider: Sync + Send {
exec_err!("schema provider does not support registering tables")
}
- /// If supported by the implementation, removes an existing table from this schema and returns it.
- /// If no table of that name exists, returns Ok(None).
+ /// If supported by the implementation, removes the `name` table from this
+ /// schema and returns the previously registered [`TableProvider`], if any.
+ ///
+ /// If no `name` table exists, returns Ok(None).
#[allow(unused_variables)]
fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {