Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(meta): persist job-level max parallelism & check when ALTER .. SET PARALLELISM #18740

Merged
merged 6 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions e2e_test/ddl/max_parallelism.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
statement ok
create view table_parallelism as select t.name, tf.parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id;

#### BEGIN


statement ok
set streaming_max_parallelism to 4;

# When the parallelism is specified to a value greater than the max parallelism, return an error.
statement ok
set streaming_parallelism to 6;

statement error specified parallelism 6 should not exceed max parallelism 4
create table t;

# When the parallelism is specified to an valid value, ok.
statement ok
set streaming_parallelism to 4;

statement ok
create table t;

query T
select parallelism from table_parallelism where name = 't';
----
FIXED(4)

statement ok
drop table t;

# When no parallelism is specified, ok, and the parallelism will be adaptive.

statement ok
set streaming_parallelism to default;

statement ok
create table t;

query T
select parallelism from table_parallelism where name = 't';
----
ADAPTIVE

# Alter parallelism to a valid value, ok.
statement ok
alter table t set parallelism to 4;

query T
select parallelism from table_parallelism where name = 't';
----
FIXED(4)

# Alter parallelism to an invalid value, return an error.
statement error specified parallelism 8 should not exceed max parallelism 4
alter table t set parallelism to 8;

statement ok
drop table t;

#### END

statement ok
set streaming_max_parallelism to default;

statement ok
set streaming_parallelism to default;

statement ok
drop view table_parallelism;
2 changes: 1 addition & 1 deletion proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ message Table {
// Use `VnodeCountCompat::vnode_count` to access it.
//
// Please note that this field is not intended to describe the expected vnode count
// for a streaming job. Instead, refer to `stream_plan.StreamFragmentGraph.expected_vnode_count`.
// for a streaming job. Instead, refer to `stream_plan.StreamFragmentGraph.max_parallelism`.
optional uint32 maybe_vnode_count = 40;

// Per-table catalog version, used by schema change. `None` for internal
Expand Down
15 changes: 15 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,22 @@ message TableFragments {
map<uint32, source.ConnectorSplits> actor_splits = 5;

stream_plan.StreamContext ctx = 6;

TableParallelism parallelism = 7;
// The max parallelism specified when the streaming job was created, i.e., expected vnode count.
//
// The reason for persisting this value is mainly to check if a parallelism change (via `ALTER
// .. SET PARALLELISM`) is valid, so that the behavior can be consistent with the creation of
// the streaming job.
//
// Note that the actual vnode count, denoted by `vnode_count` in `fragments`, may be different
// from this value (see `StreamFragmentGraph.max_parallelism` for more details.). As a result,
// checking the parallelism change with this value can be inaccurate in some cases. However,
// when generating resizing plans, we still take the `vnode_count` of each fragment into account.
//
// Can be unset if the fragment is created in older versions where variable vnode count is not
// supported, in which case a default value of 256 should be used.
optional uint32 max_parallelism = 10;

// Actors of a materialize view, sink, or table can only be scheduled on nodes with matching node_label.
string node_label = 8;
Expand Down
5 changes: 3 additions & 2 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1041,13 +1041,14 @@
// If none, default parallelism will be applied.
Parallelism parallelism = 6;

// Expected vnode count for the graph.
// Specified max parallelism, i.e., expected vnode count for the graph.
//
// The scheduler on the meta service will use this as a hint to decide the vnode count
// for each fragment.
//
// Note that the actual vnode count may be different from this value.
// For example, a no-shuffle exchange between current fragment graph and an existing
// upstream fragment graph requires two fragments to be in the same distribution,
// thus the same vnode count.
uint32 expected_vnode_count = 7;
uint32 max_parallelism = 7;

Check failure on line 1053 in proto/stream_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "7" with name "max_parallelism" on message "StreamFragmentGraph" changed option "json_name" from "expectedVnodeCount" to "maxParallelism".

Check failure on line 1053 in proto/stream_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "7" on message "StreamFragmentGraph" changed name from "expected_vnode_count" to "max_parallelism".
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I'd like to use vnode_count for all internal occurrences, and only use "max_parallelism" for the user-facing part. Because we (RW developers) are familiar with vnode, so vnode_count sounds like the most straight-forward naming.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My major concern is that, vnode_count of a streaming job may not be a physical concept at all. But it's true that prefixing with expected makes it doesn't sounds that confusing. Will change.

Copy link
Member Author

@BugenZhao BugenZhao Sep 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upon further consideration, I still think max_parallelism is more understandable because it aligns better with existing job attributes like parallelism (or assigned_parallelism). When the scope goes to fragment, it'll still be named as vnode_count.

}
1 change: 1 addition & 0 deletions src/ctl/src/cmd_impl/meta/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
create_type: Set(CreateType::Foreground),
timezone: Set(table_fragment.timezone()),
parallelism: Set(streaming_parallelism),
max_parallelism: Set(table_fragment.max_parallelism as _),
})
.exec(&meta_store_sql.conn)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/stream_fragmenter/graph/fragment_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl StreamFragmentGraph {
dependent_table_ids: vec![],
table_ids_cnt: 0,
parallelism: None,
expected_vnode_count: 0,
max_parallelism: 0,
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/stream_fragmenter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ pub fn build_graph(plan_node: PlanRef) -> SchedulerResult<StreamFragmentGraphPro
.map(|parallelism| Parallelism {
parallelism: parallelism.get(),
});
fragment_graph.expected_vnode_count = config.streaming_max_parallelism() as _;
fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
}

// Set timezone.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use sea_orm_migration::prelude::{Table as MigrationTable, *};

const VNODE_COUNT: i32 = 256;
macro_rules! col {
($name:expr) => {
ColumnDef::new($name).integer().not_null().default(256) // compat vnode count
};
}

#[derive(DeriveMigrationName)]
pub struct Migration;
Expand All @@ -12,12 +16,7 @@ impl MigrationTrait for Migration {
.alter_table(
MigrationTable::alter()
.table(Table::Table)
.add_column(
ColumnDef::new(Table::VnodeCount)
.integer()
.not_null()
.default(VNODE_COUNT),
)
.add_column(col!(Table::VnodeCount))
.to_owned(),
)
.await?;
Expand All @@ -26,12 +25,16 @@ impl MigrationTrait for Migration {
.alter_table(
MigrationTable::alter()
.table(Fragment::Table)
.add_column(
ColumnDef::new(Fragment::VnodeCount)
.integer()
.not_null()
.default(VNODE_COUNT),
)
.add_column(col!(Fragment::VnodeCount))
.to_owned(),
)
.await?;

manager
.alter_table(
MigrationTable::alter()
.table(StreamingJob::Table)
.add_column(col!(StreamingJob::MaxParallelism))
.to_owned(),
)
.await
Expand All @@ -54,6 +57,15 @@ impl MigrationTrait for Migration {
.drop_column(Fragment::VnodeCount)
.to_owned(),
)
.await?;

manager
.alter_table(
MigrationTable::alter()
.table(StreamingJob::Table)
.drop_column(StreamingJob::MaxParallelism)
.to_owned(),
)
.await
}
}
Expand All @@ -69,3 +81,9 @@ enum Table {
Table,
VnodeCount,
}

#[derive(DeriveIden)]
enum StreamingJob {
Table,
MaxParallelism,
}
1 change: 1 addition & 0 deletions src/meta/model_v2/src/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct Model {
pub create_type: CreateType,
pub timezone: Option<String>,
pub parallelism: StreamingParallelism,
pub max_parallelism: i32,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
13 changes: 13 additions & 0 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ impl CatalogController {
HashMap<ActorId, Vec<actor_dispatcher::Model>>,
)>,
parallelism: StreamingParallelism,
max_parallelism: usize,
) -> MetaResult<PbTableFragments> {
let mut pb_fragments = HashMap::new();
let mut pb_actor_splits = HashMap::new();
Expand Down Expand Up @@ -347,6 +348,7 @@ impl CatalogController {
),
node_label: "".to_string(),
backfill_done: true,
max_parallelism: Some(max_parallelism as _),
};

Ok(table_fragments)
Expand Down Expand Up @@ -669,6 +671,7 @@ impl CatalogController {
job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
fragment_info,
job_info.parallelism.clone(),
job_info.max_parallelism as _,
)
}

Expand All @@ -689,6 +692,15 @@ impl CatalogController {
Ok(job_states)
}

pub async fn get_max_parallelism_by_id(&self, job_id: ObjectId) -> MetaResult<usize> {
let inner = self.inner.read().await;
let job = StreamingJob::find_by_id(job_id)
.one(&inner.db)
.await?
.ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
Ok(job.max_parallelism as usize)
}

/// Get all actor ids in the target streaming jobs.
pub async fn get_job_actor_mapping(
&self,
Expand Down Expand Up @@ -790,6 +802,7 @@ impl CatalogController {
job.timezone.map(|tz| PbStreamContext { timezone: tz }),
fragment_info,
job.parallelism.clone(),
job.max_parallelism as _,
)?,
);
}
Expand Down
10 changes: 10 additions & 0 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl CatalogController {
create_type: PbCreateType,
ctx: &StreamContext,
streaming_parallelism: StreamingParallelism,
max_parallelism: usize,
) -> MetaResult<ObjectId> {
let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
let job = streaming_job::ActiveModel {
Expand All @@ -91,6 +92,7 @@ impl CatalogController {
create_type: Set(create_type.into()),
timezone: Set(ctx.timezone.clone()),
parallelism: Set(streaming_parallelism),
max_parallelism: Set(max_parallelism as _),
};
job.insert(txn).await?;

Expand All @@ -102,6 +104,7 @@ impl CatalogController {
streaming_job: &mut StreamingJob,
ctx: &StreamContext,
parallelism: &Option<Parallelism>,
max_parallelism: usize,
) -> MetaResult<()> {
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;
Expand Down Expand Up @@ -169,6 +172,7 @@ impl CatalogController {
create_type,
ctx,
streaming_parallelism,
max_parallelism,
)
.await?;
table.id = job_id as _;
Expand Down Expand Up @@ -204,6 +208,7 @@ impl CatalogController {
create_type,
ctx,
streaming_parallelism,
max_parallelism,
)
.await?;
sink.id = job_id as _;
Expand All @@ -220,6 +225,7 @@ impl CatalogController {
create_type,
ctx,
streaming_parallelism,
max_parallelism,
)
.await?;
table.id = job_id as _;
Expand Down Expand Up @@ -255,6 +261,7 @@ impl CatalogController {
create_type,
ctx,
streaming_parallelism,
max_parallelism,
)
.await?;
// to be compatible with old implementation.
Expand Down Expand Up @@ -285,6 +292,7 @@ impl CatalogController {
create_type,
ctx,
streaming_parallelism,
max_parallelism,
)
.await?;
src.id = job_id as _;
Expand Down Expand Up @@ -631,6 +639,7 @@ impl CatalogController {
ctx: &StreamContext,
version: &PbTableVersion,
specified_parallelism: &Option<NonZeroUsize>,
max_parallelism: usize,
) -> MetaResult<ObjectId> {
let id = streaming_job.id();
let inner = self.inner.write().await;
Expand Down Expand Up @@ -685,6 +694,7 @@ impl CatalogController {
PbCreateType::Foreground,
ctx,
parallelism,
max_parallelism,
)
.await?;

Expand Down
20 changes: 19 additions & 1 deletion src/meta/src/manager/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::{BTreeMap, HashMap, HashSet};
use std::pin::pin;
use std::time::Duration;

use anyhow::anyhow;
use anyhow::{anyhow, Context};
use futures::future::{select, Either};
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_meta_model_v2::{ObjectId, SourceId};
Expand Down Expand Up @@ -892,6 +892,24 @@ impl MetadataManager {
}
}

pub async fn get_job_max_parallelism(&self, table_id: TableId) -> MetaResult<usize> {
match self {
MetadataManager::V1(mgr) => {
let fragments = mgr.fragment_manager.get_fragment_read_guard().await;
Ok(fragments
.table_fragments()
.get(&table_id)
.map(|tf| tf.max_parallelism)
.with_context(|| format!("job {table_id} not found"))?)
}
MetadataManager::V2(mgr) => {
mgr.catalog_controller
.get_max_parallelism_by_id(table_id.table_id as _)
.await
}
}
}

pub fn cluster_id(&self) -> &ClusterId {
match self {
MetadataManager::V1(mgr) => mgr.cluster_manager.cluster_id(),
Expand Down
Loading
Loading