From 8c20db2cbc5f27a0db7e26feab2d6e17c4651a6a Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Sun, 29 Sep 2024 14:18:31 +0800 Subject: [PATCH 1/9] refactor: use 1 for vnode count of singletons for backward compatibility --- proto/catalog.proto | 2 +- proto/meta.proto | 2 +- proto/plan_common.proto | 2 +- src/common/src/hash/consistent_hash/bitmap.rs | 6 +- src/common/src/hash/consistent_hash/compat.rs | 58 ++++++++++++++++--- .../src/hash/consistent_hash/mapping.rs | 7 ++- src/frontend/src/optimizer/plan_node/utils.rs | 15 +++++ .../m20240911_083152_variable_vnode_count.rs | 28 +++++++++ src/meta/src/stream/stream_graph/schedule.rs | 6 +- src/stream/src/executor/actor.rs | 6 +- 10 files changed, 107 insertions(+), 25 deletions(-) diff --git a/proto/catalog.proto b/proto/catalog.proto index 8eeb758432446..0c67a92f23cdd 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -422,7 +422,7 @@ message Table { // Use `VnodeCountCompat::vnode_count` to access it. // // - Can be unset if the table is created in older versions where variable vnode count is not - // supported, in which case a default value of 256 should be used. + // supported, in which case a default value of 256 (or 1 for singleton) should be used. // - Can be placeholder value `Some(0)` if the catalog is generated by the frontend and the // corresponding job is still in `Creating` status, in which case calling `vnode_count` // will panic. diff --git a/proto/meta.proto b/proto/meta.proto index aa006a3400b1e..bbd1265c44260 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -92,7 +92,7 @@ message TableFragments { // Duplicated from the length of the vnode bitmap in any actor of the fragment. // // Can be unset if the fragment is created in older versions where variable vnode count is not - // supported, in which case a default value of 256 should be used. + // supported, in which case a default value of 256 (or 1 for singleton) should be used. // Use `VnodeCountCompat::vnode_count` to access it. optional uint32 maybe_vnode_count = 8; } diff --git a/proto/plan_common.proto b/proto/plan_common.proto index f561ee427ea46..487ab54e2a666 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -100,7 +100,7 @@ message StorageTableDesc { // Total vnode count of the table. // // Can be unset if the table is created in older versions where variable vnode count is not - // supported, in which case a default value of 256 should be used. + // supported, in which case a default value of 256 (or 1 for singleton) should be used. // Use `VnodeCountCompat::vnode_count` to access it. optional uint32 maybe_vnode_count = 12; } diff --git a/src/common/src/hash/consistent_hash/bitmap.rs b/src/common/src/hash/consistent_hash/bitmap.rs index a40946273a0a7..0ba54d8e0637b 100644 --- a/src/common/src/hash/consistent_hash/bitmap.rs +++ b/src/common/src/hash/consistent_hash/bitmap.rs @@ -45,16 +45,16 @@ impl Bitmap { } /// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length - /// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1. + /// 1 and the only [`SINGLETON_VNODE`] set to true. pub fn singleton() -> &'static Self { Self::singleton_arc() } /// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length - /// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1. + /// 1 and the only [`SINGLETON_VNODE`] set to true. 
     pub fn singleton_arc() -> &'static Arc<Self> {
         static SINGLETON: LazyLock<Arc<Bitmap>> = LazyLock::new(|| {
-            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_COMPAT);
+            let mut builder = BitmapBuilder::zeroed(1);
             builder.set(SINGLETON_VNODE.to_index(), true);
             builder.finish().into()
         });
diff --git a/src/common/src/hash/consistent_hash/compat.rs b/src/common/src/hash/consistent_hash/compat.rs
index c659da2fc6059..ce2c75c47e923 100644
--- a/src/common/src/hash/consistent_hash/compat.rs
+++ b/src/common/src/hash/consistent_hash/compat.rs
@@ -80,14 +80,23 @@ impl VnodeCount {
 }
 
 /// A trait for accessing the vnode count field with backward compatibility.
+///
+/// # `maybe_`?
+///
+/// The reason why there's a `maybe_` prefix on the protobuf field is that, a getter
+/// method with the same name as the field will be generated for `prost` structs.
+/// Directly naming it `vnode_count` will lead to the method `vnode_count()` returning
+/// `0` when the field is unset, which can be misleading sometimes.
+///
+/// Instead, we name the field as `maybe_vnode_count` and provide the method `vnode_count`
+/// through this trait, ensuring that backward compatibility is handled properly.
 pub trait VnodeCountCompat {
     /// Get the `maybe_vnode_count` field.
     fn vnode_count_inner(&self) -> VnodeCount;
 
-    /// Returns the vnode count, or [`VirtualNode::COUNT_FOR_COMPAT`] if the vnode count is not set,
-    /// typically for backward compatibility. Panics if the field is a placeholder.
-    ///
-    /// Equivalent to `self.vnode_count_inner().value()`.
+    /// Returns the vnode count if it's set. Otherwise, returns [`VirtualNode::COUNT_FOR_COMPAT`]
+    /// for distributed tables/fragments, and `1` for singleton tables/fragments, for backward
+    /// compatibility. Panics if the field is a placeholder.
     ///
     /// See the documentation on the field of the implementing type for more details.
     fn vnode_count(&self) -> usize {
@@ -116,8 +125,39 @@ macro_rules! impl_maybe_vnode_count_compat {
     };
 }
 
-impl_maybe_vnode_count_compat!(
-    risingwave_pb::plan_common::StorageTableDesc,
-    risingwave_pb::catalog::Table,
-    risingwave_pb::meta::table_fragments::Fragment,
-);
+// TODO!!!!!!!!!!!!!!!!!
+
+// impl VnodeCountCompat for risingwave_pb::catalog::Table {
+//     fn vnode_count(&self) -> usize {
+//         if let Some(vnode_count) = self.maybe_vnode_count {
+//             return vnode_count as _;
+//         }
+
+//         // Compatibility: derive vnode count from distribution.
+//         if self.distribution_key.is_empty()
+//             && self.dist_key_in_pk.is_empty()
+//             && self.vnode_col_index.is_none()
+//         {
+//             // Singleton table.
+//             1
+//         } else {
+//             VirtualNode::COUNT_FOR_COMPAT
+//         }
+//     }
+// }
+
+// impl VnodeCountCompat for risingwave_pb::plan_common::StorageTableDesc {
+//     fn vnode_count(&self) -> usize {
+//         if let Some(vnode_count) = self.maybe_vnode_count {
+//             return vnode_count as _;
+//         }
+
+//         // Compatibility: derive vnode count from distribution.
+//         if self.dist_key_in_pk_indices.is_empty() && self.vnode_col_idx_in_pk.is_none() {
+//             // Singleton table.
+//             1
+//         } else {
+//             VirtualNode::COUNT_FOR_COMPAT
+//         }
+//     }
+// }
diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs
index 1e7bca125fc50..2deed907103f1 100644
--- a/src/common/src/hash/consistent_hash/mapping.rs
+++ b/src/common/src/hash/consistent_hash/mapping.rs
@@ -139,11 +139,12 @@ impl VnodeMapping {
         }
     }
 
-    /// Create a vnode mapping with the single item. Should only be used for singletons.
+    /// Create a vnode mapping with the single item and length of 1.
/// - /// For backwards compatibility, [`VirtualNode::COUNT_FOR_COMPAT`] is used as the vnode count. + /// Should only be used for singletons. If you want a different vnode count, call + /// [`VnodeMapping::new_uniform`] with `std::iter::once(item)` and desired length. pub fn new_single(item: T::Item) -> Self { - Self::new_uniform(std::iter::once(item), VirtualNode::COUNT_FOR_COMPAT) + Self::new_uniform(std::iter::once(item), 1) } /// The length (or count) of the vnode in this mapping. diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index 40b350ade287a..2433a659bad0a 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -138,6 +138,21 @@ impl TableCatalogBuilder { Some(w) => w, None => FixedBitSet::with_capacity(self.columns.len()), }; + + // If `dist_key_in_pk` is set, check if it matches with `distribution_key`. + // Note that we cannot derive in the opposite direction, because there can be a column + // appearing multiple times in the PK. + if let Some(dist_key_in_pk) = &self.dist_key_in_pk { + let derived_dist_key = dist_key_in_pk + .iter() + .map(|idx| self.pk[*idx].column_index) + .collect_vec(); + assert_eq!( + derived_dist_key, distribution_key, + "dist_key mismatch with dist_key_in_pk" + ); + } + TableCatalog { id: TableId::placeholder(), associated_source_id: None, diff --git a/src/meta/model/migration/src/m20240911_083152_variable_vnode_count.rs b/src/meta/model/migration/src/m20240911_083152_variable_vnode_count.rs index 5b0aad3ea5586..9f7498830abd0 100644 --- a/src/meta/model/migration/src/m20240911_083152_variable_vnode_count.rs +++ b/src/meta/model/migration/src/m20240911_083152_variable_vnode_count.rs @@ -21,6 +21,19 @@ impl MigrationTrait for Migration { ) .await?; + // Fill vnode count with 1 for singleton tables. + manager + .exec_stmt( + UpdateStatement::new() + .table(Table::Table) + .values([(Table::VnodeCount, Expr::value(1))]) + .and_where(Expr::col(Table::DistributionKey).eq(Expr::value("[]"))) + .and_where(Expr::col(Table::DistKeyInPk).eq(Expr::value("[]"))) + .and_where(Expr::col(Table::VnodeColIndex).is_null()) + .to_owned(), + ) + .await?; + manager .alter_table( MigrationTable::alter() @@ -30,6 +43,17 @@ impl MigrationTrait for Migration { ) .await?; + // Fill vnode count with 1 for singleton fragments. 
+ manager + .exec_stmt( + UpdateStatement::new() + .table(Fragment::Table) + .values([(Fragment::VnodeCount, Expr::value(1))]) + .and_where(Expr::col(Fragment::DistributionType).eq(Expr::value("SINGLE"))) + .to_owned(), + ) + .await?; + manager .alter_table( MigrationTable::alter() @@ -74,12 +98,16 @@ impl MigrationTrait for Migration { enum Fragment { Table, VnodeCount, + DistributionType, } #[derive(DeriveIden)] enum Table { Table, VnodeCount, + DistributionKey, + DistKeyInPk, + VnodeColIndex, } #[derive(DeriveIden)] diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs index 5d465b19d195d..97b5c3032117e 100644 --- a/src/meta/src/stream/stream_graph/schedule.rs +++ b/src/meta/src/stream/stream_graph/schedule.rs @@ -25,7 +25,7 @@ use either::Either; use enum_as_inner::EnumAsInner; use itertools::Itertools; use risingwave_common::bitmap::Bitmap; -use risingwave_common::hash::{ActorMapping, VirtualNode, WorkerSlotId, WorkerSlotMapping}; +use risingwave_common::hash::{ActorMapping, WorkerSlotId, WorkerSlotMapping}; use risingwave_common::{bail, hash}; use risingwave_meta_model::WorkerId; use risingwave_pb::common::{ActorInfo, WorkerNode}; @@ -152,11 +152,9 @@ impl Distribution { } /// Get the vnode count of the distribution. - /// - /// For backwards compatibility, [`VirtualNode::COUNT_FOR_COMPAT`] is used for singleton. pub fn vnode_count(&self) -> usize { match self { - Distribution::Singleton(_) => VirtualNode::COUNT_FOR_COMPAT, + Distribution::Singleton(_) => 1, // only `SINGLETON_VNODE` Distribution::Hash(mapping) => mapping.len(), } } diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs index 0aa5ebca9007f..65f7e55cdd4e6 100644 --- a/src/stream/src/executor/actor.rs +++ b/src/stream/src/executor/actor.rs @@ -103,9 +103,9 @@ impl ActorContext { fragment_id: stream_actor.fragment_id, mview_definition: stream_actor.mview_definition.clone(), vnode_count: (stream_actor.vnode_bitmap.as_ref()) - // An unset `vnode_bitmap` means the actor is a singleton. - // For backwards compatibility, `VirtualNode::COUNT_FOR_COMPAT` is used for singleton. - .map_or(VirtualNode::COUNT_FOR_COMPAT, |b| Bitmap::from(b).len()), + // An unset `vnode_bitmap` means the actor is a singleton, + // where only `SINGLETON_VNODE` is set. + .map_or(1, |b| Bitmap::from(b).len()), cur_mem_val: Arc::new(0.into()), last_mem_val: Arc::new(0.into()), total_mem_val, From 5cdb10e71a03801e92c1e48079ea989484e2c92a Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 21 Oct 2024 17:09:10 +0800 Subject: [PATCH 2/9] fix compatibility Signed-off-by: Bugen Zhao --- src/common/src/hash/consistent_hash/compat.rs | 126 ++++++++---------- 1 file changed, 55 insertions(+), 71 deletions(-) diff --git a/src/common/src/hash/consistent_hash/compat.rs b/src/common/src/hash/consistent_hash/compat.rs index ce2c75c47e923..6f15b323dcdfe 100644 --- a/src/common/src/hash/consistent_hash/compat.rs +++ b/src/common/src/hash/consistent_hash/compat.rs @@ -14,6 +14,8 @@ use std::num::NonZeroUsize; +use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; + use super::vnode::VirtualNode; /// The different cases of `maybe_vnode_count` field in the protobuf message. @@ -24,8 +26,12 @@ pub enum VnodeCount { Placeholder, /// The field is set to a specific value. Set(NonZeroUsize), - /// The field is unset because it's persisted in an older version. 
- Compat, + /// The field is unset because the table/fragment is persisted as hash-distributed + /// in an older version. + CompatHash, + /// The field is unset because the table/fragment is persisted as singleton + /// in an older version. + CompatSingleton, } impl VnodeCount { @@ -47,20 +53,9 @@ impl VnodeCount { /// Converts to protobuf representation for `maybe_vnode_count`. pub fn to_protobuf(self) -> Option { - match self { - VnodeCount::Placeholder => Some(0), - VnodeCount::Set(v) => Some(v.get() as _), - VnodeCount::Compat => None, - } - } - - /// Converts from protobuf representation of `maybe_vnode_count`. - pub fn from_protobuf(v: Option) -> Self { - match v { - Some(0) => VnodeCount::Placeholder, - Some(v) => VnodeCount::set(v as usize), - None => VnodeCount::Compat, - } + // Effectively fills the compatibility cases with values. + self.value_opt() + .map_or(Some(0) /* placeholder */, |v| Some(v as _)) } /// Returns the value of the vnode count, or `None` if it's a placeholder. @@ -68,7 +63,8 @@ impl VnodeCount { match self { VnodeCount::Placeholder => None, VnodeCount::Set(v) => Some(v.get()), - VnodeCount::Compat => Some(VirtualNode::COUNT_FOR_COMPAT), + VnodeCount::CompatHash => Some(VirtualNode::COUNT_FOR_COMPAT), + VnodeCount::CompatSingleton => Some(1), } } @@ -104,60 +100,48 @@ pub trait VnodeCountCompat { } } -/// Implement the trait for given types by delegating to the `maybe_vnode_count` field. -/// -/// The reason why there's a `maybe_` prefix is that, a getter method with the same name -/// as the field will be generated for `prost` structs. Directly naming it `vnode_count` -/// will lead to the method `vnode_count()` returning `0` when the field is unset, which -/// can be misleading sometimes. -/// -/// Instead, we name the field as `maybe_vnode_count` and provide the method `vnode_count` -/// through this trait, ensuring that backward compatibility is handled properly. -macro_rules! impl_maybe_vnode_count_compat { - ($($ty:ty),* $(,)?) => { - $( - impl VnodeCountCompat for $ty { - fn vnode_count_inner(&self) -> VnodeCount { - VnodeCount::from_protobuf(self.maybe_vnode_count) - } - } - )* - }; +impl VnodeCountCompat for risingwave_pb::catalog::Table { + fn vnode_count_inner(&self) -> VnodeCount { + if let Some(vnode_count) = self.maybe_vnode_count { + VnodeCount::set(vnode_count) + } else + // Compatibility: derive vnode count from distribution. + if self.distribution_key.is_empty() + && self.dist_key_in_pk.is_empty() + && self.vnode_col_index.is_none() + { + VnodeCount::CompatSingleton + } else { + VnodeCount::CompatHash + } + } } -// TODO!!!!!!!!!!!!!!!!! - -// impl VnodeCountCompat for risingwave_pb::catalog::Table { -// fn vnode_count(&self) -> usize { -// if let Some(vnode_count) = self.maybe_vnode_count { -// return vnode_count as _; -// } - -// // Compatibility: derive vnode count from distribution. -// if self.distribution_key.is_empty() -// && self.dist_key_in_pk.is_empty() -// && self.vnode_col_index.is_none() -// { -// // Singleton table. 
-// 1 -// } else { -// VirtualNode::COUNT_FOR_COMPAT -// } -// } -// } - -// impl VnodeCountCompat for risingwave_pb::plan_common::StorageTableDesc { -// fn vnode_count(&self) -> usize { -// if let Some(vnode_count) = self.maybe_vnode_count { -// return vnode_count as _; -// } +impl VnodeCountCompat for risingwave_pb::plan_common::StorageTableDesc { + fn vnode_count_inner(&self) -> VnodeCount { + if let Some(vnode_count) = self.maybe_vnode_count { + VnodeCount::set(vnode_count) + } else + // Compatibility: derive vnode count from distribution. + if self.dist_key_in_pk_indices.is_empty() && self.vnode_col_idx_in_pk.is_none() { + VnodeCount::CompatSingleton + } else { + VnodeCount::CompatHash + } + } +} -// // Compatibility: derive vnode count from distribution. -// if self.dist_key_in_pk_indices.is_empty() && self.vnode_col_idx_in_pk.is_none() { -// // Singleton table. -// 1 -// } else { -// VirtualNode::COUNT_FOR_COMPAT -// } -// } -// } +impl VnodeCountCompat for risingwave_pb::meta::table_fragments::Fragment { + fn vnode_count_inner(&self) -> VnodeCount { + if let Some(vnode_count) = self.maybe_vnode_count { + VnodeCount::set(vnode_count) + } else { + // Compatibility: derive vnode count from distribution. + match self.distribution_type() { + FragmentDistributionType::Unspecified => unreachable!(), + FragmentDistributionType::Single => VnodeCount::CompatSingleton, + FragmentDistributionType::Hash => VnodeCount::CompatHash, + } + } + } +} From e19a9c56c45d3419b60f547ab06c0fc6d05b935a Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 21 Oct 2024 18:04:44 +0800 Subject: [PATCH 3/9] correctly handle placeholder Signed-off-by: Bugen Zhao --- src/common/src/hash/consistent_hash/compat.rs | 59 +++++++++---------- 1 file changed, 28 insertions(+), 31 deletions(-) diff --git a/src/common/src/hash/consistent_hash/compat.rs b/src/common/src/hash/consistent_hash/compat.rs index 6f15b323dcdfe..395e462d4ad31 100644 --- a/src/common/src/hash/consistent_hash/compat.rs +++ b/src/common/src/hash/consistent_hash/compat.rs @@ -51,6 +51,23 @@ impl VnodeCount { Self::set(VirtualNode::COUNT_FOR_TEST) } + /// Converts from protobuf representation of `maybe_vnode_count`. If the value is not set, + /// call `compat_is_singleton` to determine whether it should be treated as a singleton + /// when it comes to backward compatibility. + fn from_protobuf(v: Option, compat_is_singleton: impl FnOnce() -> bool) -> Self { + match v { + Some(0) => VnodeCount::Placeholder, + Some(v) => VnodeCount::set(v as usize), + None => { + if compat_is_singleton() { + VnodeCount::CompatSingleton + } else { + VnodeCount::CompatHash + } + } + } + } + /// Converts to protobuf representation for `maybe_vnode_count`. pub fn to_protobuf(self) -> Option { // Effectively fills the compatibility cases with values. @@ -102,46 +119,26 @@ pub trait VnodeCountCompat { impl VnodeCountCompat for risingwave_pb::catalog::Table { fn vnode_count_inner(&self) -> VnodeCount { - if let Some(vnode_count) = self.maybe_vnode_count { - VnodeCount::set(vnode_count) - } else - // Compatibility: derive vnode count from distribution. 
- if self.distribution_key.is_empty() - && self.dist_key_in_pk.is_empty() - && self.vnode_col_index.is_none() - { - VnodeCount::CompatSingleton - } else { - VnodeCount::CompatHash - } + VnodeCount::from_protobuf(self.maybe_vnode_count, || { + self.distribution_key.is_empty() + && self.dist_key_in_pk.is_empty() + && self.vnode_col_index.is_none() + }) } } impl VnodeCountCompat for risingwave_pb::plan_common::StorageTableDesc { fn vnode_count_inner(&self) -> VnodeCount { - if let Some(vnode_count) = self.maybe_vnode_count { - VnodeCount::set(vnode_count) - } else - // Compatibility: derive vnode count from distribution. - if self.dist_key_in_pk_indices.is_empty() && self.vnode_col_idx_in_pk.is_none() { - VnodeCount::CompatSingleton - } else { - VnodeCount::CompatHash - } + VnodeCount::from_protobuf(self.maybe_vnode_count, || { + self.dist_key_in_pk_indices.is_empty() && self.vnode_col_idx_in_pk.is_none() + }) } } impl VnodeCountCompat for risingwave_pb::meta::table_fragments::Fragment { fn vnode_count_inner(&self) -> VnodeCount { - if let Some(vnode_count) = self.maybe_vnode_count { - VnodeCount::set(vnode_count) - } else { - // Compatibility: derive vnode count from distribution. - match self.distribution_type() { - FragmentDistributionType::Unspecified => unreachable!(), - FragmentDistributionType::Single => VnodeCount::CompatSingleton, - FragmentDistributionType::Hash => VnodeCount::CompatHash, - } - } + VnodeCount::from_protobuf(self.maybe_vnode_count, || { + matches!(self.distribution_type(), FragmentDistributionType::Single) + }) } } From b5a8a25ecf0c9fe282cd4415ef591efcd2b2ba5c Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 21 Oct 2024 18:04:55 +0800 Subject: [PATCH 4/9] tolerate vnode count assertion Signed-off-by: Bugen Zhao --- src/frontend/src/scheduler/plan_fragmenter.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 9bd1851383af6..b008da4efe4b0 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -1228,8 +1228,16 @@ fn derive_partitions( table_desc: &TableDesc, vnode_mapping: &WorkerSlotMapping, ) -> SchedulerResult> { + let vnode_mapping = if table_desc.vnode_count != vnode_mapping.len() { + // The vnode count mismatch occurs only in special cases where a hash-distributed fragment + // contains singleton internal tables. e.g., the state table of `Source` executors. + // In this case, we reduce the vnode mapping to a single vnode as only `SINGLETON_VNODE` is used. 
+ assert_eq!(table_desc.vnode_count, 1); + &WorkerSlotMapping::new_single(vnode_mapping.iter().next().unwrap()) + } else { + vnode_mapping + }; let vnode_count = vnode_mapping.len(); - assert_eq!(vnode_count, table_desc.vnode_count); let mut partitions: HashMap)> = HashMap::new(); From 60f7d3434a98e6b872ed4b6fcad6647e08caa649 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 22 Oct 2024 15:08:15 +0800 Subject: [PATCH 5/9] extract is_singleton Signed-off-by: Bugen Zhao --- src/common/src/hash/consistent_hash/compat.rs | 42 ++++++++++++++----- src/common/src/hash/table_distribution.rs | 7 +++- src/stream/src/common/table/state_table.rs | 3 +- 3 files changed, 38 insertions(+), 14 deletions(-) diff --git a/src/common/src/hash/consistent_hash/compat.rs b/src/common/src/hash/consistent_hash/compat.rs index 395e462d4ad31..0040a79f780fa 100644 --- a/src/common/src/hash/consistent_hash/compat.rs +++ b/src/common/src/hash/consistent_hash/compat.rs @@ -92,6 +92,17 @@ impl VnodeCount { } } +/// A trait for checking whether a table/fragment is a singleton. +pub trait IsSingleton { + /// Returns `true` if the table/fragment is a singleton. + /// + /// By singleton, we mean that all data read from or written to the storage belongs to + /// the only `SINGLETON_VNODE`. This must be consistent with the behavior of + /// [`TableDistribution`](crate::hash::table_distribution::TableDistribution::new). + /// As a result, the `vnode_count` of such table/fragment can be `1`. + fn is_singleton(&self) -> bool; +} + /// A trait for accessing the vnode count field with backward compatibility. /// /// # `maybe_`? @@ -117,28 +128,37 @@ pub trait VnodeCountCompat { } } +impl IsSingleton for risingwave_pb::catalog::Table { + fn is_singleton(&self) -> bool { + self.distribution_key.is_empty() + && self.dist_key_in_pk.is_empty() + && self.vnode_col_index.is_none() + } +} impl VnodeCountCompat for risingwave_pb::catalog::Table { fn vnode_count_inner(&self) -> VnodeCount { - VnodeCount::from_protobuf(self.maybe_vnode_count, || { - self.distribution_key.is_empty() - && self.dist_key_in_pk.is_empty() - && self.vnode_col_index.is_none() - }) + VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton()) } } +impl IsSingleton for risingwave_pb::plan_common::StorageTableDesc { + fn is_singleton(&self) -> bool { + self.dist_key_in_pk_indices.is_empty() && self.vnode_col_idx_in_pk.is_none() + } +} impl VnodeCountCompat for risingwave_pb::plan_common::StorageTableDesc { fn vnode_count_inner(&self) -> VnodeCount { - VnodeCount::from_protobuf(self.maybe_vnode_count, || { - self.dist_key_in_pk_indices.is_empty() && self.vnode_col_idx_in_pk.is_none() - }) + VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton()) } } +impl IsSingleton for risingwave_pb::meta::table_fragments::Fragment { + fn is_singleton(&self) -> bool { + matches!(self.distribution_type(), FragmentDistributionType::Single) + } +} impl VnodeCountCompat for risingwave_pb::meta::table_fragments::Fragment { fn vnode_count_inner(&self) -> VnodeCount { - VnodeCount::from_protobuf(self.maybe_vnode_count, || { - matches!(self.distribution_type(), FragmentDistributionType::Single) - }) + VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton()) } } diff --git a/src/common/src/hash/table_distribution.rs b/src/common/src/hash/table_distribution.rs index 822db591c1577..a91f48f6589af 100644 --- a/src/common/src/hash/table_distribution.rs +++ b/src/common/src/hash/table_distribution.rs @@ -20,7 +20,7 @@ use 
risingwave_pb::plan_common::StorageTableDesc; use crate::array::{Array, DataChunk, PrimitiveArray}; use crate::bitmap::Bitmap; -use crate::hash::VirtualNode; +use crate::hash::{IsSingleton, VirtualNode}; use crate::row::Row; use crate::util::iter_util::ZipEqFast; @@ -64,7 +64,10 @@ impl TableDistribution { .map(|&k| k as usize) .collect_vec(); let vnode_col_idx_in_pk = table_desc.vnode_col_idx_in_pk.map(|k| k as usize); - Self::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk) + + let this = Self::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk); + assert_eq!(this.is_singleton(), table_desc.is_singleton()); + this } pub fn new( diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 3ada0afd71625..55957af9e6e5b 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -29,7 +29,7 @@ use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{ get_dist_key_in_pk_indices, ColumnDesc, ColumnId, TableId, TableOption, }; -use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; +use risingwave_common::hash::{IsSingleton, VirtualNode, VnodeBitmapExt}; use risingwave_common::row::{self, once, CompactedRow, Once, OwnedRow, Row, RowExt}; use risingwave_common::types::{DataType, Datum, DefaultOrd, DefaultOrdered, ScalarImpl}; use risingwave_common::util::column_index_mapping::ColIndexMapping; @@ -363,6 +363,7 @@ where let distribution = TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk); + assert_eq!(distribution.is_singleton(), table_catalog.is_singleton()); let pk_data_types = pk_indices .iter() From bf17f823d3aaa4dcbff23975bc681fe3ac94e932 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 22 Oct 2024 15:43:33 +0800 Subject: [PATCH 6/9] set vnode count to 1 for singleton tables inside distributed fragments Signed-off-by: Bugen Zhao --- src/frontend/src/scheduler/plan_fragmenter.rs | 6 +++- src/meta/src/controller/streaming_job.rs | 35 ++++++++++++------- src/meta/src/model/stream.rs | 13 ++++++- src/meta/src/rpc/ddl_controller.rs | 6 ++-- src/meta/src/stream/stream_graph/actor.rs | 19 ++++++++-- src/meta/src/stream/stream_manager.rs | 2 +- 6 files changed, 60 insertions(+), 21 deletions(-) diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index b008da4efe4b0..b55c3349b02c1 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -1232,7 +1232,11 @@ fn derive_partitions( // The vnode count mismatch occurs only in special cases where a hash-distributed fragment // contains singleton internal tables. e.g., the state table of `Source` executors. // In this case, we reduce the vnode mapping to a single vnode as only `SINGLETON_VNODE` is used. 
- assert_eq!(table_desc.vnode_count, 1); + assert!( + table_desc.vnode_count == 1, + "fragment vnode count {} does not match table vnode count {}", + vnode_mapping.len(), table_desc.vnode_count, + ); &WorkerSlotMapping::new_single(vnode_mapping.iter().next().unwrap()) } else { vnode_mapping diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index bfcb7830ec6e9..1da4d95968f40 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -16,6 +16,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::num::NonZeroUsize; use itertools::Itertools; +use risingwave_common::hash::VnodeCountCompat; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_common::{bail, current_cluster_version}; @@ -41,8 +42,7 @@ use risingwave_pb::meta::subscribe_response::{ Info as NotificationInfo, Info, Operation as NotificationOperation, Operation, }; use risingwave_pb::meta::{ - PbFragmentWorkerSlotMapping, PbRelation, PbRelationGroup, PbTableFragments, Relation, - RelationGroup, + PbFragmentWorkerSlotMapping, PbRelation, PbRelationGroup, Relation, RelationGroup, }; use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits}; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; @@ -69,7 +69,7 @@ use crate::controller::utils::{ }; use crate::controller::ObjectModel; use crate::manager::{NotificationVersion, StreamingJob}; -use crate::model::{StreamContext, TableParallelism}; +use crate::model::{StreamContext, TableFragments, TableParallelism}; use crate::stream::SplitAssignment; use crate::{MetaError, MetaResult}; @@ -395,14 +395,18 @@ impl CatalogController { Ok(table_id_map) } + // TODO: In this function, we also update the `Table` model in the meta store. + // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider + // making them the source of truth and performing a full replacement for those in the meta store? pub async fn prepare_streaming_job( &self, - table_fragment: PbTableFragments, + table_fragments: &TableFragments, streaming_job: &StreamingJob, for_replace: bool, ) -> MetaResult<()> { let fragment_actors = - Self::extract_fragment_and_actors_from_table_fragments(table_fragment)?; + Self::extract_fragment_and_actors_from_table_fragments(table_fragments.to_protobuf())?; + let all_tables = table_fragments.all_tables(); let inner = self.inner.write().await; let txn = inner.db.begin().await?; @@ -414,7 +418,6 @@ impl CatalogController { for fragment in fragments { let fragment_id = fragment.fragment_id; let state_table_ids = fragment.state_table_ids.inner_ref().clone(); - let vnode_count = fragment.vnode_count; let fragment = fragment.into_active_model(); Fragment::insert(fragment).exec(&txn).await?; @@ -423,10 +426,19 @@ impl CatalogController { // After table fragments are created, update them for all internal tables. if !for_replace { for state_table_id in state_table_ids { + // Table's vnode count is not always the fragment's vnode count, so we have to + // look up the table from `TableFragments`. + // See `ActorGraphBuilder::new`. 
+                let table = all_tables
+                    .get(&(state_table_id as u32))
+                    .unwrap_or_else(|| panic!("table {} not found", state_table_id));
+                assert_eq!(table.fragment_id, fragment_id as u32);
+                let vnode_count = table.vnode_count();
+
                 table::ActiveModel {
                     table_id: Set(state_table_id as _),
                     fragment_id: Set(Some(fragment_id)),
-                    vnode_count: Set(vnode_count),
+                    vnode_count: Set(vnode_count as _),
                     ..Default::default()
                 }
                 .update(&txn)
@@ -1045,25 +1057,24 @@ impl CatalogController {
         table.incoming_sinks = Set(incoming_sinks.into());
         let table = table.update(txn).await?;
 
-        // Fields including `fragment_id` and `vnode_count` were placeholder values before.
+        // Fields including `fragment_id` were placeholder values before.
         // After table fragments are created, update them for all internal tables.
-        let fragment_info: Vec<(FragmentId, I32Array, i32)> = Fragment::find()
+        let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
             .select_only()
             .columns([
                 fragment::Column::FragmentId,
                 fragment::Column::StateTableIds,
-                fragment::Column::VnodeCount,
             ])
             .filter(fragment::Column::JobId.eq(dummy_id))
             .into_tuple()
             .all(txn)
             .await?;
-        for (fragment_id, state_table_ids, vnode_count) in fragment_info {
+        for (fragment_id, state_table_ids) in fragment_info {
             for state_table_id in state_table_ids.into_inner() {
                 table::ActiveModel {
                     table_id: Set(state_table_id as _),
                     fragment_id: Set(Some(fragment_id)),
-                    vnode_count: Set(vnode_count),
+                    // No need to update `vnode_count` because it must remain the same.
                     ..Default::default()
                 }
                 .update(txn)
diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs
index e956d238a84cc..492e67bdce557 100644
--- a/src/meta/src/model/stream.rs
+++ b/src/meta/src/model/stream.rs
@@ -576,10 +576,21 @@ impl TableFragments {
     /// Compared to [`crate::stream::StreamFragmentGraph::incomplete_internal_tables`],
     /// the table catalogs returned here are complete, with all fields filled.
     pub fn internal_tables(&self) -> BTreeMap<u32, Table> {
+        self.collect_tables_inner(true)
+    }
+
+    /// `internal_tables()` with additional table in `Materialize` node.
+    pub fn all_tables(&self) -> BTreeMap<u32, Table> {
+        self.collect_tables_inner(false)
+    }
+
+    fn collect_tables_inner(&self, internal_tables_only: bool) -> BTreeMap<u32, Table> {
         let mut tables = BTreeMap::new();
         for fragment in self.fragments.values() {
-            stream_graph_visitor::visit_stream_node_internal_tables(
+            stream_graph_visitor::visit_stream_node_tables_inner(
                 &mut fragment.actors[0].nodes.clone().unwrap(),
+                internal_tables_only,
+                true,
                 |table, _| {
                     let table_id = table.id;
                     tables
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index cec9efa323a46..d2c46b284bddf 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -1114,7 +1114,7 @@ impl DdlController {
 
         self.metadata_manager
             .catalog_controller
-            .prepare_streaming_job(table_fragments.to_protobuf(), streaming_job, false)
+            .prepare_streaming_job(&table_fragments, streaming_job, false)
             .await?;
 
         // create streaming jobs.
@@ -1257,7 +1257,7 @@ impl DdlController { self.metadata_manager .catalog_controller - .prepare_streaming_job(table_fragments.to_protobuf(), &streaming_job, true) + .prepare_streaming_job(&table_fragments, &streaming_job, true) .await?; self.stream_manager @@ -1465,7 +1465,7 @@ impl DdlController { self.metadata_manager .catalog_controller - .prepare_streaming_job(table_fragments.to_protobuf(), &streaming_job, true) + .prepare_streaming_job(&table_fragments, &streaming_job, true) .await?; self.stream_manager diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs index 3446a2661d962..d7e1b0b1b6df7 100644 --- a/src/meta/src/stream/stream_graph/actor.rs +++ b/src/meta/src/stream/stream_graph/actor.rs @@ -20,7 +20,7 @@ use assert_matches::assert_matches; use itertools::Itertools; use risingwave_common::bail; use risingwave_common::bitmap::Bitmap; -use risingwave_common::hash::{ActorId, ActorMapping, WorkerSlotId}; +use risingwave_common::hash::{ActorId, ActorMapping, IsSingleton, VnodeCount, WorkerSlotId}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::stream_graph_visitor::visit_tables; use risingwave_meta_model::WorkerId; @@ -683,9 +683,22 @@ impl ActorGraphBuilder { // Fill the vnode count for each internal table, based on schedule result. let mut fragment_graph = fragment_graph; for (id, fragment) in fragment_graph.building_fragments_mut() { - let vnode_count = distributions[id].vnode_count(); + let fragment_vnode_count = distributions[id].vnode_count(); visit_tables(fragment, |table, _| { - table.maybe_vnode_count = Some(vnode_count as _); + // There are special cases where a hash-distributed fragment contains singleton + // internal tables, e.g., the state table of `Source` executors. 
+ let vnode_count = if table.is_singleton() { + if fragment_vnode_count > 1 { + tracing::info!( + table.name, + "found singleton table in hash-distributed fragment" + ); + } + 1 + } else { + fragment_vnode_count + }; + table.maybe_vnode_count = VnodeCount::set(vnode_count).to_protobuf(); }) } diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 21a642f1be1c7..7bf4058c647cf 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -350,7 +350,7 @@ impl GlobalStreamManager { if let Some((streaming_job, context, table_fragments)) = replace_table_job_info { self.metadata_manager .catalog_controller - .prepare_streaming_job(table_fragments.to_protobuf(), &streaming_job, true) + .prepare_streaming_job(&table_fragments, &streaming_job, true) .await?; let dummy_table_id = table_fragments.table_id(); From 0a9cb1089e40e6eaf7715c0cf0b62977ebd10543 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 22 Oct 2024 16:10:15 +0800 Subject: [PATCH 7/9] fix fmt Signed-off-by: Bugen Zhao --- src/frontend/src/scheduler/plan_fragmenter.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index b55c3349b02c1..90984750bc460 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -1235,7 +1235,8 @@ fn derive_partitions( assert!( table_desc.vnode_count == 1, "fragment vnode count {} does not match table vnode count {}", - vnode_mapping.len(), table_desc.vnode_count, + vnode_mapping.len(), + table_desc.vnode_count, ); &WorkerSlotMapping::new_single(vnode_mapping.iter().next().unwrap()) } else { From 657cd117fbf147797a2e8137cd7cdfce656291c9 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 25 Oct 2024 14:30:57 +0800 Subject: [PATCH 8/9] move migration into a separate step Signed-off-by: Bugen Zhao --- src/meta/model/migration/src/lib.rs | 2 + .../m20240911_083152_variable_vnode_count.rs | 28 --------- .../m20241025_062548_singleton_vnode_count.rs | 57 +++++++++++++++++++ 3 files changed, 59 insertions(+), 28 deletions(-) create mode 100644 src/meta/model/migration/src/m20241025_062548_singleton_vnode_count.rs diff --git a/src/meta/model/migration/src/lib.rs b/src/meta/model/migration/src/lib.rs index 4ae39aea29198..0c8c244c0624e 100644 --- a/src/meta/model/migration/src/lib.rs +++ b/src/meta/model/migration/src/lib.rs @@ -23,6 +23,7 @@ mod m20240806_143329_add_rate_limit_to_source_catalog; mod m20240820_081248_add_time_travel_per_table_epoch; mod m20240911_083152_variable_vnode_count; mod m20241016_065621_hummock_gc_history; +mod m20241025_062548_singleton_vnode_count; pub struct Migrator; @@ -83,6 +84,7 @@ impl MigratorTrait for Migrator { Box::new(m20240820_081248_add_time_travel_per_table_epoch::Migration), Box::new(m20240911_083152_variable_vnode_count::Migration), Box::new(m20241016_065621_hummock_gc_history::Migration), + Box::new(m20241025_062548_singleton_vnode_count::Migration), ] } } diff --git a/src/meta/model/migration/src/m20240911_083152_variable_vnode_count.rs b/src/meta/model/migration/src/m20240911_083152_variable_vnode_count.rs index 9f7498830abd0..5b0aad3ea5586 100644 --- a/src/meta/model/migration/src/m20240911_083152_variable_vnode_count.rs +++ b/src/meta/model/migration/src/m20240911_083152_variable_vnode_count.rs @@ -21,19 +21,6 @@ impl MigrationTrait for Migration { ) .await?; - // Fill vnode count with 1 for singleton tables. 
- manager - .exec_stmt( - UpdateStatement::new() - .table(Table::Table) - .values([(Table::VnodeCount, Expr::value(1))]) - .and_where(Expr::col(Table::DistributionKey).eq(Expr::value("[]"))) - .and_where(Expr::col(Table::DistKeyInPk).eq(Expr::value("[]"))) - .and_where(Expr::col(Table::VnodeColIndex).is_null()) - .to_owned(), - ) - .await?; - manager .alter_table( MigrationTable::alter() @@ -43,17 +30,6 @@ impl MigrationTrait for Migration { ) .await?; - // Fill vnode count with 1 for singleton fragments. - manager - .exec_stmt( - UpdateStatement::new() - .table(Fragment::Table) - .values([(Fragment::VnodeCount, Expr::value(1))]) - .and_where(Expr::col(Fragment::DistributionType).eq(Expr::value("SINGLE"))) - .to_owned(), - ) - .await?; - manager .alter_table( MigrationTable::alter() @@ -98,16 +74,12 @@ impl MigrationTrait for Migration { enum Fragment { Table, VnodeCount, - DistributionType, } #[derive(DeriveIden)] enum Table { Table, VnodeCount, - DistributionKey, - DistKeyInPk, - VnodeColIndex, } #[derive(DeriveIden)] diff --git a/src/meta/model/migration/src/m20241025_062548_singleton_vnode_count.rs b/src/meta/model/migration/src/m20241025_062548_singleton_vnode_count.rs new file mode 100644 index 0000000000000..eb276099e24bf --- /dev/null +++ b/src/meta/model/migration/src/m20241025_062548_singleton_vnode_count.rs @@ -0,0 +1,57 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Fill vnode count with 1 for singleton tables. + manager + .exec_stmt( + UpdateStatement::new() + .table(Table::Table) + .values([(Table::VnodeCount, Expr::value(1))]) + .and_where(Expr::col(Table::DistributionKey).eq(Expr::value("[]"))) + .and_where(Expr::col(Table::DistKeyInPk).eq(Expr::value("[]"))) + .and_where(Expr::col(Table::VnodeColIndex).is_null()) + .to_owned(), + ) + .await?; + + // Fill vnode count with 1 for singleton fragments. + manager + .exec_stmt( + UpdateStatement::new() + .table(Fragment::Table) + .values([(Fragment::VnodeCount, Expr::value(1))]) + .and_where(Expr::col(Fragment::DistributionType).eq(Expr::value("SINGLE"))) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, _manager: &SchemaManager) -> Result<(), DbErr> { + Err(DbErr::Migration( + "cannot rollback singleton vnode count migration".to_owned(), + ))? + } +} + +#[derive(DeriveIden)] +enum Fragment { + Table, + VnodeCount, + DistributionType, +} + +#[derive(DeriveIden)] +enum Table { + Table, + VnodeCount, + DistributionKey, + DistKeyInPk, + VnodeColIndex, +} From 20e8fb1a6ad714d68cbdf476e1cdc4c2921423bd Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 25 Oct 2024 14:38:42 +0800 Subject: [PATCH 9/9] rename module Signed-off-by: Bugen Zhao --- src/common/src/hash/consistent_hash/mod.rs | 2 +- .../src/hash/consistent_hash/{compat.rs => vnode_count.rs} | 0 src/common/src/hash/mod.rs | 2 +- 3 files changed, 2 insertions(+), 2 deletions(-) rename src/common/src/hash/consistent_hash/{compat.rs => vnode_count.rs} (100%) diff --git a/src/common/src/hash/consistent_hash/mod.rs b/src/common/src/hash/consistent_hash/mod.rs index 98a970af5947d..5f42369248bab 100644 --- a/src/common/src/hash/consistent_hash/mod.rs +++ b/src/common/src/hash/consistent_hash/mod.rs @@ -13,6 +13,6 @@ // limitations under the License. 
pub mod bitmap; -pub mod compat; pub mod mapping; pub mod vnode; +pub mod vnode_count; diff --git a/src/common/src/hash/consistent_hash/compat.rs b/src/common/src/hash/consistent_hash/vnode_count.rs similarity index 100% rename from src/common/src/hash/consistent_hash/compat.rs rename to src/common/src/hash/consistent_hash/vnode_count.rs diff --git a/src/common/src/hash/mod.rs b/src/common/src/hash/mod.rs index e54376bed2d50..8ed8120671179 100644 --- a/src/common/src/hash/mod.rs +++ b/src/common/src/hash/mod.rs @@ -19,9 +19,9 @@ mod key_v2; pub mod table_distribution; pub use consistent_hash::bitmap::*; -pub use consistent_hash::compat::*; pub use consistent_hash::mapping::*; pub use consistent_hash::vnode::*; +pub use consistent_hash::vnode_count::*; pub use dispatcher::{calc_hash_key_kind, HashKeyDispatcher}; pub use key::{ Crc32HashCode, HashCode, HashKeyDe, HashKeySer, HeapNullBitmap, NullBitmap,