remove bind() logic in from_proto.rs
fix broadcast join shuffle read check

fix partitioning matching in rss shuffle writer

reduce agg output memory usage

change protobuf's default recursion limit to Int.MaxValue

fix SortAgg bug

fix SortMergeJoin bug

add conf: spark.blaze.partialAggSkipping.skipSpill

add conf: spark.blaze.tokio.worker.threads.per.cpu

use case-insensitive field matching in scan schema adaptor
zhangli20 committed Jan 19, 2025
1 parent e05b838 commit 5d109f6
Showing 17 changed files with 272 additions and 180 deletions.
2 changes: 2 additions & 0 deletions native-engine/blaze-jni-bridge/src/conf.rs
@@ -37,9 +37,11 @@ define_conf!(BooleanConf, IGNORE_CORRUPTED_FILES);
define_conf!(BooleanConf, PARTIAL_AGG_SKIPPING_ENABLE);
define_conf!(DoubleConf, PARTIAL_AGG_SKIPPING_RATIO);
define_conf!(IntConf, PARTIAL_AGG_SKIPPING_MIN_ROWS);
+ define_conf!(BooleanConf, PARTIAL_AGG_SKIPPING_SKIP_SPILL);
define_conf!(BooleanConf, PARQUET_ENABLE_PAGE_FILTERING);
define_conf!(BooleanConf, PARQUET_ENABLE_BLOOM_FILTER);
define_conf!(StringConf, SPARK_IO_COMPRESSION_CODEC);
+ define_conf!(IntConf, TOKIO_WORKER_THREADS_PER_CPU);
define_conf!(IntConf, SPARK_TASK_CPUS);
define_conf!(StringConf, SPILL_COMPRESSION_CODEC);
define_conf!(BooleanConf, SMJ_FALLBACK_ENABLE);
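For context on how these macro-declared confs are consumed: the `.value()` accessor comes from the `IntConf`/`BooleanConf` traits and returns a `Result`, so callers supply a fallback. A minimal sketch of reading the two new entries, assuming `IntConf::value()` yields an `i32` (as the `rt.rs` usage below suggests); the fallback values mirror the ones used later in this commit:

```rust
use blaze_jni_bridge::conf::{
    BooleanConf, IntConf, PARTIAL_AGG_SKIPPING_SKIP_SPILL, TOKIO_WORKER_THREADS_PER_CPU,
};

// Read the two new confs; `.value()` fails if the JVM-side conf bridge is
// unreachable, so each read degrades to a conservative default.
fn read_new_confs() -> (bool, i32) {
    let skip_spill = PARTIAL_AGG_SKIPPING_SKIP_SPILL.value().unwrap_or(false);
    let threads_per_cpu = TOKIO_WORKER_THREADS_PER_CPU.value().unwrap_or(0);
    (skip_spill, threads_per_cpu)
}
```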
122 changes: 20 additions & 102 deletions native-engine/blaze-serde/src/from_proto.rs
@@ -33,7 +33,6 @@ use datafusion::{
object_store::ObjectStoreUrl,
physical_plan::FileScanConfig,
},
- error::DataFusionError,
logical_expr::{ColumnarValue, Operator, ScalarUDF, Volatility},
physical_expr::{
expressions::{in_list, LikeExpr, SCAndExpr, SCOrExpr},
@@ -101,31 +100,6 @@ use crate::{
Schema,
};

- fn bind(
- expr_in: Arc<dyn PhysicalExpr>,
- input_schema: &Arc<Schema>,
- ) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> {
- let expr = expr_in.as_any();
-
- if let Some(expr) = expr.downcast_ref::<Column>() {
- if expr.name() == "__bound_reference__" {
- Ok(Arc::new(expr.clone()))
- } else {
- Ok(Arc::new(Column::new_with_schema(
- expr.name(),
- input_schema,
- )?))
- }
- } else {
- let new_children = expr_in
- .children()
- .iter()
- .map(|&child_expr| bind(child_expr.clone(), input_schema))
- .collect::<Result<Vec<_>, DataFusionError>>()?;
- Ok(expr_in.with_new_children(new_children)?)
- }
- }

impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
type Error = PlanSerDeError;

@@ -145,10 +119,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
.zip(projection.expr_name.iter())
.map(|(expr, name)| {
Ok((
- bind(
- try_parse_physical_expr(expr, &input.schema())?,
- &input.schema(),
- )?,
+ try_parse_physical_expr(expr, &input.schema())?,
name.to_string(),
))
})
@@ -160,12 +131,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
let predicates = filter
.expr
.iter()
- .map(|expr| {
- Ok(bind(
- try_parse_physical_expr(expr, &input.schema())?,
- &input.schema(),
- )?)
- })
+ .map(|expr| try_parse_physical_expr(expr, &input.schema()))
.collect::<Result<_, Self::Error>>()?;
Ok(Arc::new(FilterExec::try_new(predicates, input)?))
}
@@ -213,11 +179,9 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
.map(|col| {
let left_key =
try_parse_physical_expr(&col.left.as_ref().unwrap(), &left.schema())?;
- let left_key_binded = bind(left_key, &left.schema())?;
let right_key =
try_parse_physical_expr(&col.right.as_ref().unwrap(), &right.schema())?;
- let right_key_binded = bind(right_key, &right.schema())?;
- Ok((left_key_binded, right_key_binded))
+ Ok((left_key, right_key))
})
.collect::<Result<_, Self::Error>>()?;

@@ -252,11 +216,9 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
.map(|col| {
let left_key =
try_parse_physical_expr(&col.left.as_ref().unwrap(), &left.schema())?;
- let left_key_binded = bind(left_key, &left.schema())?;
let right_key =
try_parse_physical_expr(&col.right.as_ref().unwrap(), &right.schema())?;
- let right_key_binded = bind(right_key, &right.schema())?;
- Ok((left_key_binded, right_key_binded))
+ Ok((left_key, right_key))
})
.collect::<Result<_, Self::Error>>()?;

@@ -347,12 +309,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
let keys = bhm
.keys
.iter()
- .map(|expr| {
- Ok(bind(
- try_parse_physical_expr(expr, &input.schema())?,
- &input.schema(),
- )?)
- })
+ .map(|expr| try_parse_physical_expr(expr, &input.schema()))
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>, Self::Error>>()?;
Ok(Arc::new(BroadcastJoinBuildHashMapExec::new(input, keys)))
}
@@ -366,11 +323,9 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
.map(|col| {
let left_key =
try_parse_physical_expr(&col.left.as_ref().unwrap(), &left.schema())?;
- let left_key_binded = bind(left_key, &left.schema())?;
let right_key =
try_parse_physical_expr(&col.right.as_ref().unwrap(), &right.schema())?;
- let right_key_binded = bind(right_key, &right.schema())?;
- Ok((left_key_binded, right_key_binded))
+ Ok((left_key, right_key))
})
.collect::<Result<_, Self::Error>>()?;

@@ -448,10 +403,10 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
.zip(agg.grouping_expr_name.iter())
.map(|(expr, name)| {
try_parse_physical_expr(expr, &input_schema).and_then(|expr| {
- Ok(bind(expr, &input_schema).map(|expr| GroupingExpr {
+ Ok(GroupingExpr {
expr,
field_name: name.to_owned(),
- })?)
+ })
})
})
.collect::<Result<Vec<_>, _>>()?;
@@ -480,10 +435,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
let agg_children_exprs = agg_node
.children
.iter()
- .map(|expr| {
- try_parse_physical_expr(expr, &input_schema)
- .and_then(|expr| Ok(bind(expr, &input_schema)?))
- })
+ .map(|expr| try_parse_physical_expr(expr, &input_schema))
.collect::<Result<Vec<_>, _>>()?;

Ok(AggExpr {
@@ -532,12 +484,7 @@
projection
.expr
.iter()
- .map(|expr| {
- Ok(bind(
- try_parse_physical_expr(expr, &input.schema())?,
- &input.schema(),
- )?)
- })
+ .map(|expr| try_parse_physical_expr(expr, &input.schema()))
.collect::<Result<Vec<_>, Self::Error>>()
})
.collect::<Result<Vec<_>, _>>()?;
@@ -565,12 +512,7 @@
let children = w
.children
.iter()
- .map(|expr| {
- Ok(bind(
- try_parse_physical_expr(expr, &input.schema())?,
- &input.schema(),
- )?)
- })
+ .map(|expr| try_parse_physical_expr(expr, &input.schema()))
.collect::<Result<Vec<_>, Self::Error>>()?;

let window_func = match w.func_type() {
@@ -623,12 +565,7 @@
let partition_specs = window
.partition_spec
.iter()
- .map(|expr| {
- Ok(bind(
- try_parse_physical_expr(expr, &input.schema())?,
- &input.schema(),
- )?)
- })
+ .map(|expr| try_parse_physical_expr(expr, &input.schema()))
.collect::<Result<Vec<_>, Self::Error>>()?;

let order_specs = window
@@ -637,8 +574,7 @@
.map(|expr| {
let expr = expr.expr_type.as_ref().ok_or_else(|| {
proto_error(format!(
"physical_plan::from_proto() Unexpected expr {:?}",
self
"physical_plan::from_proto() Unexpected expr {self:?}",
))
})?;
if let protobuf::physical_expr_node::ExprType::Sort(sort_expr) = expr {
@@ -653,10 +589,7 @@
})?
.as_ref();
Ok(PhysicalSortExpr {
- expr: bind(
- try_parse_physical_expr(expr, &input.schema())?,
- &input.schema(),
- )?,
+ expr: try_parse_physical_expr(expr, &input.schema())?,
options: SortOptions {
descending: !sort_expr.asc,
nulls_first: sort_expr.nulls_first,
@@ -688,12 +621,7 @@

let children = pb_generator_children
.iter()
- .map(|expr| {
- Ok::<_, PlanSerDeError>(bind(
- try_parse_physical_expr(expr, &input_schema)?,
- &input_schema,
- )?)
- })
+ .map(|expr| try_parse_physical_expr(expr, &input_schema))
.collect::<Result<Vec<_>, _>>()?;

let generator = match pb_generate_func {
@@ -856,10 +784,7 @@ fn try_parse_physical_expr(

let pexpr: Arc<dyn PhysicalExpr> =
match expr_type {
- ExprType::Column(c) => {
- let pcol: Column = c.into();
- Arc::new(pcol)
- }
+ ExprType::Column(c) => Arc::new(Column::new(&c.name, input_schema.index_of(&c.name)?)),
ExprType::Literal(scalar) => Arc::new(Literal::new(convert_required!(scalar.value)?)),
ExprType::BoundReference(bound_reference) => {
let pcol: Column = bound_reference.into();
@@ -894,11 +819,10 @@
try_parse_physical_expr_box_required(&e.expr, input_schema)?,
)),
ExprType::InList(e) => {
- let expr = try_parse_physical_expr_box_required(&e.expr, input_schema)
- .and_then(|expr| Ok(bind(expr, input_schema)?))?; // materialize expr.data_type
+ let expr = try_parse_physical_expr_box_required(&e.expr, input_schema)?;
let dt = expr.data_type(input_schema)?;
in_list(
- bind(expr, input_schema)?,
+ expr,
e.list
.iter()
.map(|x| {
@@ -1111,10 +1035,7 @@ fn try_parse_physical_sort_expr(
})?
.as_ref();
Ok(PhysicalSortExpr {
- expr: bind(
- try_parse_physical_expr(expr, &input.schema())?,
- &input.schema(),
- )?,
+ expr: try_parse_physical_expr(expr, &input.schema())?,
options: SortOptions {
descending: !sort_expr.asc,
nulls_first: sort_expr.nulls_first,
@@ -1150,10 +1071,7 @@ pub fn parse_protobuf_partitioning(
let expr = hash_part
.hash_expr
.iter()
- .map(|e| {
- try_parse_physical_expr(e, &input.schema())
- .and_then(|e| Ok(bind(e, &input.schema())?))
- })
+ .map(|e| try_parse_physical_expr(e, &input.schema()))
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>, _>>()?;
Ok(Some(Partitioning::HashPartitioning(
expr,
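The bulk of this file's diff removes the post-parse `bind()` tree walk: columns are now resolved against the input schema inside `try_parse_physical_expr` itself (the `ExprType::Column` arm above), so every call site drops its extra `bind(...)` wrapper and unresolved names fail at parse time. A minimal standalone sketch of the new resolution step, assuming recent Arrow/DataFusion APIs (module paths vary by version):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::{error::Result, physical_expr::expressions::Column};

// Resolve a proto column by name against the input schema once, at parse
// time; `index_of` fails fast on unknown names instead of deferring the
// error to a later bind() pass over the expression tree.
fn parse_column(name: &str, input_schema: &Arc<Schema>) -> Result<Column> {
    Ok(Column::new(name, input_schema.index_of(name)?))
}

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Utf8, true),
    ]));
    assert_eq!(parse_column("b", &schema)?.index(), 1);
    Ok(())
}
```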
19 changes: 14 additions & 5 deletions native-engine/blaze/src/rt.rs
@@ -24,7 +24,7 @@ use arrow::{
record_batch::RecordBatch,
};
use blaze_jni_bridge::{
- conf::{IntConf, SPARK_TASK_CPUS},
+ conf::{IntConf, SPARK_TASK_CPUS, TOKIO_WORKER_THREADS_PER_CPU},
is_task_running,
jni_bridge::JavaClasses,
jni_call, jni_call_static, jni_convert_byte_array, jni_exception_check, jni_exception_occurred,
@@ -95,13 +95,19 @@ impl NativeExecutionRuntime {
&ExecutionPlanMetricsSet::new(),
);

+ let num_worker_threads = {
+ let worker_threads_per_cpu = TOKIO_WORKER_THREADS_PER_CPU.value().unwrap_or(0);
+ let spark_task_cpus = SPARK_TASK_CPUS.value().unwrap_or(0);
+ worker_threads_per_cpu * spark_task_cpus
+ };
+
// create tokio runtime
// propagate classloader and task context to spawned children threads
let spark_task_context = jni_call_static!(JniBridge.getTaskContext() -> JObject)?;
let spark_task_context_global = jni_new_global_ref!(spark_task_context.as_obj())?;
- let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
+ let mut tokio_runtime_builder = tokio::runtime::Builder::new_multi_thread();
+ tokio_runtime_builder
.thread_name(format!("blaze-native-stage-{stage_id}-part-{partition_id}"))
- .worker_threads(SPARK_TASK_CPUS.value().unwrap_or(1) as usize)
.on_thread_start(move || {
let classloader = JavaClasses::get().classloader;
let _ = jni_call_static!(
@@ -112,8 +118,11 @@
);
THREAD_STAGE_ID.set(stage_id);
THREAD_PARTITION_ID.set(partition_id);
- })
- .build()?;
+ });
+ if num_worker_threads > 0 {
+ tokio_runtime_builder.worker_threads(num_worker_threads as usize);
+ }
+ let tokio_runtime = tokio_runtime_builder.build()?;

// spawn batch producer
let (batch_sender, batch_receiver) = std::sync::mpsc::sync_channel(1);
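The effective worker count here is `spark.blaze.tokio.worker.threads.per.cpu × spark.task.cpus`; when either conf is unset the product is zero, the builder is left untouched, and tokio falls back to its default sizing (one worker per CPU core). A standalone restatement of that rule, with the JNI conf reads replaced by plain parameters (the `enable_all()` call is my addition for a self-contained runtime):

```rust
use tokio::runtime::{Builder, Runtime};

// Pin the pool size only when both confs resolve to positive values;
// otherwise keep tokio's default of one worker thread per CPU core.
fn build_runtime(threads_per_cpu: i32, task_cpus: i32) -> std::io::Result<Runtime> {
    let mut builder = Builder::new_multi_thread();
    builder.enable_all();
    let num_worker_threads = threads_per_cpu * task_cpus;
    if num_worker_threads > 0 {
        builder.worker_threads(num_worker_threads as usize);
    }
    builder.build()
}
```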
24 changes: 15 additions & 9 deletions native-engine/datafusion-ext-plans/src/agg/agg_ctx.rs
@@ -25,7 +25,7 @@ use arrow::{
};
use blaze_jni_bridge::{
conf,
- conf::{DoubleConf, IntConf},
+ conf::{BooleanConf, DoubleConf, IntConf},
};
use datafusion::{
common::{cast::as_binary_array, Result},
@@ -60,6 +60,7 @@ pub struct AggContext {
pub supports_partial_skipping: bool,
pub partial_skipping_ratio: f64,
pub partial_skipping_min_rows: usize,
+ pub partial_skipping_skip_spill: bool,
pub is_expand_agg: bool,
pub agg_expr_evaluator: CachedExprsEvaluator,
}
@@ -162,14 +163,18 @@ impl AggContext {
agg_expr_evaluator_output_schema,
)?;

- let (partial_skipping_ratio, partial_skipping_min_rows) = if supports_partial_skipping {
- (
- conf::PARTIAL_AGG_SKIPPING_RATIO.value().unwrap_or(0.999),
- conf::PARTIAL_AGG_SKIPPING_MIN_ROWS.value().unwrap_or(20000) as usize,
- )
- } else {
- Default::default()
- };
+ let (partial_skipping_ratio, partial_skipping_min_rows, partial_skipping_skip_spill) =
+ if supports_partial_skipping {
+ (
+ conf::PARTIAL_AGG_SKIPPING_RATIO.value().unwrap_or(0.999),
+ conf::PARTIAL_AGG_SKIPPING_MIN_ROWS.value().unwrap_or(20000) as usize,
+ conf::PARTIAL_AGG_SKIPPING_SKIP_SPILL
+ .value()
+ .unwrap_or(false),
+ )
+ } else {
+ Default::default()
+ };

Ok(Self {
exec_mode,
@@ -186,6 +191,7 @@ impl AggContext {
supports_partial_skipping,
partial_skipping_ratio,
partial_skipping_min_rows,
+ partial_skipping_skip_spill,
is_expand_agg,
})
}
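One subtlety worth noting: on the `else` branch the widened tuple still falls back to `Default::default()`, which for `(f64, usize, bool)` is `(0.0, 0, false)`, so the new skip-spill flag stays off whenever partial skipping is unsupported. A quick standalone check:

```rust
fn main() {
    // Default::default() for (f64, usize, bool) is (0.0, 0, false): the new
    // flag defaults to disabled on the non-partial-skipping path.
    let (ratio, min_rows, skip_spill): (f64, usize, bool) = Default::default();
    assert_eq!((ratio, min_rows, skip_spill), (0.0, 0, false));
}
```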