Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into dependabot/cargo/main…
Browse files Browse the repository at this point in the history
…/apache-avro-0.17
  • Loading branch information
alamb committed Dec 11, 2024
2 parents 8800b0f + 6196ff2 commit 76ae0c2
Show file tree
Hide file tree
Showing 349 changed files with 5,808 additions and 3,424 deletions.
11 changes: 7 additions & 4 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,12 @@ jobs:
- name: Check datafusion-common without default features
run: cargo check --all-targets --no-default-features -p datafusion-common

- name: Check datafusion-functions
- name: Check datafusion-functions without default features
run: cargo check --all-targets --no-default-features -p datafusion-functions

- name: Check datafusion-substrait without default features
run: cargo check --all-targets --no-default-features -p datafusion-substrait

- name: Check workspace in debug mode
run: cargo check --all-targets --workspace

Expand Down Expand Up @@ -582,9 +585,9 @@ jobs:
#
# To reproduce:
# 1. Install the version of Rust that is failing. Example:
# rustup install 1.79.0
# rustup install 1.80.1
# 2. Run the command that failed with that version. Example:
# cargo +1.79.0 check -p datafusion
# cargo +1.80.1 check -p datafusion
#
# To resolve, either:
# 1. Change your code to use older Rust features,
Expand All @@ -603,4 +606,4 @@ jobs:
run: cargo msrv --output-format json --log-target stdout verify
- name: Check datafusion-cli
working-directory: datafusion-cli
run: cargo msrv --output-format json --log-target stdout verify
run: cargo msrv --output-format json --log-target stdout verify
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ datafusion/sqllogictest/test_files/scratch*
# temp file for core
datafusion/core/*.parquet

# Generated core benchmark data
datafusion/core/benches/data/*

# rat
filtered_rat.txt
rat.txt
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ members = [
"datafusion/functions",
"datafusion/functions-aggregate",
"datafusion/functions-aggregate-common",
"datafusion/functions-table",
"datafusion/functions-nested",
"datafusion/functions-window",
"datafusion/functions-window-common",
Expand Down Expand Up @@ -64,7 +65,7 @@ homepage = "https://datafusion.apache.org"
license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/apache/datafusion"
rust-version = "1.79"
rust-version = "1.80.1"
version = "43.0.0"

[workspace.dependencies]
Expand Down Expand Up @@ -110,6 +111,7 @@ datafusion-functions = { path = "datafusion/functions", version = "43.0.0" }
datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "43.0.0" }
datafusion-functions-aggregate-common = { path = "datafusion/functions-aggregate-common", version = "43.0.0" }
datafusion-functions-nested = { path = "datafusion/functions-nested", version = "43.0.0" }
datafusion-functions-table = { path = "datafusion/functions-table", version = "43.0.0" }
datafusion-functions-window = { path = "datafusion/functions-window", version = "43.0.0" }
datafusion-functions-window-common = { path = "datafusion/functions-window-common", version = "43.0.0" }
datafusion-macros = { path = "datafusion/macros", version = "43.0.0" }
Expand All @@ -129,7 +131,6 @@ hashbrown = { version = "0.14.5", features = ["raw"] }
indexmap = "2.0.0"
itertools = "0.13"
log = "^0.4"
num_cpus = "1.13.0"
object_store = { version = "0.11.0", default-features = false }
parking_lot = "0.12"
parquet = { version = "53.3.0", default-features = false, features = [
Expand Down
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
[Chat](https://discord.com/channels/885562378132000778/885562378132000781)

<a href="https://datafusion.apache.org/">
<img src="./docs/source/_static/images/2x_bgwhite_original.png" width="512" alt="logo"/>
<img src="https://github.com/apache/datafusion/raw/HEAD/docs/source/_static/images/2x_bgwhite_original.png" width="512" alt="logo"/>
</a>

DataFusion is an extensible query engine written in [Rust] that
Expand Down Expand Up @@ -126,14 +126,17 @@ Optional features:

## Rust Version Compatibility Policy

DataFusion's Minimum Required Stable Rust Version (MSRV) policy is to support stable [4 latest
Rust versions](https://releases.rs) OR the stable minor Rust version as of 4 months, whichever is lower.
The Rust toolchain releases are tracked at [Rust Versions](https://releases.rs) and follow
[semantic versioning](https://semver.org/). A Rust toolchain release can be identified
by a version string like `1.80.0`, or more generally `major.minor.patch`.

DataFusion's supports the last 4 stable Rust minor versions released and any such versions released within the last 4 months.

For example, given the releases `1.78.0`, `1.79.0`, `1.80.0`, `1.80.1` and `1.81.0` DataFusion will support 1.78.0, which is 3 minor versions prior to the most minor recent `1.81`.

If a hotfix is released for the minimum supported Rust version (MSRV), the MSRV will be the minor version with all hotfixes, even if it surpasses the four-month window.
Note: If a Rust hotfix is released for the current MSRV, the MSRV will be updated to the specific minor version that includes all applicable hotfixes preceding other policies.

We enforce this policy using a [MSRV CI Check](https://github.com/search?q=repo%3Aapache%2Fdatafusion+rust-version+language%3ATOML+path%3A%2F%5ECargo.toml%2F&type=code)
DataFusion enforces MSRV policy using a [MSRV CI Check](https://github.com/search?q=repo%3Aapache%2Fdatafusion+rust-version+language%3ATOML+path%3A%2F%5ECargo.toml%2F&type=code)

## DataFusion API evolution policy

Expand Down
1 change: 0 additions & 1 deletion benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ env_logger = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
mimalloc = { version = "0.1", optional = true, default-features = false }
num_cpus = { workspace = true }
parquet = { workspace = true, default-features = true }
serde = { version = "1.0.136", features = ["derive"] }
serde_json = { workspace = true }
Expand Down
40 changes: 22 additions & 18 deletions benchmarks/src/bin/external_aggr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::OnceLock;
use std::sync::LazyLock;
use structopt::StructOpt;

use arrow::record_batch::RecordBatch;
Expand All @@ -33,12 +33,14 @@ use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::Result;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::memory_pool::{human_readable_size, units};
use datafusion::execution::runtime_env::RuntimeConfig;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;
use datafusion_benchmarks::util::{BenchmarkRun, CommonOpt};
use datafusion_common::instant::Instant;
use datafusion_common::utils::get_available_parallelism;
use datafusion_common::{exec_datafusion_err, exec_err, DEFAULT_PARQUET_EXTENSION};

#[derive(Debug, StructOpt)]
Expand Down Expand Up @@ -89,7 +91,13 @@ struct QueryResult {
/// Memory limits to run: 64MiB, 32MiB, 16MiB
/// Q2 requires 250MiB for aggregation
/// Memory limits to run: 512MiB, 256MiB, 128MiB, 64MiB, 32MiB
static QUERY_MEMORY_LIMITS: OnceLock<HashMap<usize, Vec<u64>>> = OnceLock::new();
static QUERY_MEMORY_LIMITS: LazyLock<HashMap<usize, Vec<u64>>> = LazyLock::new(|| {
use units::*;
let mut map = HashMap::new();
map.insert(1, vec![64 * MB, 32 * MB, 16 * MB]);
map.insert(2, vec![512 * MB, 256 * MB, 128 * MB, 64 * MB, 32 * MB]);
map
});

impl ExternalAggrConfig {
const AGGR_TABLES: [&'static str; 1] = ["lineitem"];
Expand All @@ -112,16 +120,6 @@ impl ExternalAggrConfig {
"#,
];

fn init_query_memory_limits() -> &'static HashMap<usize, Vec<u64>> {
use units::*;
QUERY_MEMORY_LIMITS.get_or_init(|| {
let mut map = HashMap::new();
map.insert(1, vec![64 * MB, 32 * MB, 16 * MB]);
map.insert(2, vec![512 * MB, 256 * MB, 128 * MB, 64 * MB, 32 * MB]);
map
})
}

/// If `--query` and `--memory-limit` is not speicified, run all queries
/// with pre-configured memory limits
/// If only `--query` is specified, run the query with all memory limits
Expand Down Expand Up @@ -159,8 +157,7 @@ impl ExternalAggrConfig {
query_executions.push((query_id, limit));
}
None => {
let memory_limits_table = Self::init_query_memory_limits();
let memory_limits = memory_limits_table.get(&query_id).unwrap();
let memory_limits = QUERY_MEMORY_LIMITS.get(&query_id).unwrap();
for limit in memory_limits {
query_executions.push((query_id, *limit));
}
Expand Down Expand Up @@ -194,10 +191,15 @@ impl ExternalAggrConfig {
let query_name =
format!("Q{query_id}({})", human_readable_size(mem_limit as usize));
let config = self.common.config();
let runtime_config = RuntimeConfig::new()
let runtime_env = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(mem_limit as usize)))
.build_arc()?;
let ctx = SessionContext::new_with_config_rt(config, runtime_config);
let state = SessionStateBuilder::new()
.with_config(config)
.with_runtime_env(runtime_env)
.with_default_features()
.build();
let ctx = SessionContext::from(state);

// register tables
self.register_tables(&ctx).await?;
Expand Down Expand Up @@ -325,7 +327,9 @@ impl ExternalAggrConfig {
}

fn partitions(&self) -> usize {
self.common.partitions.unwrap_or(num_cpus::get())
self.common
.partitions
.unwrap_or(get_available_parallelism())
}

/// Parse memory limit from string to number of bytes
Expand Down
3 changes: 2 additions & 1 deletion benchmarks/src/bin/h2o.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use datafusion::datasource::MemTable;
use datafusion::prelude::CsvReadOptions;
use datafusion::{arrow::util::pretty, error::Result, prelude::SessionContext};
use datafusion_benchmarks::util::BenchmarkRun;
use datafusion_common::utils::get_available_parallelism;
use std::path::PathBuf;
use std::sync::Arc;
use structopt::StructOpt;
Expand Down Expand Up @@ -91,7 +92,7 @@ async fn group_by(opt: &GroupBy) -> Result<()> {
.with_listing_options(ListingOptions::new(Arc::new(CsvFormat::default())))
.with_schema(Arc::new(schema));
let csv = ListingTable::try_new(listing_config)?;
let partition_size = num_cpus::get();
let partition_size = get_available_parallelism();
let memtable =
MemTable::load(Arc::new(csv), Some(partition_size), &ctx.state()).await?;
ctx.register_table("x", Arc::new(memtable))?;
Expand Down
5 changes: 4 additions & 1 deletion benchmarks/src/imdb/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;
use datafusion_common::instant::Instant;
use datafusion_common::utils::get_available_parallelism;
use datafusion_common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};

use log::info;
Expand Down Expand Up @@ -468,7 +469,9 @@ impl RunOpt {
}

fn partitions(&self) -> usize {
self.common.partitions.unwrap_or(num_cpus::get())
self.common
.partitions
.unwrap_or(get_available_parallelism())
}
}

Expand Down
6 changes: 4 additions & 2 deletions benchmarks/src/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::test_util::parquet::TestParquetFile;
use datafusion_common::instant::Instant;

use datafusion_common::utils::get_available_parallelism;
use structopt::StructOpt;

/// Test performance of sorting large datasets
Expand Down Expand Up @@ -147,7 +147,9 @@ impl RunOpt {
rundata.start_new_case(title);
for i in 0..self.common.iterations {
let config = SessionConfig::new().with_target_partitions(
self.common.partitions.unwrap_or(num_cpus::get()),
self.common
.partitions
.unwrap_or(get_available_parallelism()),
);
let ctx = SessionContext::new_with_config(config);
let (rows, elapsed) =
Expand Down
15 changes: 10 additions & 5 deletions benchmarks/src/sort_tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ use datafusion::datasource::listing::{
};
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::Result;
use datafusion::execution::runtime_env::RuntimeConfig;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{displayable, execute_stream};
use datafusion::prelude::*;
use datafusion_common::instant::Instant;
use datafusion_common::utils::get_available_parallelism;
use datafusion_common::DEFAULT_PARQUET_EXTENSION;

use crate::util::{BenchmarkRun, CommonOpt};
Expand Down Expand Up @@ -187,9 +188,11 @@ impl RunOpt {
/// Benchmark query `query_id` in `SORT_QUERIES`
async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
let config = self.common.config();

let runtime_config = RuntimeConfig::new().build_arc()?;
let ctx = SessionContext::new_with_config_rt(config, runtime_config);
let state = SessionStateBuilder::new()
.with_config(config)
.with_default_features()
.build();
let ctx = SessionContext::from(state);

// register tables
self.register_tables(&ctx).await?;
Expand Down Expand Up @@ -315,6 +318,8 @@ impl RunOpt {
}

fn partitions(&self) -> usize {
self.common.partitions.unwrap_or(num_cpus::get())
self.common
.partitions
.unwrap_or(get_available_parallelism())
}
}
5 changes: 4 additions & 1 deletion benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;
use datafusion_common::instant::Instant;
use datafusion_common::utils::get_available_parallelism;
use datafusion_common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};

use log::info;
Expand Down Expand Up @@ -296,7 +297,9 @@ impl RunOpt {
}

fn partitions(&self) -> usize {
self.common.partitions.unwrap_or(num_cpus::get())
self.common
.partitions
.unwrap_or(get_available_parallelism())
}
}

Expand Down
5 changes: 4 additions & 1 deletion benchmarks/src/util/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use datafusion::prelude::SessionConfig;
use datafusion_common::utils::get_available_parallelism;
use structopt::StructOpt;

// Common benchmark options (don't use doc comments otherwise this doc
Expand Down Expand Up @@ -48,7 +49,9 @@ impl CommonOpt {
/// Modify the existing config appropriately
pub fn update_config(&self, config: SessionConfig) -> SessionConfig {
config
.with_target_partitions(self.partitions.unwrap_or(num_cpus::get()))
.with_target_partitions(
self.partitions.unwrap_or(get_available_parallelism()),
)
.with_batch_size(self.batch_size)
}
}
3 changes: 2 additions & 1 deletion benchmarks/src/util/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use datafusion::{error::Result, DATAFUSION_VERSION};
use datafusion_common::utils::get_available_parallelism;
use serde::{Serialize, Serializer};
use serde_json::Value;
use std::{
Expand Down Expand Up @@ -68,7 +69,7 @@ impl RunContext {
Self {
benchmark_version: env!("CARGO_PKG_VERSION").to_owned(),
datafusion_version: DATAFUSION_VERSION.to_owned(),
num_cpus: num_cpus::get(),
num_cpus: get_available_parallelism(),
start_time: SystemTime::now(),
arguments: std::env::args().skip(1).collect::<Vec<String>>(),
}
Expand Down
Loading

0 comments on commit 76ae0c2

Please sign in to comment.