Skip to content

Commit

Permalink
feat(batch): add batch_expr_strict_mode to ignore expression error …
Browse files Browse the repository at this point in the history
…in batch query (#19562)
  • Loading branch information
fuyufjh authored Nov 27, 2024
1 parent d17b402 commit ba76431
Show file tree
Hide file tree
Showing 15 changed files with 99 additions and 7 deletions.
38 changes: 38 additions & 0 deletions e2e_test/batch/basic/strict_mode.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t (v int);

statement ok
insert into t values(-1), (0), (1);

statement ok
SET batch_expr_strict_mode = false;

query I
SELECT 1/v FROM unnest(ARRAY[-1, 0, 1]) v;
----
-1
NULL
1

# This plan consists of a BatchExchange.
query I
SELECT 1/v FROM t order by v;
----
-1
NULL
1

statement ok
SET batch_expr_strict_mode = DEFAULT;

statement error Division by zero
SELECT 1/v FROM unnest(ARRAY[-1, 0, 1]) v;

statement error Division by zero
SELECT 1/v FROM t order by v;

statement ok
drop table t;
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ user background_ddl
user batch_enable_distributed_dml
user batch_enable_lookup_join
user batch_enable_sort_agg
user batch_expr_strict_mode
user batch_parallelism
user bypass_cluster_limits
user bytea_output
Expand Down
1 change: 1 addition & 0 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ message Cardinality {
// Provide statement-local context, e.g. session info like time zone, for execution.
message ExprContext {
string time_zone = 1;
bool strict_mode = 2;
}

message AdditionalColumnKey {}
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use risingwave_common::array::ArrayImpl::Bool;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_expr::expr::{build_from_prost, BoxedExpression};
use risingwave_expr::expr::{build_batch_expr_from_prost, BoxedExpression};
use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::error::{BatchError, Result};
Expand Down Expand Up @@ -90,7 +90,7 @@ impl BoxedExecutorBuilder for FilterExecutor {
)?;

let expr_node = filter_node.get_search_condition()?;
let expr = build_from_prost(expr_node)?;
let expr = build_batch_expr_from_prost(expr_node)?;
Ok(Box::new(Self::new(
expr,
input,
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use futures::{Stream, StreamExt};
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_expr::expr::{build_from_prost, BoxedExpression, Expression};
use risingwave_expr::expr::{build_batch_expr_from_prost, BoxedExpression, Expression};
use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::error::{BatchError, Result};
Expand Down Expand Up @@ -89,7 +89,7 @@ impl BoxedExecutorBuilder for ProjectExecutor {
let project_exprs: Vec<_> = project_node
.get_select_list()
.iter()
.map(build_from_prost)
.map(build_batch_expr_from_prost)
.try_collect()?;

let fields = project_exprs
Expand Down
1 change: 1 addition & 0 deletions src/batch/src/task/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ impl BatchManager {
TracingContext::none(),
ExprContext {
time_zone: "UTC".to_string(),
strict_mode: false,
},
)
.await
Expand Down
6 changes: 6 additions & 0 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ pub struct SessionConfig {
#[parameter(default = false, rename = "batch_enable_distributed_dml")]
batch_enable_distributed_dml: bool,

/// Evaluate expression in strict mode for batch queries.
/// If set to false, an expression failure will not cause an error but leave a null value
/// on the result set.
#[parameter(default = true)]
batch_expr_strict_mode: bool,

/// The max gap allowed to transform small range scan into multi point lookup.
#[parameter(default = 8)]
max_split_range_gap: i32,
Expand Down
17 changes: 17 additions & 0 deletions src/expr/core/src/expr/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::iter::Peekable;

use itertools::Itertools;
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_expr::expr::LogReport;
use risingwave_pb::expr::expr_node::{PbType, RexNode};
use risingwave_pb::expr::ExprNode;

Expand All @@ -29,6 +30,7 @@ use super::NonStrictExpression;
use crate::expr::{
BoxedExpression, Expression, ExpressionBoxExt, InputRefExpression, LiteralExpression,
};
use crate::expr_context::strict_mode;
use crate::sig::FUNCTION_REGISTRY;
use crate::{bail, Result};

Expand All @@ -48,6 +50,21 @@ pub fn build_non_strict_from_prost(
.map(NonStrictExpression)
}

/// Build a strict or non-strict expression according to expr context.
///
/// When strict mode is off, the expression will not fail but leave a null value as result.
///
/// Unlike [`build_non_strict_from_prost`], the returning value here can be either non-strict or
/// strict. Thus, the caller is supposed to handle potential errors under strict mode.
pub fn build_batch_expr_from_prost(prost: &ExprNode) -> Result<BoxedExpression> {
if strict_mode()? {
build_from_prost(prost)
} else {
// TODO(eric): report errors to users via psql notice
Ok(ExprBuilder::new_non_strict(LogReport).build(prost)?.boxed())
}
}

/// Build an expression from protobuf with possibly some wrappers attached to each node.
struct ExprBuilder<R> {
/// The error reporting for non-strict mode.
Expand Down
23 changes: 21 additions & 2 deletions src/expr/core/src/expr_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,16 @@ define_context! {
pub TIME_ZONE: String,
pub FRAGMENT_ID: u32,
pub VNODE_COUNT: usize,
pub STRICT_MODE: bool,
}

pub fn capture_expr_context() -> ExprResult<ExprContext> {
let time_zone = TIME_ZONE::try_with(ToOwned::to_owned)?;
Ok(ExprContext { time_zone })
let strict_mode = STRICT_MODE::try_with(|v| *v)?;
Ok(ExprContext {
time_zone,
strict_mode,
})
}

/// Get the vnode count from the context.
Expand All @@ -36,9 +41,23 @@ pub fn vnode_count() -> ExprResult<usize> {
VNODE_COUNT::try_with(|&x| x)
}

/// Get the strict mode from expr context
///
/// The return value depends on session variable. Default is true for batch query.
///
/// Conceptually, streaming always use non-strict mode. Our implementation doesn't read this value,
/// although it's set to false as a placeholder.
pub fn strict_mode() -> ExprResult<bool> {
STRICT_MODE::try_with(|&v| v)
}

pub async fn expr_context_scope<Fut>(expr_context: ExprContext, future: Fut) -> Fut::Output
where
Fut: Future,
{
TIME_ZONE::scope(expr_context.time_zone.to_owned(), future).await
TIME_ZONE::scope(
expr_context.time_zone.to_owned(),
STRICT_MODE::scope(expr_context.strict_mode, future),
)
.await
}
1 change: 1 addition & 0 deletions src/frontend/src/scheduler/distributed/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,7 @@ impl StageRunner {
async fn schedule_tasks_for_all(&mut self, shutdown_rx: ShutdownToken) -> SchedulerResult<()> {
let expr_context = ExprContext {
time_zone: self.ctx.session().config().timezone().to_owned(),
strict_mode: self.ctx.session().config().batch_expr_strict_mode(),
};
// If root, we execute it locally.
if !self.is_root_stage() {
Expand Down
5 changes: 4 additions & 1 deletion src/frontend/src/scheduler/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ impl LocalQueryExecution {
self.batch_query_epoch,
self.shutdown_rx().clone(),
);

let executor = executor.build().await?;

#[for_await]
Expand Down Expand Up @@ -146,6 +147,7 @@ impl LocalQueryExecution {
let db_name = self.session.database().to_string();
let search_path = self.session.config().search_path();
let time_zone = self.session.config().timezone();
let strict_mode = self.session.config().batch_expr_strict_mode();
let timeout = self.timeout;
let meta_client = self.front_env.meta_client_ref();

Expand All @@ -166,7 +168,7 @@ impl LocalQueryExecution {
}
};

use risingwave_expr::expr_context::TIME_ZONE;
use risingwave_expr::expr_context::*;

use crate::expr::function_impl::context::{
AUTH_CONTEXT, CATALOG_READER, DB_NAME, META_CLIENT, SEARCH_PATH, USER_INFO_READER,
Expand All @@ -179,6 +181,7 @@ impl LocalQueryExecution {
let exec = async move { SEARCH_PATH::scope(search_path, exec).await }.boxed();
let exec = async move { AUTH_CONTEXT::scope(auth_context, exec).await }.boxed();
let exec = async move { TIME_ZONE::scope(time_zone, exec).await }.boxed();
let exec = async move { STRICT_MODE::scope(strict_mode, exec).await }.boxed();
let exec = async move { META_CLIENT::scope(meta_client, exec).await }.boxed();

if let Some(timeout) = timeout {
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1678,6 +1678,7 @@ mod tests {
mview_definition: "".to_string(),
expr_context: Some(PbExprContext {
time_zone: String::from("America/New_York"),
strict_mode: false,
}),
}
})
Expand Down Expand Up @@ -1798,6 +1799,7 @@ mod tests {
.map(VnodeBitmap::from),
expr_context: ExprContext::from(&PbExprContext {
time_zone: String::from("America/New_York"),
strict_mode: false,
}),
}
})
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ impl StreamContext {
PbExprContext {
// `self.timezone` must always be set; an invalid value is used here for debugging if it's not.
time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()),
strict_mode: false,
}
}

Expand Down
1 change: 1 addition & 0 deletions src/meta/src/stream/test_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ async fn test_graph_builder() -> MetaResult<()> {
let graph = make_stream_graph();
let expr_context = ExprContext {
time_zone: graph.ctx.as_ref().unwrap().timezone.clone(),
strict_mode: false,
};
let fragment_graph = StreamFragmentGraph::new(&env, graph, &job)?;
let internal_tables = fragment_graph.incomplete_internal_tables();
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/executor/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::task::barrier_test_utils::LocalBarrierTestEnv;
async fn test_merger_sum_aggr() {
let expr_context = ExprContext {
time_zone: String::from("UTC"),
strict_mode: false,
};

let barrier_test_env = LocalBarrierTestEnv::for_test().await;
Expand Down

0 comments on commit ba76431

Please sign in to comment.