diff --git a/proto/catalog.proto b/proto/catalog.proto index f792eccc0cab6..2b6ca54553de7 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -424,8 +424,8 @@ message Table { // because the vnode count of each fragment (then internal tables) is determined // by the meta service. // - The table is created in older versions where variable vnode count is not - // supported, in which case a default value of 256 should be used. - // Use `VnodeCountCompat::vnode_count` to access it. + // supported, in which case a default value of 256 (or 1 for singleton) should + // be used. Use `VnodeCountCompat::vnode_count` to access it. // // Please note that this field is not intended to describe the expected vnode count // for a streaming job. Instead, refer to `stream_plan.StreamFragmentGraph.max_parallelism`. diff --git a/proto/meta.proto b/proto/meta.proto index e34e822dad8fb..af93c71af9e6a 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -99,7 +99,7 @@ message TableFragments { // Duplicated from the length of the vnode bitmap in any actor of the fragment. // // Can be unset if the fragment is created in older versions where variable vnode count is not - // supported, in which case a default value of 256 should be used. + // supported, in which case a default value of 256 (or 1 for singleton) should be used. // Use `VnodeCountCompat::vnode_count` to access it. optional uint32 maybe_vnode_count = 8; } diff --git a/proto/plan_common.proto b/proto/plan_common.proto index a552c9f0a5fae..17e4ad07c68b2 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -100,7 +100,7 @@ message StorageTableDesc { // Total vnode count of the table. // // Can be unset if the table is created in older versions where variable vnode count is not - // supported, in which case a default value of 256 should be used. + // supported, in which case a default value of 256 (or 1 for singleton) should be used. // Use `VnodeCountCompat::vnode_count` to access it. optional uint32 maybe_vnode_count = 12; } diff --git a/src/common/src/hash/consistent_hash/bitmap.rs b/src/common/src/hash/consistent_hash/bitmap.rs index a40946273a0a7..0ba54d8e0637b 100644 --- a/src/common/src/hash/consistent_hash/bitmap.rs +++ b/src/common/src/hash/consistent_hash/bitmap.rs @@ -45,16 +45,16 @@ impl Bitmap { } /// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length - /// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1. + /// 1 and the only [`SINGLETON_VNODE`] set to true. pub fn singleton() -> &'static Self { Self::singleton_arc() } /// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length - /// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1. + /// 1 and the only [`SINGLETON_VNODE`] set to true. pub fn singleton_arc() -> &'static Arc { static SINGLETON: LazyLock> = LazyLock::new(|| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_COMPAT); + let mut builder = BitmapBuilder::zeroed(1); builder.set(SINGLETON_VNODE.to_index(), true); builder.finish().into() }); diff --git a/src/common/src/hash/consistent_hash/compat.rs b/src/common/src/hash/consistent_hash/compat.rs index 0c86fbb12bcd4..75e5476e701f0 100644 --- a/src/common/src/hash/consistent_hash/compat.rs +++ b/src/common/src/hash/consistent_hash/compat.rs @@ -15,38 +15,85 @@ use super::vnode::VirtualNode; /// A trait for accessing the vnode count field with backward compatibility. +/// +/// # `maybe_`? +/// +/// The reason why there's a `maybe_` prefix on the protobuf field is that, a getter +/// method with the same name as the field will be generated for `prost` structs. +/// Directly naming it `vnode_count` will lead to the method `vnode_count()` returning +/// `0` when the field is unset, which can be misleading sometimes. +/// +/// Instead, we name the field as `maybe_vnode_count` and provide the method `vnode_count` +/// through this trait, ensuring that backward compatibility is handled properly. pub trait VnodeCountCompat { - /// Returns the vnode count, or [`VirtualNode::COUNT_FOR_COMPAT`] if the vnode count is not set, - /// typically for backward compatibility. + /// Returns the vnode count if it's set. Otherwise, returns [`VirtualNode::COUNT_FOR_COMPAT`] + /// for distributed tables/fragments, and `1` for singleton tables/fragments, for backward + /// compatibility. /// /// See the documentation on the field of the implementing type for more details. fn vnode_count(&self) -> usize; } -/// Implement the trait for given types by delegating to the `maybe_vnode_count` field. -/// -/// The reason why there's a `maybe_` prefix is that, a getter method with the same name -/// as the field will be generated for `prost` structs. Directly naming it `vnode_count` -/// will lead to the method `vnode_count()` returning `0` when the field is unset, which -/// can be misleading sometimes. -/// -/// Instead, we name the field as `maybe_vnode_count` and provide the method `vnode_count` -/// through this trait, ensuring that backward compatibility is handled properly. -macro_rules! impl_maybe_vnode_count_compat { - ($($ty:ty),* $(,)?) => { - $( - impl VnodeCountCompat for $ty { - fn vnode_count(&self) -> usize { - self.maybe_vnode_count - .map_or(VirtualNode::COUNT_FOR_COMPAT, |v| v as _) - } - } - )* - }; +impl VnodeCountCompat for risingwave_pb::meta::table_fragments::Fragment { + fn vnode_count(&self) -> usize { + use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; + + if let Some(vnode_count) = self.maybe_vnode_count { + return vnode_count as _; + } + + // Compatibility: derive vnode count from distribution. + match self.distribution_type() { + FragmentDistributionType::Unspecified => unreachable!(), + FragmentDistributionType::Single => 1, + FragmentDistributionType::Hash => VirtualNode::COUNT_FOR_COMPAT, + } + } +} + +impl VnodeCountCompat for risingwave_pb::catalog::Table { + fn vnode_count(&self) -> usize { + if let Some(vnode_count) = self.maybe_vnode_count { + return vnode_count as _; + } + + // Compatibility: derive vnode count from distribution. + if self.distribution_key.is_empty() { + // Singleton table. + assert!( + self.dist_key_in_pk.is_empty(), + "empty dist key, while dist key in pk is set: {:?}", + self.dist_key_in_pk + ); + assert!( + self.vnode_col_index.is_none(), + "empty dist key, while vnode col index is set: {:?}", + self.vnode_col_index + ); + 1 + } else { + VirtualNode::COUNT_FOR_COMPAT + } + } } -impl_maybe_vnode_count_compat!( - risingwave_pb::plan_common::StorageTableDesc, - risingwave_pb::catalog::Table, - risingwave_pb::meta::table_fragments::Fragment, -); +impl VnodeCountCompat for risingwave_pb::plan_common::StorageTableDesc { + fn vnode_count(&self) -> usize { + if let Some(vnode_count) = self.maybe_vnode_count { + return vnode_count as _; + } + + // Compatibility: derive vnode count from distribution. + if self.dist_key_in_pk_indices.is_empty() { + // Singleton table. + assert!( + self.vnode_col_idx_in_pk.is_none(), + "empty dist key in pk indices, while vnode col index is set: {:?}", + self.vnode_col_idx_in_pk + ); + 1 + } else { + VirtualNode::COUNT_FOR_COMPAT + } + } +} diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index 1e7bca125fc50..1450160153fd9 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -139,11 +139,9 @@ impl VnodeMapping { } } - /// Create a vnode mapping with the single item. Should only be used for singletons. - /// - /// For backwards compatibility, [`VirtualNode::COUNT_FOR_COMPAT`] is used as the vnode count. + /// Create a vnode mapping with the single item and length of 1. Only for singletons. pub fn new_single(item: T::Item) -> Self { - Self::new_uniform(std::iter::once(item), VirtualNode::COUNT_FOR_COMPAT) + Self::new_uniform(std::iter::once(item), 1) } /// The length (or count) of the vnode in this mapping. diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs index 55ae23171bdf2..f893878c229c1 100644 --- a/src/meta/src/stream/stream_graph/schedule.rs +++ b/src/meta/src/stream/stream_graph/schedule.rs @@ -25,7 +25,7 @@ use either::Either; use enum_as_inner::EnumAsInner; use itertools::Itertools; use risingwave_common::bitmap::Bitmap; -use risingwave_common::hash::{ActorMapping, VirtualNode, WorkerSlotId, WorkerSlotMapping}; +use risingwave_common::hash::{ActorMapping, WorkerSlotId, WorkerSlotMapping}; use risingwave_common::{bail, hash}; use risingwave_pb::common::{ActorInfo, WorkerNode}; use risingwave_pb::meta::table_fragments::fragment::{ @@ -152,11 +152,9 @@ impl Distribution { } /// Get the vnode count of the distribution. - /// - /// For backwards compatibility, [`VirtualNode::COUNT_FOR_COMPAT`] is used for singleton. pub fn vnode_count(&self) -> usize { match self { - Distribution::Singleton(_) => VirtualNode::COUNT_FOR_COMPAT, + Distribution::Singleton(_) => 1, // only `SINGLETON_VNODE` Distribution::Hash(mapping) => mapping.len(), } } diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs index 0aa5ebca9007f..65f7e55cdd4e6 100644 --- a/src/stream/src/executor/actor.rs +++ b/src/stream/src/executor/actor.rs @@ -103,9 +103,9 @@ impl ActorContext { fragment_id: stream_actor.fragment_id, mview_definition: stream_actor.mview_definition.clone(), vnode_count: (stream_actor.vnode_bitmap.as_ref()) - // An unset `vnode_bitmap` means the actor is a singleton. - // For backwards compatibility, `VirtualNode::COUNT_FOR_COMPAT` is used for singleton. - .map_or(VirtualNode::COUNT_FOR_COMPAT, |b| Bitmap::from(b).len()), + // An unset `vnode_bitmap` means the actor is a singleton, + // where only `SINGLETON_VNODE` is set. + .map_or(1, |b| Bitmap::from(b).len()), cur_mem_val: Arc::new(0.into()), last_mem_val: Arc::new(0.into()), total_mem_val,