persist table fragments max parallelism
Signed-off-by: Bugen Zhao <[email protected]>
BugenZhao committed Sep 27, 2024
1 parent f1286de commit a5898ce
Showing 10 changed files with 94 additions and 15 deletions.
15 changes: 15 additions & 0 deletions proto/meta.proto
@@ -111,7 +111,22 @@ message TableFragments {
map<uint32, source.ConnectorSplits> actor_splits = 5;

stream_plan.StreamContext ctx = 6;

TableParallelism parallelism = 7;
// The max parallelism specified when the streaming job was created, i.e., expected vnode count.
//
// The reason for persisting this value is mainly to check if a parallelism change (via `ALTER
// .. SET PARALLELISM`) is valid, so that the behavior can be consistent with the creation of
// the streaming job.
//
// Note that the actual vnode count, denoted by `vnode_count` in `fragments`, may be different
// from this value (see `StreamFragmentGraph.max_parallelism` for more details). As a result,
// checking the parallelism change with this value can be inaccurate in some cases. However,
// when generating resizing plans, we still take the `vnode_count` of each fragment into account.
//
// Can be unset if the fragment was created in an older version where variable vnode count was
// not supported, in which case a default value of 256 should be used.
optional uint32 max_parallelism = 10;

// Actors of a materialized view, sink, or table can only be scheduled on nodes with a matching node_label.
string node_label = 8;
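
The comment above is the heart of this change: the persisted value exists so that `ALTER .. SET PARALLELISM` can be validated against the limit fixed at creation time. A minimal, hypothetical sketch of such a check (not part of this commit; the function name and error text are illustrative, and the 256 fallback mirrors the compat rule described above):

// Hypothetical sketch, not from this commit.
fn check_parallelism_change(requested: usize, persisted_max: Option<u32>) -> Result<(), String> {
    // Jobs created before variable vnode count was supported have no
    // persisted value; fall back to the historical default of 256.
    let max = persisted_max.map_or(256, |v| v as usize);
    if requested > max {
        return Err(format!(
            "parallelism {requested} exceeds the max parallelism {max} fixed when the job was created"
        ));
    }
    Ok(())
}
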
1 change: 1 addition & 0 deletions src/ctl/src/cmd_impl/meta/migration.rs
@@ -439,6 +439,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
create_type: Set(CreateType::Foreground),
timezone: Set(table_fragment.timezone()),
parallelism: Set(streaming_parallelism),
max_parallelism: Set(table_fragment.max_parallelism as _),
})
.exec(&meta_store_sql.conn)
.await?;
@@ -1,6 +1,10 @@
use sea_orm_migration::prelude::{Table as MigrationTable, *};

const VNODE_COUNT: i32 = 256;
macro_rules! col {
($name:expr) => {
ColumnDef::new($name).integer().not_null().default(256) // compat vnode count
};
}

#[derive(DeriveMigrationName)]
pub struct Migration;
@@ -12,12 +16,7 @@ impl MigrationTrait for Migration {
.alter_table(
MigrationTable::alter()
.table(Table::Table)
.add_column(
ColumnDef::new(Table::VnodeCount)
.integer()
.not_null()
.default(VNODE_COUNT),
)
.add_column(col!(Table::VnodeCount))
.to_owned(),
)
.await?;
@@ -26,12 +25,16 @@
.alter_table(
MigrationTable::alter()
.table(Fragment::Table)
.add_column(
ColumnDef::new(Fragment::VnodeCount)
.integer()
.not_null()
.default(VNODE_COUNT),
)
.add_column(col!(Fragment::VnodeCount))
.to_owned(),
)
.await?;

manager
.alter_table(
MigrationTable::alter()
.table(StreamingJob::Table)
.add_column(col!(StreamingJob::MaxParallelism))
.to_owned(),
)
.await
@@ -54,6 +57,15 @@ impl MigrationTrait for Migration {
.drop_column(Fragment::VnodeCount)
.to_owned(),
)
.await?;

manager
.alter_table(
MigrationTable::alter()
.table(StreamingJob::Table)
.drop_column(StreamingJob::MaxParallelism)
.to_owned(),
)
.await
}
}
@@ -69,3 +81,9 @@ enum Table {
Table,
VnodeCount,
}

#[derive(DeriveIden)]
enum StreamingJob {
Table,
MaxParallelism,
}
1 change: 1 addition & 0 deletions src/meta/model_v2/src/streaming_job.rs
@@ -26,6 +26,7 @@ pub struct Model {
pub create_type: CreateType,
pub timezone: Option<String>,
pub parallelism: StreamingParallelism,
pub max_parallelism: i32,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
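With the new `max_parallelism` column on the model, reading it back is straightforward. A rough usage sketch (not from this commit; the entity path and primary-key type are assumptions based on the sea-orm conventions used here):

use sea_orm::{DatabaseConnection, DbErr, EntityTrait};

// Hypothetical helper, not part of this commit: fetch a job's persisted
// max parallelism, if the job exists.
async fn job_max_parallelism(db: &DatabaseConnection, job_id: i32) -> Result<Option<i32>, DbErr> {
    Ok(streaming_job::Entity::find_by_id(job_id)
        .one(db)
        .await?
        .map(|model| model.max_parallelism))
}
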
4 changes: 4 additions & 0 deletions src/meta/src/controller/fragment.rs
@@ -315,6 +315,7 @@ impl CatalogController {
HashMap<ActorId, Vec<actor_dispatcher::Model>>,
)>,
parallelism: StreamingParallelism,
max_parallelism: usize,
) -> MetaResult<PbTableFragments> {
let mut pb_fragments = HashMap::new();
let mut pb_actor_splits = HashMap::new();
@@ -347,6 +348,7 @@
),
node_label: "".to_string(),
backfill_done: true,
max_parallelism: Some(max_parallelism as _),
};

Ok(table_fragments)
@@ -669,6 +671,7 @@ impl CatalogController {
job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
fragment_info,
job_info.parallelism.clone(),
job_info.max_parallelism as _,
)
}

@@ -790,6 +793,7 @@
job.timezone.map(|tz| PbStreamContext { timezone: tz }),
fragment_info,
job.parallelism.clone(),
job.max_parallelism as _,
)?,
);
}
10 changes: 10 additions & 0 deletions src/meta/src/controller/streaming_job.rs
@@ -83,6 +83,7 @@ impl CatalogController {
create_type: PbCreateType,
ctx: &StreamContext,
streaming_parallelism: StreamingParallelism,
max_parallelism: usize,
) -> MetaResult<ObjectId> {
let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
let job = streaming_job::ActiveModel {
@@ -91,6 +92,7 @@
create_type: Set(create_type.into()),
timezone: Set(ctx.timezone.clone()),
parallelism: Set(streaming_parallelism),
max_parallelism: Set(max_parallelism as _),
};
job.insert(txn).await?;

@@ -102,6 +104,7 @@
streaming_job: &mut StreamingJob,
ctx: &StreamContext,
parallelism: &Option<Parallelism>,
max_parallelism: usize,
) -> MetaResult<()> {
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;
@@ -169,6 +172,7 @@
create_type,
ctx,
streaming_parallelism,
max_parallelism,
)
.await?;
table.id = job_id as _;
@@ -204,6 +208,7 @@
create_type,
ctx,
streaming_parallelism,
max_parallelism,
)
.await?;
sink.id = job_id as _;
@@ -220,6 +225,7 @@
create_type,
ctx,
streaming_parallelism,
max_parallelism,
)
.await?;
table.id = job_id as _;
@@ -255,6 +261,7 @@
create_type,
ctx,
streaming_parallelism,
max_parallelism,
)
.await?;
// to be compatible with old implementation.
@@ -285,6 +292,7 @@
create_type,
ctx,
streaming_parallelism,
max_parallelism,
)
.await?;
src.id = job_id as _;
@@ -631,6 +639,7 @@ impl CatalogController {
ctx: &StreamContext,
version: &PbTableVersion,
specified_parallelism: &Option<NonZeroUsize>,
max_parallelism: usize,
) -> MetaResult<ObjectId> {
let id = streaming_job.id();
let inner = self.inner.write().await;
@@ -685,6 +694,7 @@
PbCreateType::Foreground,
ctx,
parallelism,
max_parallelism,
)
.await?;

21 changes: 20 additions & 1 deletion src/meta/src/model/stream.rs
@@ -17,7 +17,7 @@ use std::ops::AddAssign;

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::WorkerSlotId;
use risingwave_common::hash::{VirtualNode, WorkerSlotId};
use risingwave_connector::source::SplitImpl;
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
@@ -115,6 +115,18 @@ pub struct TableFragments {

/// The parallelism assigned to this table fragments
pub assigned_parallelism: TableParallelism,

/// The max parallelism specified when the streaming job was created, i.e., expected vnode count.
///
/// The reason for persisting this value is mainly to check if a parallelism change (via `ALTER
/// .. SET PARALLELISM`) is valid, so that the behavior can be consistent with the creation of
/// the streaming job.
///
/// Note that the actual vnode count, denoted by `vnode_count` in `fragments`, may be different
/// from this value (see `StreamFragmentGraph.max_parallelism` for more details). As a result,
/// checking the parallelism change with this value can be inaccurate in some cases. However,
/// when generating resizing plans, we still take the `vnode_count` of each fragment into account.
pub max_parallelism: usize,
}

#[derive(Debug, Clone, Default)]
@@ -167,6 +179,7 @@ impl MetadataModel for TableFragments {
parallelism: Some(self.assigned_parallelism.into()),
node_label: "".to_string(),
backfill_done: true,
max_parallelism: Some(self.max_parallelism as _),
}
}

@@ -187,6 +200,9 @@ impl MetadataModel for TableFragments {
actor_splits: build_actor_split_impls(&prost.actor_splits),
ctx,
assigned_parallelism: prost.parallelism.unwrap_or(default_parallelism).into(),
max_parallelism: prost
.max_parallelism
.map_or(VirtualNode::COUNT_FOR_COMPAT, |v| v as _),
}
}

@@ -204,6 +220,7 @@ impl TableFragments {
&BTreeMap::new(),
StreamContext::default(),
TableParallelism::Adaptive,
VirtualNode::COUNT_FOR_TEST,
)
}

@@ -215,6 +232,7 @@
actor_locations: &BTreeMap<ActorId, WorkerSlotId>,
ctx: StreamContext,
table_parallelism: TableParallelism,
max_parallelism: usize,
) -> Self {
let actor_status = actor_locations
.iter()
@@ -237,6 +255,7 @@
actor_splits: HashMap::default(),
ctx,
assigned_parallelism: table_parallelism,
max_parallelism,
}
}

3 changes: 3 additions & 0 deletions src/meta/src/rpc/ddl_controller.rs
@@ -1693,6 +1693,7 @@ impl DdlController {
&building_locations.actor_locations,
stream_ctx.clone(),
table_parallelism,
max_parallelism.get(),
);

if let Some(mview_fragment) = table_fragments.mview_fragment() {
@@ -1727,6 +1728,7 @@
&stream_ctx,
table.get_version()?,
&fragment_graph.specified_parallelism(),
fragment_graph.max_parallelism(),
)
.await? as u32
}
@@ -2178,6 +2180,7 @@
&building_locations.actor_locations,
stream_ctx,
old_table_fragments.assigned_parallelism,
old_table_fragments.max_parallelism,
);

// Note: no need to set `vnode_count` as it's already set by the frontend.
9 changes: 8 additions & 1 deletion src/meta/src/rpc/ddl_controller_v2.rs
@@ -46,7 +46,12 @@ impl DdlController {

let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
mgr.catalog_controller
.create_job_catalog(&mut streaming_job, &ctx, &fragment_graph.parallelism)
.create_job_catalog(
&mut streaming_job,
&ctx,
&fragment_graph.parallelism,
fragment_graph.max_parallelism as _,
)
.await?;
let job_id = streaming_job.id();

@@ -296,6 +301,7 @@ impl DdlController {
&stream_ctx,
table.get_version()?,
&fragment_graph.specified_parallelism(),
fragment_graph.max_parallelism(),
)
.await? as u32;

@@ -440,6 +446,7 @@ impl DdlController {
&ctx,
table.get_version()?,
&fragment_graph.specified_parallelism(),
fragment_graph.max_parallelism(),
)
.await?;

1 change: 1 addition & 0 deletions src/meta/src/stream/stream_manager.rs
@@ -1098,6 +1098,7 @@ mod tests {
&locations.actor_locations,
Default::default(),
TableParallelism::Adaptive,
VirtualNode::COUNT_FOR_TEST,
);
let ctx = CreateStreamingJobContext {
building_locations: locations,
