diff --git a/proto/catalog.proto b/proto/catalog.proto index 8eeb758432446..0c67a92f23cdd 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -422,7 +422,7 @@ message Table { // Use `VnodeCountCompat::vnode_count` to access it. // // - Can be unset if the table is created in older versions where variable vnode count is not - // supported, in which case a default value of 256 should be used. + // supported, in which case a default value of 256 (or 1 for singleton) should be used. // - Can be placeholder value `Some(0)` if the catalog is generated by the frontend and the // corresponding job is still in `Creating` status, in which case calling `vnode_count` // will panic. diff --git a/proto/meta.proto b/proto/meta.proto index aa006a3400b1e..bbd1265c44260 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -92,7 +92,7 @@ message TableFragments { // Duplicated from the length of the vnode bitmap in any actor of the fragment. // // Can be unset if the fragment is created in older versions where variable vnode count is not - // supported, in which case a default value of 256 should be used. + // supported, in which case a default value of 256 (or 1 for singleton) should be used. // Use `VnodeCountCompat::vnode_count` to access it. optional uint32 maybe_vnode_count = 8; } diff --git a/proto/plan_common.proto b/proto/plan_common.proto index f561ee427ea46..487ab54e2a666 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -100,7 +100,7 @@ message StorageTableDesc { // Total vnode count of the table. // // Can be unset if the table is created in older versions where variable vnode count is not - // supported, in which case a default value of 256 should be used. + // supported, in which case a default value of 256 (or 1 for singleton) should be used. // Use `VnodeCountCompat::vnode_count` to access it. optional uint32 maybe_vnode_count = 12; } diff --git a/src/common/src/hash/consistent_hash/bitmap.rs b/src/common/src/hash/consistent_hash/bitmap.rs index a40946273a0a7..0ba54d8e0637b 100644 --- a/src/common/src/hash/consistent_hash/bitmap.rs +++ b/src/common/src/hash/consistent_hash/bitmap.rs @@ -45,16 +45,16 @@ impl Bitmap { } /// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length - /// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1. + /// 1 and the only [`SINGLETON_VNODE`] set to true. pub fn singleton() -> &'static Self { Self::singleton_arc() } /// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length - /// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1. + /// 1 and the only [`SINGLETON_VNODE`] set to true. pub fn singleton_arc() -> &'static Arc { static SINGLETON: LazyLock> = LazyLock::new(|| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_COMPAT); + let mut builder = BitmapBuilder::zeroed(1); builder.set(SINGLETON_VNODE.to_index(), true); builder.finish().into() }); diff --git a/src/common/src/hash/consistent_hash/compat.rs b/src/common/src/hash/consistent_hash/compat.rs deleted file mode 100644 index c659da2fc6059..0000000000000 --- a/src/common/src/hash/consistent_hash/compat.rs +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::num::NonZeroUsize; - -use super::vnode::VirtualNode; - -/// The different cases of `maybe_vnode_count` field in the protobuf message. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)] -pub enum VnodeCount { - /// The field is a placeholder and has to be filled first before using it. - #[default] - Placeholder, - /// The field is set to a specific value. - Set(NonZeroUsize), - /// The field is unset because it's persisted in an older version. - Compat, -} - -impl VnodeCount { - /// Creates a `VnodeCount` set to the given value. - pub fn set(v: impl TryInto + Copy + std::fmt::Debug) -> Self { - let v = (v.try_into().ok()) - .filter(|v| (1..=VirtualNode::MAX_COUNT).contains(v)) - .unwrap_or_else(|| panic!("invalid vnode count {v:?}")); - - VnodeCount::Set(NonZeroUsize::new(v).unwrap()) - } - - /// Creates a `VnodeCount` set to the value for testing. - /// - /// Equivalent to `VnodeCount::set(VirtualNode::COUNT_FOR_TEST)`. - pub fn for_test() -> Self { - Self::set(VirtualNode::COUNT_FOR_TEST) - } - - /// Converts to protobuf representation for `maybe_vnode_count`. - pub fn to_protobuf(self) -> Option { - match self { - VnodeCount::Placeholder => Some(0), - VnodeCount::Set(v) => Some(v.get() as _), - VnodeCount::Compat => None, - } - } - - /// Converts from protobuf representation of `maybe_vnode_count`. - pub fn from_protobuf(v: Option) -> Self { - match v { - Some(0) => VnodeCount::Placeholder, - Some(v) => VnodeCount::set(v as usize), - None => VnodeCount::Compat, - } - } - - /// Returns the value of the vnode count, or `None` if it's a placeholder. - pub fn value_opt(self) -> Option { - match self { - VnodeCount::Placeholder => None, - VnodeCount::Set(v) => Some(v.get()), - VnodeCount::Compat => Some(VirtualNode::COUNT_FOR_COMPAT), - } - } - - /// Returns the value of the vnode count. Panics if it's a placeholder. - pub fn value(self) -> usize { - self.value_opt() - .expect("vnode count is a placeholder that must be filled by the meta service first") - } -} - -/// A trait for accessing the vnode count field with backward compatibility. -pub trait VnodeCountCompat { - /// Get the `maybe_vnode_count` field. - fn vnode_count_inner(&self) -> VnodeCount; - - /// Returns the vnode count, or [`VirtualNode::COUNT_FOR_COMPAT`] if the vnode count is not set, - /// typically for backward compatibility. Panics if the field is a placeholder. - /// - /// Equivalent to `self.vnode_count_inner().value()`. - /// - /// See the documentation on the field of the implementing type for more details. - fn vnode_count(&self) -> usize { - self.vnode_count_inner().value() - } -} - -/// Implement the trait for given types by delegating to the `maybe_vnode_count` field. -/// -/// The reason why there's a `maybe_` prefix is that, a getter method with the same name -/// as the field will be generated for `prost` structs. Directly naming it `vnode_count` -/// will lead to the method `vnode_count()` returning `0` when the field is unset, which -/// can be misleading sometimes. -/// -/// Instead, we name the field as `maybe_vnode_count` and provide the method `vnode_count` -/// through this trait, ensuring that backward compatibility is handled properly. -macro_rules! impl_maybe_vnode_count_compat { - ($($ty:ty),* $(,)?) => { - $( - impl VnodeCountCompat for $ty { - fn vnode_count_inner(&self) -> VnodeCount { - VnodeCount::from_protobuf(self.maybe_vnode_count) - } - } - )* - }; -} - -impl_maybe_vnode_count_compat!( - risingwave_pb::plan_common::StorageTableDesc, - risingwave_pb::catalog::Table, - risingwave_pb::meta::table_fragments::Fragment, -); diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index 1e7bca125fc50..2deed907103f1 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -139,11 +139,12 @@ impl VnodeMapping { } } - /// Create a vnode mapping with the single item. Should only be used for singletons. + /// Create a vnode mapping with the single item and length of 1. /// - /// For backwards compatibility, [`VirtualNode::COUNT_FOR_COMPAT`] is used as the vnode count. + /// Should only be used for singletons. If you want a different vnode count, call + /// [`VnodeMapping::new_uniform`] with `std::iter::once(item)` and desired length. pub fn new_single(item: T::Item) -> Self { - Self::new_uniform(std::iter::once(item), VirtualNode::COUNT_FOR_COMPAT) + Self::new_uniform(std::iter::once(item), 1) } /// The length (or count) of the vnode in this mapping. diff --git a/src/common/src/hash/consistent_hash/mod.rs b/src/common/src/hash/consistent_hash/mod.rs index 98a970af5947d..5f42369248bab 100644 --- a/src/common/src/hash/consistent_hash/mod.rs +++ b/src/common/src/hash/consistent_hash/mod.rs @@ -13,6 +13,6 @@ // limitations under the License. pub mod bitmap; -pub mod compat; pub mod mapping; pub mod vnode; +pub mod vnode_count; diff --git a/src/common/src/hash/consistent_hash/vnode_count.rs b/src/common/src/hash/consistent_hash/vnode_count.rs new file mode 100644 index 0000000000000..0040a79f780fa --- /dev/null +++ b/src/common/src/hash/consistent_hash/vnode_count.rs @@ -0,0 +1,164 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::num::NonZeroUsize; + +use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; + +use super::vnode::VirtualNode; + +/// The different cases of `maybe_vnode_count` field in the protobuf message. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)] +pub enum VnodeCount { + /// The field is a placeholder and has to be filled first before using it. + #[default] + Placeholder, + /// The field is set to a specific value. + Set(NonZeroUsize), + /// The field is unset because the table/fragment is persisted as hash-distributed + /// in an older version. + CompatHash, + /// The field is unset because the table/fragment is persisted as singleton + /// in an older version. + CompatSingleton, +} + +impl VnodeCount { + /// Creates a `VnodeCount` set to the given value. + pub fn set(v: impl TryInto + Copy + std::fmt::Debug) -> Self { + let v = (v.try_into().ok()) + .filter(|v| (1..=VirtualNode::MAX_COUNT).contains(v)) + .unwrap_or_else(|| panic!("invalid vnode count {v:?}")); + + VnodeCount::Set(NonZeroUsize::new(v).unwrap()) + } + + /// Creates a `VnodeCount` set to the value for testing. + /// + /// Equivalent to `VnodeCount::set(VirtualNode::COUNT_FOR_TEST)`. + pub fn for_test() -> Self { + Self::set(VirtualNode::COUNT_FOR_TEST) + } + + /// Converts from protobuf representation of `maybe_vnode_count`. If the value is not set, + /// call `compat_is_singleton` to determine whether it should be treated as a singleton + /// when it comes to backward compatibility. + fn from_protobuf(v: Option, compat_is_singleton: impl FnOnce() -> bool) -> Self { + match v { + Some(0) => VnodeCount::Placeholder, + Some(v) => VnodeCount::set(v as usize), + None => { + if compat_is_singleton() { + VnodeCount::CompatSingleton + } else { + VnodeCount::CompatHash + } + } + } + } + + /// Converts to protobuf representation for `maybe_vnode_count`. + pub fn to_protobuf(self) -> Option { + // Effectively fills the compatibility cases with values. + self.value_opt() + .map_or(Some(0) /* placeholder */, |v| Some(v as _)) + } + + /// Returns the value of the vnode count, or `None` if it's a placeholder. + pub fn value_opt(self) -> Option { + match self { + VnodeCount::Placeholder => None, + VnodeCount::Set(v) => Some(v.get()), + VnodeCount::CompatHash => Some(VirtualNode::COUNT_FOR_COMPAT), + VnodeCount::CompatSingleton => Some(1), + } + } + + /// Returns the value of the vnode count. Panics if it's a placeholder. + pub fn value(self) -> usize { + self.value_opt() + .expect("vnode count is a placeholder that must be filled by the meta service first") + } +} + +/// A trait for checking whether a table/fragment is a singleton. +pub trait IsSingleton { + /// Returns `true` if the table/fragment is a singleton. + /// + /// By singleton, we mean that all data read from or written to the storage belongs to + /// the only `SINGLETON_VNODE`. This must be consistent with the behavior of + /// [`TableDistribution`](crate::hash::table_distribution::TableDistribution::new). + /// As a result, the `vnode_count` of such table/fragment can be `1`. + fn is_singleton(&self) -> bool; +} + +/// A trait for accessing the vnode count field with backward compatibility. +/// +/// # `maybe_`? +/// +/// The reason why there's a `maybe_` prefix on the protobuf field is that, a getter +/// method with the same name as the field will be generated for `prost` structs. +/// Directly naming it `vnode_count` will lead to the method `vnode_count()` returning +/// `0` when the field is unset, which can be misleading sometimes. +/// +/// Instead, we name the field as `maybe_vnode_count` and provide the method `vnode_count` +/// through this trait, ensuring that backward compatibility is handled properly. +pub trait VnodeCountCompat { + /// Get the `maybe_vnode_count` field. + fn vnode_count_inner(&self) -> VnodeCount; + + /// Returns the vnode count if it's set. Otherwise, returns [`VirtualNode::COUNT_FOR_COMPAT`] + /// for distributed tables/fragments, and `1` for singleton tables/fragments, for backward + /// compatibility. Panics if the field is a placeholder. + /// + /// See the documentation on the field of the implementing type for more details. + fn vnode_count(&self) -> usize { + self.vnode_count_inner().value() + } +} + +impl IsSingleton for risingwave_pb::catalog::Table { + fn is_singleton(&self) -> bool { + self.distribution_key.is_empty() + && self.dist_key_in_pk.is_empty() + && self.vnode_col_index.is_none() + } +} +impl VnodeCountCompat for risingwave_pb::catalog::Table { + fn vnode_count_inner(&self) -> VnodeCount { + VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton()) + } +} + +impl IsSingleton for risingwave_pb::plan_common::StorageTableDesc { + fn is_singleton(&self) -> bool { + self.dist_key_in_pk_indices.is_empty() && self.vnode_col_idx_in_pk.is_none() + } +} +impl VnodeCountCompat for risingwave_pb::plan_common::StorageTableDesc { + fn vnode_count_inner(&self) -> VnodeCount { + VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton()) + } +} + +impl IsSingleton for risingwave_pb::meta::table_fragments::Fragment { + fn is_singleton(&self) -> bool { + matches!(self.distribution_type(), FragmentDistributionType::Single) + } +} +impl VnodeCountCompat for risingwave_pb::meta::table_fragments::Fragment { + fn vnode_count_inner(&self) -> VnodeCount { + VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton()) + } +} diff --git a/src/common/src/hash/mod.rs b/src/common/src/hash/mod.rs index e54376bed2d50..8ed8120671179 100644 --- a/src/common/src/hash/mod.rs +++ b/src/common/src/hash/mod.rs @@ -19,9 +19,9 @@ mod key_v2; pub mod table_distribution; pub use consistent_hash::bitmap::*; -pub use consistent_hash::compat::*; pub use consistent_hash::mapping::*; pub use consistent_hash::vnode::*; +pub use consistent_hash::vnode_count::*; pub use dispatcher::{calc_hash_key_kind, HashKeyDispatcher}; pub use key::{ Crc32HashCode, HashCode, HashKeyDe, HashKeySer, HeapNullBitmap, NullBitmap, diff --git a/src/common/src/hash/table_distribution.rs b/src/common/src/hash/table_distribution.rs index 822db591c1577..a91f48f6589af 100644 --- a/src/common/src/hash/table_distribution.rs +++ b/src/common/src/hash/table_distribution.rs @@ -20,7 +20,7 @@ use risingwave_pb::plan_common::StorageTableDesc; use crate::array::{Array, DataChunk, PrimitiveArray}; use crate::bitmap::Bitmap; -use crate::hash::VirtualNode; +use crate::hash::{IsSingleton, VirtualNode}; use crate::row::Row; use crate::util::iter_util::ZipEqFast; @@ -64,7 +64,10 @@ impl TableDistribution { .map(|&k| k as usize) .collect_vec(); let vnode_col_idx_in_pk = table_desc.vnode_col_idx_in_pk.map(|k| k as usize); - Self::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk) + + let this = Self::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk); + assert_eq!(this.is_singleton(), table_desc.is_singleton()); + this } pub fn new( diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index 40b350ade287a..2433a659bad0a 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -138,6 +138,21 @@ impl TableCatalogBuilder { Some(w) => w, None => FixedBitSet::with_capacity(self.columns.len()), }; + + // If `dist_key_in_pk` is set, check if it matches with `distribution_key`. + // Note that we cannot derive in the opposite direction, because there can be a column + // appearing multiple times in the PK. + if let Some(dist_key_in_pk) = &self.dist_key_in_pk { + let derived_dist_key = dist_key_in_pk + .iter() + .map(|idx| self.pk[*idx].column_index) + .collect_vec(); + assert_eq!( + derived_dist_key, distribution_key, + "dist_key mismatch with dist_key_in_pk" + ); + } + TableCatalog { id: TableId::placeholder(), associated_source_id: None, diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 9bd1851383af6..90984750bc460 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -1228,8 +1228,21 @@ fn derive_partitions( table_desc: &TableDesc, vnode_mapping: &WorkerSlotMapping, ) -> SchedulerResult> { + let vnode_mapping = if table_desc.vnode_count != vnode_mapping.len() { + // The vnode count mismatch occurs only in special cases where a hash-distributed fragment + // contains singleton internal tables. e.g., the state table of `Source` executors. + // In this case, we reduce the vnode mapping to a single vnode as only `SINGLETON_VNODE` is used. + assert!( + table_desc.vnode_count == 1, + "fragment vnode count {} does not match table vnode count {}", + vnode_mapping.len(), + table_desc.vnode_count, + ); + &WorkerSlotMapping::new_single(vnode_mapping.iter().next().unwrap()) + } else { + vnode_mapping + }; let vnode_count = vnode_mapping.len(); - assert_eq!(vnode_count, table_desc.vnode_count); let mut partitions: HashMap)> = HashMap::new(); diff --git a/src/meta/model/migration/src/lib.rs b/src/meta/model/migration/src/lib.rs index 4ae39aea29198..0c8c244c0624e 100644 --- a/src/meta/model/migration/src/lib.rs +++ b/src/meta/model/migration/src/lib.rs @@ -23,6 +23,7 @@ mod m20240806_143329_add_rate_limit_to_source_catalog; mod m20240820_081248_add_time_travel_per_table_epoch; mod m20240911_083152_variable_vnode_count; mod m20241016_065621_hummock_gc_history; +mod m20241025_062548_singleton_vnode_count; pub struct Migrator; @@ -83,6 +84,7 @@ impl MigratorTrait for Migrator { Box::new(m20240820_081248_add_time_travel_per_table_epoch::Migration), Box::new(m20240911_083152_variable_vnode_count::Migration), Box::new(m20241016_065621_hummock_gc_history::Migration), + Box::new(m20241025_062548_singleton_vnode_count::Migration), ] } } diff --git a/src/meta/model/migration/src/m20241025_062548_singleton_vnode_count.rs b/src/meta/model/migration/src/m20241025_062548_singleton_vnode_count.rs new file mode 100644 index 0000000000000..eb276099e24bf --- /dev/null +++ b/src/meta/model/migration/src/m20241025_062548_singleton_vnode_count.rs @@ -0,0 +1,57 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Fill vnode count with 1 for singleton tables. + manager + .exec_stmt( + UpdateStatement::new() + .table(Table::Table) + .values([(Table::VnodeCount, Expr::value(1))]) + .and_where(Expr::col(Table::DistributionKey).eq(Expr::value("[]"))) + .and_where(Expr::col(Table::DistKeyInPk).eq(Expr::value("[]"))) + .and_where(Expr::col(Table::VnodeColIndex).is_null()) + .to_owned(), + ) + .await?; + + // Fill vnode count with 1 for singleton fragments. + manager + .exec_stmt( + UpdateStatement::new() + .table(Fragment::Table) + .values([(Fragment::VnodeCount, Expr::value(1))]) + .and_where(Expr::col(Fragment::DistributionType).eq(Expr::value("SINGLE"))) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, _manager: &SchemaManager) -> Result<(), DbErr> { + Err(DbErr::Migration( + "cannot rollback singleton vnode count migration".to_owned(), + ))? + } +} + +#[derive(DeriveIden)] +enum Fragment { + Table, + VnodeCount, + DistributionType, +} + +#[derive(DeriveIden)] +enum Table { + Table, + VnodeCount, + DistributionKey, + DistKeyInPk, + VnodeColIndex, +} diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index bfcb7830ec6e9..1da4d95968f40 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -16,6 +16,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::num::NonZeroUsize; use itertools::Itertools; +use risingwave_common::hash::VnodeCountCompat; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_common::{bail, current_cluster_version}; @@ -41,8 +42,7 @@ use risingwave_pb::meta::subscribe_response::{ Info as NotificationInfo, Info, Operation as NotificationOperation, Operation, }; use risingwave_pb::meta::{ - PbFragmentWorkerSlotMapping, PbRelation, PbRelationGroup, PbTableFragments, Relation, - RelationGroup, + PbFragmentWorkerSlotMapping, PbRelation, PbRelationGroup, Relation, RelationGroup, }; use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits}; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; @@ -69,7 +69,7 @@ use crate::controller::utils::{ }; use crate::controller::ObjectModel; use crate::manager::{NotificationVersion, StreamingJob}; -use crate::model::{StreamContext, TableParallelism}; +use crate::model::{StreamContext, TableFragments, TableParallelism}; use crate::stream::SplitAssignment; use crate::{MetaError, MetaResult}; @@ -395,14 +395,18 @@ impl CatalogController { Ok(table_id_map) } + // TODO: In this function, we also update the `Table` model in the meta store. + // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider + // making them the source of truth and performing a full replacement for those in the meta store? pub async fn prepare_streaming_job( &self, - table_fragment: PbTableFragments, + table_fragments: &TableFragments, streaming_job: &StreamingJob, for_replace: bool, ) -> MetaResult<()> { let fragment_actors = - Self::extract_fragment_and_actors_from_table_fragments(table_fragment)?; + Self::extract_fragment_and_actors_from_table_fragments(table_fragments.to_protobuf())?; + let all_tables = table_fragments.all_tables(); let inner = self.inner.write().await; let txn = inner.db.begin().await?; @@ -414,7 +418,6 @@ impl CatalogController { for fragment in fragments { let fragment_id = fragment.fragment_id; let state_table_ids = fragment.state_table_ids.inner_ref().clone(); - let vnode_count = fragment.vnode_count; let fragment = fragment.into_active_model(); Fragment::insert(fragment).exec(&txn).await?; @@ -423,10 +426,19 @@ impl CatalogController { // After table fragments are created, update them for all internal tables. if !for_replace { for state_table_id in state_table_ids { + // Table's vnode count is not always the fragment's vnode count, so we have to + // look up the table from `TableFragments`. + // See `ActorGraphBuilder::new`. + let table = all_tables + .get(&(state_table_id as u32)) + .unwrap_or_else(|| panic!("table {} not found", state_table_id)); + assert_eq!(table.fragment_id, fragment_id as u32); + let vnode_count = table.vnode_count(); + table::ActiveModel { table_id: Set(state_table_id as _), fragment_id: Set(Some(fragment_id)), - vnode_count: Set(vnode_count), + vnode_count: Set(vnode_count as _), ..Default::default() } .update(&txn) @@ -1045,25 +1057,24 @@ impl CatalogController { table.incoming_sinks = Set(incoming_sinks.into()); let table = table.update(txn).await?; - // Fields including `fragment_id` and `vnode_count` were placeholder values before. + // Fields including `fragment_id` were placeholder values before. // After table fragments are created, update them for all internal tables. - let fragment_info: Vec<(FragmentId, I32Array, i32)> = Fragment::find() + let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find() .select_only() .columns([ fragment::Column::FragmentId, fragment::Column::StateTableIds, - fragment::Column::VnodeCount, ]) .filter(fragment::Column::JobId.eq(dummy_id)) .into_tuple() .all(txn) .await?; - for (fragment_id, state_table_ids, vnode_count) in fragment_info { + for (fragment_id, state_table_ids) in fragment_info { for state_table_id in state_table_ids.into_inner() { table::ActiveModel { table_id: Set(state_table_id as _), fragment_id: Set(Some(fragment_id)), - vnode_count: Set(vnode_count), + // No need to update `vnode_count` because it must remain the same. ..Default::default() } .update(txn) diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index e956d238a84cc..492e67bdce557 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -576,10 +576,21 @@ impl TableFragments { /// Compared to [`crate::stream::StreamFragmentGraph::incomplete_internal_tables`], /// the table catalogs returned here are complete, with all fields filled. pub fn internal_tables(&self) -> BTreeMap { + self.collect_tables_inner(true) + } + + /// `internal_tables()` with additional table in `Materialize` node. + pub fn all_tables(&self) -> BTreeMap { + self.collect_tables_inner(false) + } + + fn collect_tables_inner(&self, internal_tables_only: bool) -> BTreeMap { let mut tables = BTreeMap::new(); for fragment in self.fragments.values() { - stream_graph_visitor::visit_stream_node_internal_tables( + stream_graph_visitor::visit_stream_node_tables_inner( &mut fragment.actors[0].nodes.clone().unwrap(), + internal_tables_only, + true, |table, _| { let table_id = table.id; tables diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index cec9efa323a46..d2c46b284bddf 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1114,7 +1114,7 @@ impl DdlController { self.metadata_manager .catalog_controller - .prepare_streaming_job(table_fragments.to_protobuf(), streaming_job, false) + .prepare_streaming_job(&table_fragments, streaming_job, false) .await?; // create streaming jobs. @@ -1257,7 +1257,7 @@ impl DdlController { self.metadata_manager .catalog_controller - .prepare_streaming_job(table_fragments.to_protobuf(), &streaming_job, true) + .prepare_streaming_job(&table_fragments, &streaming_job, true) .await?; self.stream_manager @@ -1465,7 +1465,7 @@ impl DdlController { self.metadata_manager .catalog_controller - .prepare_streaming_job(table_fragments.to_protobuf(), &streaming_job, true) + .prepare_streaming_job(&table_fragments, &streaming_job, true) .await?; self.stream_manager diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs index 3446a2661d962..d7e1b0b1b6df7 100644 --- a/src/meta/src/stream/stream_graph/actor.rs +++ b/src/meta/src/stream/stream_graph/actor.rs @@ -20,7 +20,7 @@ use assert_matches::assert_matches; use itertools::Itertools; use risingwave_common::bail; use risingwave_common::bitmap::Bitmap; -use risingwave_common::hash::{ActorId, ActorMapping, WorkerSlotId}; +use risingwave_common::hash::{ActorId, ActorMapping, IsSingleton, VnodeCount, WorkerSlotId}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::stream_graph_visitor::visit_tables; use risingwave_meta_model::WorkerId; @@ -683,9 +683,22 @@ impl ActorGraphBuilder { // Fill the vnode count for each internal table, based on schedule result. let mut fragment_graph = fragment_graph; for (id, fragment) in fragment_graph.building_fragments_mut() { - let vnode_count = distributions[id].vnode_count(); + let fragment_vnode_count = distributions[id].vnode_count(); visit_tables(fragment, |table, _| { - table.maybe_vnode_count = Some(vnode_count as _); + // There are special cases where a hash-distributed fragment contains singleton + // internal tables, e.g., the state table of `Source` executors. + let vnode_count = if table.is_singleton() { + if fragment_vnode_count > 1 { + tracing::info!( + table.name, + "found singleton table in hash-distributed fragment" + ); + } + 1 + } else { + fragment_vnode_count + }; + table.maybe_vnode_count = VnodeCount::set(vnode_count).to_protobuf(); }) } diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs index 5d465b19d195d..97b5c3032117e 100644 --- a/src/meta/src/stream/stream_graph/schedule.rs +++ b/src/meta/src/stream/stream_graph/schedule.rs @@ -25,7 +25,7 @@ use either::Either; use enum_as_inner::EnumAsInner; use itertools::Itertools; use risingwave_common::bitmap::Bitmap; -use risingwave_common::hash::{ActorMapping, VirtualNode, WorkerSlotId, WorkerSlotMapping}; +use risingwave_common::hash::{ActorMapping, WorkerSlotId, WorkerSlotMapping}; use risingwave_common::{bail, hash}; use risingwave_meta_model::WorkerId; use risingwave_pb::common::{ActorInfo, WorkerNode}; @@ -152,11 +152,9 @@ impl Distribution { } /// Get the vnode count of the distribution. - /// - /// For backwards compatibility, [`VirtualNode::COUNT_FOR_COMPAT`] is used for singleton. pub fn vnode_count(&self) -> usize { match self { - Distribution::Singleton(_) => VirtualNode::COUNT_FOR_COMPAT, + Distribution::Singleton(_) => 1, // only `SINGLETON_VNODE` Distribution::Hash(mapping) => mapping.len(), } } diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 21a642f1be1c7..7bf4058c647cf 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -350,7 +350,7 @@ impl GlobalStreamManager { if let Some((streaming_job, context, table_fragments)) = replace_table_job_info { self.metadata_manager .catalog_controller - .prepare_streaming_job(table_fragments.to_protobuf(), &streaming_job, true) + .prepare_streaming_job(&table_fragments, &streaming_job, true) .await?; let dummy_table_id = table_fragments.table_id(); diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 3ada0afd71625..55957af9e6e5b 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -29,7 +29,7 @@ use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{ get_dist_key_in_pk_indices, ColumnDesc, ColumnId, TableId, TableOption, }; -use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; +use risingwave_common::hash::{IsSingleton, VirtualNode, VnodeBitmapExt}; use risingwave_common::row::{self, once, CompactedRow, Once, OwnedRow, Row, RowExt}; use risingwave_common::types::{DataType, Datum, DefaultOrd, DefaultOrdered, ScalarImpl}; use risingwave_common::util::column_index_mapping::ColIndexMapping; @@ -363,6 +363,7 @@ where let distribution = TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk); + assert_eq!(distribution.is_singleton(), table_catalog.is_singleton()); let pk_data_types = pk_indices .iter() diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs index 0aa5ebca9007f..65f7e55cdd4e6 100644 --- a/src/stream/src/executor/actor.rs +++ b/src/stream/src/executor/actor.rs @@ -103,9 +103,9 @@ impl ActorContext { fragment_id: stream_actor.fragment_id, mview_definition: stream_actor.mview_definition.clone(), vnode_count: (stream_actor.vnode_bitmap.as_ref()) - // An unset `vnode_bitmap` means the actor is a singleton. - // For backwards compatibility, `VirtualNode::COUNT_FOR_COMPAT` is used for singleton. - .map_or(VirtualNode::COUNT_FOR_COMPAT, |b| Bitmap::from(b).len()), + // An unset `vnode_bitmap` means the actor is a singleton, + // where only `SINGLETON_VNODE` is set. + .map_or(1, |b| Bitmap::from(b).len()), cur_mem_val: Arc::new(0.into()), last_mem_val: Arc::new(0.into()), total_mem_val,