Skip to content

Commit

Permalink
Update substrait requirement from 0.49 to 0.50 (#13808)
Browse files Browse the repository at this point in the history
* Update substrait requirement from 0.49 to 0.50

Updates the requirements on [substrait](https://github.com/substrait-io/substrait-rs) to permit the latest version.
- [Release notes](https://github.com/substrait-io/substrait-rs/releases)
- [Changelog](https://github.com/substrait-io/substrait-rs/blob/main/CHANGELOG.md)
- [Commits](https://github.com/substrait-io/substrait-rs/compare/v0.49.0...v0.50.0)

---
updated-dependencies:
- dependency-name: substrait
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>

* Fix compilation

* Add expr test

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: jonahgao <[email protected]>
  • Loading branch information
dependabot[bot] and jonahgao authored Dec 19, 2024
1 parent 71f157f commit 9f530dd
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 27 deletions.
2 changes: 1 addition & 1 deletion datafusion/substrait/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ object_store = { workspace = true }
pbjson-types = "0.7"
# TODO use workspace version
prost = "0.13"
substrait = { version = "0.49", features = ["serde"] }
substrait = { version = "0.50", features = ["serde"] }
url = { workspace = true }

[dev-dependencies]
Expand Down
31 changes: 22 additions & 9 deletions datafusion/substrait/src/logical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use datafusion::logical_expr::{
col, expr, Cast, Extension, GroupingSet, Like, LogicalPlanBuilder, Partitioning,
Repartition, Subquery, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition,
};
use datafusion::prelude::JoinType;
use datafusion::prelude::{lit, JoinType};
use datafusion::sql::TableReference;
use datafusion::{
error::Result, logical_expr::utils::split_conjunction, prelude::Column,
Expand Down Expand Up @@ -98,7 +98,7 @@ use substrait::proto::{
sort_field::{SortDirection, SortKind::*},
AggregateFunction, Expression, NamedStruct, Plan, Rel, RelCommon, Type,
};
use substrait::proto::{ExtendedExpression, FunctionArgument, SortField};
use substrait::proto::{fetch_rel, ExtendedExpression, FunctionArgument, SortField};

use super::state::SubstraitPlanningState;

Expand Down Expand Up @@ -640,14 +640,27 @@ pub async fn from_substrait_rel(
let input = LogicalPlanBuilder::from(
from_substrait_rel(state, input, extensions).await?,
);
let offset = fetch.offset as usize;
// -1 means that ALL records should be returned
let count = if fetch.count == -1 {
None
} else {
Some(fetch.count as usize)
let empty_schema = DFSchemaRef::new(DFSchema::empty());
let offset = match &fetch.offset_mode {
Some(fetch_rel::OffsetMode::Offset(offset)) => Some(lit(*offset)),
Some(fetch_rel::OffsetMode::OffsetExpr(expr)) => Some(
from_substrait_rex(state, expr, &empty_schema, extensions)
.await?,
),
None => None,
};
let count = match &fetch.count_mode {
Some(fetch_rel::CountMode::Count(count)) => {
// -1 means that ALL records should be returned, equivalent to None
(*count != -1).then(|| lit(*count))
}
Some(fetch_rel::CountMode::CountExpr(expr)) => Some(
from_substrait_rex(state, expr, &empty_schema, extensions)
.await?,
),
None => None,
};
input.limit(offset, count)?.build()
input.limit_by_expr(offset, count)?.build()
} else {
not_impl_err!("Fetch without an input is not valid")
}
Expand Down
39 changes: 25 additions & 14 deletions datafusion/substrait/src/logical_plan/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ use std::sync::Arc;
use substrait::proto::expression_reference::ExprType;

use datafusion::arrow::datatypes::{Field, IntervalUnit};
use datafusion::logical_expr::{
Distinct, FetchType, Like, Partitioning, SkipType, TryCast, WindowFrameUnits,
};
use datafusion::logical_expr::{Distinct, Like, Partitioning, TryCast, WindowFrameUnits};
use datafusion::{
arrow::datatypes::{DataType, TimeUnit},
error::{DataFusionError, Result},
Expand All @@ -45,7 +43,7 @@ use datafusion::arrow::array::{Array, GenericListArray, OffsetSizeTrait};
use datafusion::arrow::temporal_conversions::NANOSECONDS;
use datafusion::common::{
exec_err, internal_err, not_impl_err, plan_err, substrait_datafusion_err,
substrait_err, DFSchemaRef, ToDFSchema,
substrait_err, DFSchema, DFSchemaRef, ToDFSchema,
};
#[allow(unused_imports)]
use datafusion::logical_expr::expr::{
Expand All @@ -69,7 +67,8 @@ use substrait::proto::read_rel::VirtualTable;
use substrait::proto::rel_common::EmitKind;
use substrait::proto::rel_common::EmitKind::Emit;
use substrait::proto::{
rel_common, ExchangeRel, ExpressionReference, ExtendedExpression, RelCommon,
fetch_rel, rel_common, ExchangeRel, ExpressionReference, ExtendedExpression,
RelCommon,
};
use substrait::{
proto::{
Expand Down Expand Up @@ -333,19 +332,31 @@ pub fn to_substrait_rel(
}
LogicalPlan::Limit(limit) => {
let input = to_substrait_rel(limit.input.as_ref(), state, extensions)?;
let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
return not_impl_err!("Non-literal limit fetch");
};
let SkipType::Literal(skip) = limit.get_skip_type()? else {
return not_impl_err!("Non-literal limit skip");
};
let empty_schema = Arc::new(DFSchema::empty());
let offset_mode = limit
.skip
.as_ref()
.map(|expr| {
to_substrait_rex(state, expr.as_ref(), &empty_schema, 0, extensions)
})
.transpose()?
.map(Box::new)
.map(fetch_rel::OffsetMode::OffsetExpr);
let count_mode = limit
.fetch
.as_ref()
.map(|expr| {
to_substrait_rex(state, expr.as_ref(), &empty_schema, 0, extensions)
})
.transpose()?
.map(Box::new)
.map(fetch_rel::CountMode::CountExpr);
Ok(Box::new(Rel {
rel_type: Some(RelType::Fetch(Box::new(FetchRel {
common: None,
input: Some(input),
offset: skip as i64,
// use -1 to signal that ALL records should be returned
count: fetch.map(|f| f as i64).unwrap_or(-1),
offset_mode,
count_mode,
advanced_extension: None,
}))),
}))
Expand Down
9 changes: 6 additions & 3 deletions datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,17 +240,20 @@ async fn select_with_filter_bool_expr() -> Result<()> {

#[tokio::test]
async fn select_with_limit() -> Result<()> {
roundtrip_fill_na("SELECT * FROM data LIMIT 100").await
roundtrip_fill_na("SELECT * FROM data LIMIT 100").await?;
roundtrip_fill_na("SELECT * FROM data LIMIT 98+100/50").await
}

#[tokio::test]
async fn select_without_limit() -> Result<()> {
roundtrip_fill_na("SELECT * FROM data OFFSET 10").await
roundtrip_fill_na("SELECT * FROM data OFFSET 10").await?;
roundtrip_fill_na("SELECT * FROM data OFFSET 5+7-2").await
}

#[tokio::test]
async fn select_with_limit_offset() -> Result<()> {
roundtrip("SELECT * FROM data LIMIT 200 OFFSET 10").await
roundtrip("SELECT * FROM data LIMIT 200 OFFSET 10").await?;
roundtrip("SELECT * FROM data LIMIT 100+100 OFFSET 20/2").await
}

#[tokio::test]
Expand Down

0 comments on commit 9f530dd

Please sign in to comment.