Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
Shanicky Chen committed May 16, 2024
1 parent bc2b8c5 commit 9b92280
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 1 deletion.
2 changes: 2 additions & 0 deletions src/batch/src/task/task_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,8 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
)
.await?;

println!("plan {:?}", self.plan);

let sender = self.sender.clone();
let _failure = self.failure.clone();
let task_id = self.task_id.clone();
Expand Down
13 changes: 13 additions & 0 deletions src/batch/src/worker_manager/worker_node_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,19 @@ impl WorkerNodeSelector {
}
}

pub fn schedule_unit_count_map(&self) -> HashMap<u32, usize> {
let worker_nodes = if self.enable_barrier_read {
self.manager.list_streaming_worker_nodes()
} else {
self.apply_worker_node_mask(self.manager.list_serving_worker_nodes())
};

worker_nodes
.iter()
.map(|node| (node.id, node.parallel_units.len()))
.collect()
}

pub fn schedule_unit_count(&self) -> usize {
let worker_nodes = if self.enable_barrier_read {
self.manager.list_streaming_worker_nodes()
Expand Down
13 changes: 12 additions & 1 deletion src/frontend/src/scheduler/plan_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -887,10 +887,19 @@ impl BatchPlanFragmenter {
}
_ => {
if let Some(table_scan_info) = &table_scan_info {
let parallelism_map = self.worker_node_manager.schedule_unit_count_map();
table_scan_info
.partitions
.as_ref()
.map(|m| m.len())
.map(|partitions| {
partitions
.keys()
.map(|worker_id| {
parallelism_map.get(worker_id).cloned().unwrap_or(0)
})
.sum::<usize>()
.max(1)
})
.unwrap_or(1)
} else if let Some(lookup_join_parallelism) =
self.collect_stage_lookup_join_parallelism(root.clone())?
Expand All @@ -904,6 +913,8 @@ impl BatchPlanFragmenter {
}
}
};

println!("parallelism {}", parallelism);
if source_info.is_none() && parallelism == 0 {
return Err(BatchError::EmptyWorkerNodes.into());
}
Expand Down

0 comments on commit 9b92280

Please sign in to comment.