Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: remove parallel unit mapping from frontend #16588

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
decfccf
Add WorkerMapping to protos; refactor to worker ID usage.
shanicky Apr 7, 2024
5947afb
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu-2
shanicky Apr 16, 2024
724d037
Refactor observer, notification, & catalog files for better structure…
shanicky Apr 16, 2024
0916ac8
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu
shanicky Apr 17, 2024
8f7f9b3
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu
shanicky Apr 18, 2024
e40d0c8
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu
shanicky Apr 22, 2024
b4673e6
Refactor parallel unit-worker map method to utils module.
shanicky Apr 22, 2024
b50031c
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu
shanicky Apr 22, 2024
3b48ac9
Deprecate fields in MetaSnapshot & reserve tags.
shanicky Apr 22, 2024
36937ac
Merge branch 'main' into peng/fe-remove-pu
shanicky Apr 24, 2024
6125cab
update proto
shanicky Apr 24, 2024
0c8b540
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu
shanicky May 6, 2024
ff4e3fa
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu
shanicky May 9, 2024
ca47e5b
Fixed typos, clean up imports and whitespace in Rust files
shanicky May 11, 2024
f107855
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu
shanicky May 11, 2024
b6e2700
Refactor serving.rs for efficient worker-to-vnode mapping, update imp…
shanicky May 11, 2024
e3de9ff
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu
shanicky May 13, 2024
78622ad
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu
May 14, 2024
bc2b8c5
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu
May 16, 2024
9b92280
tmp
May 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added metadata.sqlite
Empty file.
6 changes: 6 additions & 0 deletions proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ message ParallelUnitMapping {
repeated uint32 data = 2;
}

// Vnode mapping for stream fragments. Stores mapping from virtual node to worker id.
message WorkerMapping {
repeated uint32 original_indices = 1;
repeated uint32 data = 2;
}

message BatchQueryEpoch {
oneof epoch {
uint64 committed = 1;
Expand Down
39 changes: 30 additions & 9 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,16 @@ message FragmentParallelUnitMappings {
repeated FragmentParallelUnitMapping mappings = 1;
}

/// Worker mapping with fragment id, used for notification.
message FragmentWorkerMapping {
uint32 fragment_id = 1;
common.WorkerMapping mapping = 2;
}

message FragmentWorkerMappings {
repeated FragmentWorkerMapping mappings = 1;
}

// TODO: remove this when dashboard refactored.
message ActorLocation {
common.WorkerNode node = 1;
Expand Down Expand Up @@ -378,8 +388,10 @@ message SubscribeRequest {
message MetaSnapshot {
message SnapshotVersion {
uint64 catalog_version = 1;
uint64 parallel_unit_mapping_version = 2;
reserved 2;
reserved "parallel_unit_mapping_version";
uint64 worker_node_version = 3;
uint64 streaming_worker_mapping_version = 4;
}
repeated catalog.Database databases = 1;
repeated catalog.Schema schemas = 2;
Expand All @@ -392,16 +404,20 @@ message MetaSnapshot {
repeated catalog.Connection connections = 17;
repeated catalog.Subscription subscriptions = 19;
repeated user.UserInfo users = 8;
reserved 9;
reserved "parallel_unit_mappings";
GetSessionParamsResponse session_params = 20;
// for streaming
repeated FragmentParallelUnitMapping parallel_unit_mappings = 9;
repeated common.WorkerNode nodes = 10;
hummock.HummockSnapshot hummock_snapshot = 11;
hummock.HummockVersion hummock_version = 12;
backup_service.MetaBackupManifestId meta_backup_manifest_id = 14;
hummock.WriteLimits hummock_write_limits = 16;
// for serving
repeated FragmentParallelUnitMapping serving_parallel_unit_mappings = 18;
reserved 18;
reserved "serving_parallel_unit_mappings";

// for streaming
repeated FragmentWorkerMapping streaming_worker_mappings = 21;
repeated FragmentWorkerMapping serving_worker_mappings = 22;

SnapshotVersion version = 13;
}
Expand Down Expand Up @@ -440,8 +456,6 @@ message SubscribeResponse {
catalog.Function function = 6;
user.UserInfo user = 11;
SetSessionParamRequest session_param = 26;
// for streaming
FragmentParallelUnitMapping parallel_unit_mapping = 12;
common.WorkerNode node = 13;
hummock.HummockSnapshot hummock_snapshot = 14;
hummock.HummockVersionDeltas hummock_version_deltas = 15;
Expand All @@ -451,10 +465,15 @@ message SubscribeResponse {
hummock.WriteLimits hummock_write_limits = 20;
RelationGroup relation_group = 21;
catalog.Connection connection = 22;
FragmentParallelUnitMappings serving_parallel_unit_mappings = 23;
hummock.HummockVersionStats hummock_stats = 24;
Recovery recovery = 25;
FragmentWorkerMapping streaming_worker_mapping = 27;
FragmentWorkerMappings serving_worker_mappings = 28;
}
reserved 12;
reserved "parallel_unit_mapping";
reserved 23;
reserved "serving_parallel_unit_mappings";
}

service NotificationService {
Expand Down Expand Up @@ -629,8 +648,10 @@ service SessionParamService {
message GetServingVnodeMappingsRequest {}

message GetServingVnodeMappingsResponse {
repeated FragmentParallelUnitMapping mappings = 1;
reserved 1;
reserved "mappings";
map<uint32, uint32> fragment_to_table = 2;
repeated FragmentWorkerMapping worker_mappings = 3;
}

service ServingService {
Expand Down
39 changes: 23 additions & 16 deletions src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@ use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::hash::table_distribution::TableDistribution;
use risingwave_common::hash::{
ExpandedParallelUnitMapping, HashKey, HashKeyDispatcher, ParallelUnitId, VirtualNode,
ExpandedWorkerMapping, HashKey, HashKeyDispatcher, ParallelUnitId, VirtualNode, WorkerId,
};
use risingwave_common::memory::MemoryContext;
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::scan_range::ScanRange;
use risingwave_common::util::tracing::TracingContext;
use risingwave_common::util::worker_util::get_pu_to_worker_mapping;
use risingwave_expr::expr::{build_from_prost, BoxedExpression};
use risingwave_pb::batch_plan::exchange_info::DistributionMode;
use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::Plan;
Expand All @@ -52,7 +51,7 @@ use crate::task::{BatchTaskContext, ShutdownToken, TaskId};
struct InnerSideExecutorBuilder<C> {
table_desc: StorageTableDesc,
table_distribution: TableDistribution,
vnode_mapping: ExpandedParallelUnitMapping,
vnode_mapping: ExpandedWorkerMapping,
outer_side_key_types: Vec<DataType>,
inner_side_schema: Schema,
inner_side_column_ids: Vec<i32>,
Expand All @@ -61,8 +60,8 @@ struct InnerSideExecutorBuilder<C> {
context: C,
task_id: TaskId,
epoch: BatchQueryEpoch,
pu_to_worker_mapping: HashMap<ParallelUnitId, WorkerNode>,
pu_to_scan_range_mapping: HashMap<ParallelUnitId, Vec<(ScanRange, VirtualNode)>>,
worker_mapping: HashMap<WorkerId, WorkerNode>,
worker_to_scan_range_mapping: HashMap<WorkerId, Vec<(ScanRange, VirtualNode)>>,
chunk_size: usize,
shutdown_rx: ShutdownToken,
next_stage_id: usize,
Expand Down Expand Up @@ -92,7 +91,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
/// Creates the `RowSeqScanNode` that will be used for scanning the inner side table
/// based on the passed `scan_range` and virtual node.
fn create_row_seq_scan_node(&self, id: &ParallelUnitId) -> Result<NodeBody> {
let list = self.pu_to_scan_range_mapping.get(id).unwrap();
let list = self.worker_to_scan_range_mapping.get(id).unwrap();
let mut scan_ranges = vec![];
let mut vnode_bitmap = BitmapBuilder::zeroed(self.vnode_mapping.len());

Expand All @@ -114,11 +113,11 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
}

/// Creates the `PbExchangeSource` using the given `id`.
fn build_prost_exchange_source(&self, id: &ParallelUnitId) -> Result<PbExchangeSource> {
fn build_prost_exchange_source(&self, id: &WorkerId) -> Result<PbExchangeSource> {
let worker = self
.pu_to_worker_mapping
.worker_mapping
.get(id)
.context("No worker node found for the given parallel unit id.")?;
.context("No worker node found for the given worker id.")?;

let local_execute_plan = LocalExecutePlan {
plan: Some(PlanFragment {
Expand Down Expand Up @@ -160,7 +159,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
#[async_trait::async_trait]
impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C> {
fn reset(&mut self) {
self.pu_to_scan_range_mapping = HashMap::new();
self.worker_to_scan_range_mapping = HashMap::new();
}

/// Adds the scan range made from the given `kwy_scalar_impls` into the parallel unit id
Expand Down Expand Up @@ -191,11 +190,11 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
}

let vnode = self.get_virtual_node(&scan_range)?;
let parallel_unit_id = self.vnode_mapping[vnode.to_index()];
let worker_id = self.vnode_mapping[vnode.to_index()];

let list = self
.pu_to_scan_range_mapping
.entry(parallel_unit_id)
.worker_to_scan_range_mapping
.entry(worker_id)
.or_default();
list.push((scan_range, vnode));

Expand All @@ -207,7 +206,7 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
async fn build_executor(&mut self) -> Result<BoxedExecutor> {
self.next_stage_id += 1;
let mut sources = vec![];
for id in self.pu_to_scan_range_mapping.keys() {
for id in self.worker_to_scan_range_mapping.keys() {
sources.push(self.build_prost_exchange_source(id)?);
}

Expand Down Expand Up @@ -373,6 +372,14 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {

let chunk_size = source.context.get_config().developer.chunk_size;

let worker_nodes = lookup_join_node.get_worker_nodes();
let worker_mapping: HashMap<WorkerId, WorkerNode> = worker_nodes
.iter()
.map(|worker| (worker.id, worker.clone()))
.collect();

assert_eq!(worker_mapping.len(), worker_nodes.len());

let inner_side_builder = InnerSideExecutorBuilder {
table_desc: table_desc.clone(),
table_distribution: TableDistribution::new_from_storage_table_desc(
Expand All @@ -388,11 +395,11 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
context: source.context().clone(),
task_id: source.task_id.clone(),
epoch: source.epoch(),
pu_to_worker_mapping: get_pu_to_worker_mapping(lookup_join_node.get_worker_nodes()),
pu_to_scan_range_mapping: HashMap::new(),
worker_to_scan_range_mapping: HashMap::new(),
chunk_size,
shutdown_rx: source.shutdown_rx.clone(),
next_stage_id: 0,
worker_mapping,
};

let identity = source.plan_node().get_identity().clone();
Expand Down
2 changes: 2 additions & 0 deletions src/batch/src/task/task_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,8 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
)
.await?;

println!("plan {:?}", self.plan);

let sender = self.sender.clone();
let _failure = self.failure.clone();
let task_id = self.task_id.clone();
Expand Down
Loading
Loading