Skip to content

Commit

Permalink
refactor: use 1 for vnode count of singletons (#18753)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Oct 25, 2024
1 parent fcb6c35 commit 58ecec2
Show file tree
Hide file tree
Showing 22 changed files with 331 additions and 165 deletions.
2 changes: 1 addition & 1 deletion proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ message Table {
// Use `VnodeCountCompat::vnode_count` to access it.
//
// - Can be unset if the table is created in older versions where variable vnode count is not
// supported, in which case a default value of 256 should be used.
// supported, in which case a default value of 256 (or 1 for singleton) should be used.
// - Can be placeholder value `Some(0)` if the catalog is generated by the frontend and the
// corresponding job is still in `Creating` status, in which case calling `vnode_count`
// will panic.
Expand Down
2 changes: 1 addition & 1 deletion proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ message TableFragments {
// Duplicated from the length of the vnode bitmap in any actor of the fragment.
//
// Can be unset if the fragment is created in older versions where variable vnode count is not
// supported, in which case a default value of 256 should be used.
// supported, in which case a default value of 256 (or 1 for singleton) should be used.
// Use `VnodeCountCompat::vnode_count` to access it.
optional uint32 maybe_vnode_count = 8;
}
Expand Down
2 changes: 1 addition & 1 deletion proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ message StorageTableDesc {
// Total vnode count of the table.
//
// Can be unset if the table is created in older versions where variable vnode count is not
// supported, in which case a default value of 256 should be used.
// supported, in which case a default value of 256 (or 1 for singleton) should be used.
// Use `VnodeCountCompat::vnode_count` to access it.
optional uint32 maybe_vnode_count = 12;
}
Expand Down
6 changes: 3 additions & 3 deletions src/common/src/hash/consistent_hash/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@ impl Bitmap {
}

/// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length
/// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1.
/// 1 and the only [`SINGLETON_VNODE`] set to true.
pub fn singleton() -> &'static Self {
Self::singleton_arc()
}

/// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length
/// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1.
/// 1 and the only [`SINGLETON_VNODE`] set to true.
pub fn singleton_arc() -> &'static Arc<Self> {
static SINGLETON: LazyLock<Arc<Bitmap>> = LazyLock::new(|| {
let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_COMPAT);
let mut builder = BitmapBuilder::zeroed(1);
builder.set(SINGLETON_VNODE.to_index(), true);
builder.finish().into()
});
Expand Down
123 changes: 0 additions & 123 deletions src/common/src/hash/consistent_hash/compat.rs

This file was deleted.

7 changes: 4 additions & 3 deletions src/common/src/hash/consistent_hash/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,12 @@ impl<T: VnodeMappingItem> VnodeMapping<T> {
}
}

/// Create a vnode mapping with the single item. Should only be used for singletons.
/// Create a vnode mapping with the single item and length of 1.
///
/// For backwards compatibility, [`VirtualNode::COUNT_FOR_COMPAT`] is used as the vnode count.
/// Should only be used for singletons. If you want a different vnode count, call
/// [`VnodeMapping::new_uniform`] with `std::iter::once(item)` and desired length.
pub fn new_single(item: T::Item) -> Self {
Self::new_uniform(std::iter::once(item), VirtualNode::COUNT_FOR_COMPAT)
Self::new_uniform(std::iter::once(item), 1)
}

/// The length of this mapping, i.e., the number of vnodes it covers.
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/hash/consistent_hash/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@
// limitations under the License.

pub mod bitmap;
pub mod compat;
pub mod mapping;
pub mod vnode;
pub mod vnode_count;
164 changes: 164 additions & 0 deletions src/common/src/hash/consistent_hash/vnode_count.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::num::NonZeroUsize;

use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;

use super::vnode::VirtualNode;

/// Decoded form of the `maybe_vnode_count` field found in the protobuf messages.
///
/// Besides an explicitly stored count, the field can be a placeholder or be absent
/// altogether. The two absent cases are kept apart because they resolve to different
/// counts: [`VirtualNode::COUNT_FOR_COMPAT`] for hash-distributed objects and `1` for
/// singletons.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub enum VnodeCount {
    /// A placeholder that has to be filled in before the count can be used.
    #[default]
    Placeholder,

    /// An explicitly specified, non-zero count.
    Set(NonZeroUsize),

    /// Unset, because the table/fragment was persisted as hash-distributed by an
    /// older version.
    CompatHash,

    /// Unset, because the table/fragment was persisted as singleton by an older
    /// version.
    CompatSingleton,
}

impl VnodeCount {
    /// Builds a [`VnodeCount::Set`] holding the given count.
    ///
    /// # Panics
    ///
    /// Panics if `v` cannot be converted into a `usize`, or if the converted value lies
    /// outside of `1..=VirtualNode::MAX_COUNT`.
    pub fn set(v: impl TryInto<usize> + Copy + std::fmt::Debug) -> Self {
        match v.try_into() {
            Ok(count) if (1..=VirtualNode::MAX_COUNT).contains(&count) => {
                // The guard above rules out zero, so this `unwrap` cannot fail.
                VnodeCount::Set(NonZeroUsize::new(count).unwrap())
            }
            _ => panic!("invalid vnode count {v:?}"),
        }
    }

    /// Builds a [`VnodeCount::Set`] holding the count used in tests.
    ///
    /// Shorthand for `VnodeCount::set(VirtualNode::COUNT_FOR_TEST)`.
    pub fn for_test() -> Self {
        Self::set(VirtualNode::COUNT_FOR_TEST)
    }

    /// Decodes the protobuf representation of `maybe_vnode_count`.
    ///
    /// - `Some(0)` is the placeholder.
    /// - Any other `Some(n)` is an explicit count, validated by [`VnodeCount::set`].
    /// - `None` means the field is unset. Only in this case is `compat_is_singleton`
    ///   invoked, to decide which of the two backward-compatibility variants applies.
    fn from_protobuf(v: Option<u32>, compat_is_singleton: impl FnOnce() -> bool) -> Self {
        let Some(raw) = v else {
            return if compat_is_singleton() {
                VnodeCount::CompatSingleton
            } else {
                VnodeCount::CompatHash
            };
        };

        if raw == 0 {
            VnodeCount::Placeholder
        } else {
            VnodeCount::set(raw as usize)
        }
    }

    /// Encodes into the protobuf representation of `maybe_vnode_count`.
    ///
    /// The result is always `Some`: a placeholder is written as `0`, and the
    /// backward-compatibility variants are written as the concrete count they resolve to.
    pub fn to_protobuf(self) -> Option<u32> {
        let raw = match self.value_opt() {
            Some(count) => count as u32,
            // `0` is the placeholder marker.
            None => 0,
        };
        Some(raw)
    }

    /// Resolves the vnode count, yielding `None` for a placeholder.
    pub fn value_opt(self) -> Option<usize> {
        let count = match self {
            VnodeCount::Placeholder => return None,
            VnodeCount::Set(v) => v.get(),
            VnodeCount::CompatHash => VirtualNode::COUNT_FOR_COMPAT,
            VnodeCount::CompatSingleton => 1,
        };
        Some(count)
    }

    /// Resolves the vnode count.
    ///
    /// # Panics
    ///
    /// Panics if this is a placeholder.
    pub fn value(self) -> usize {
        let resolved = self.value_opt();
        resolved.expect("vnode count is a placeholder that must be filled by the meta service first")
    }
}

/// Implemented by tables/fragments that can tell whether they are singletons.
pub trait IsSingleton {
    /// Whether the table/fragment is a singleton.
    ///
    /// A singleton is a table/fragment for which all data read from or written to the
    /// storage belongs to the only `SINGLETON_VNODE`. Implementations must stay consistent
    /// with the behavior of
    /// [`TableDistribution`](crate::hash::table_distribution::TableDistribution::new).
    /// Consequently, the `vnode_count` of such a table/fragment can be `1`.
    fn is_singleton(&self) -> bool;
}

/// Backward-compatible access to the vnode count field of a protobuf message.
///
/// # Why the `maybe_` prefix?
///
/// `prost` generates, for every field, a getter method carrying the field's name. Had the
/// field been called `vnode_count`, the generated `vnode_count()` would return `0` whenever
/// the field is unset, which can be misleading.
///
/// The field is therefore named `maybe_vnode_count`, and the `vnode_count` method is
/// supplied by this trait instead, so that backward compatibility is handled properly.
pub trait VnodeCountCompat {
    /// Reads the `maybe_vnode_count` field as a [`VnodeCount`].
    fn vnode_count_inner(&self) -> VnodeCount;

    /// Resolves the vnode count.
    ///
    /// Returns the stored value if it is set. For backward compatibility, an unset field
    /// resolves to [`VirtualNode::COUNT_FOR_COMPAT`] for distributed tables/fragments and
    /// to `1` for singleton tables/fragments.
    ///
    /// Refer to the documentation on the field of the implementing type for more details.
    ///
    /// # Panics
    ///
    /// Panics if the field is a placeholder.
    fn vnode_count(&self) -> usize {
        let count = self.vnode_count_inner();
        count.value()
    }
}

impl IsSingleton for risingwave_pb::catalog::Table {
    /// A catalog table is a singleton when `distribution_key` and `dist_key_in_pk` are
    /// both empty and no `vnode_col_index` is specified.
    fn is_singleton(&self) -> bool {
        let no_distribution_key = self.distribution_key.is_empty();
        let no_dist_key_in_pk = self.dist_key_in_pk.is_empty();
        let no_vnode_column = self.vnode_col_index.is_none();

        no_distribution_key && no_dist_key_in_pk && no_vnode_column
    }
}
impl VnodeCountCompat for risingwave_pb::catalog::Table {
    /// Decodes `maybe_vnode_count`; when it is unset, `is_singleton` picks the
    /// backward-compatibility variant.
    fn vnode_count_inner(&self) -> VnodeCount {
        // Kept as a closure so that the check only runs when the field is unset.
        let compat_is_singleton = || self.is_singleton();
        VnodeCount::from_protobuf(self.maybe_vnode_count, compat_is_singleton)
    }
}

impl IsSingleton for risingwave_pb::plan_common::StorageTableDesc {
    /// A storage table is a singleton when `dist_key_in_pk_indices` is empty and no
    /// `vnode_col_idx_in_pk` is specified.
    fn is_singleton(&self) -> bool {
        let no_dist_key_in_pk = self.dist_key_in_pk_indices.is_empty();
        let no_vnode_column = self.vnode_col_idx_in_pk.is_none();

        no_dist_key_in_pk && no_vnode_column
    }
}
impl VnodeCountCompat for risingwave_pb::plan_common::StorageTableDesc {
    /// Decodes `maybe_vnode_count`; when it is unset, `is_singleton` picks the
    /// backward-compatibility variant.
    fn vnode_count_inner(&self) -> VnodeCount {
        // Kept as a closure so that the check only runs when the field is unset.
        let compat_is_singleton = || self.is_singleton();
        VnodeCount::from_protobuf(self.maybe_vnode_count, compat_is_singleton)
    }
}

impl IsSingleton for risingwave_pb::meta::table_fragments::Fragment {
    /// A fragment is a singleton when its distribution type is
    /// [`FragmentDistributionType::Single`].
    fn is_singleton(&self) -> bool {
        let distribution = self.distribution_type();
        matches!(distribution, FragmentDistributionType::Single)
    }
}
impl VnodeCountCompat for risingwave_pb::meta::table_fragments::Fragment {
    /// Decodes `maybe_vnode_count`; when it is unset, `is_singleton` picks the
    /// backward-compatibility variant.
    fn vnode_count_inner(&self) -> VnodeCount {
        // Kept as a closure so that the check only runs when the field is unset.
        let compat_is_singleton = || self.is_singleton();
        VnodeCount::from_protobuf(self.maybe_vnode_count, compat_is_singleton)
    }
}
2 changes: 1 addition & 1 deletion src/common/src/hash/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ mod key_v2;
pub mod table_distribution;

pub use consistent_hash::bitmap::*;
pub use consistent_hash::compat::*;
pub use consistent_hash::mapping::*;
pub use consistent_hash::vnode::*;
pub use consistent_hash::vnode_count::*;
pub use dispatcher::{calc_hash_key_kind, HashKeyDispatcher};
pub use key::{
Crc32HashCode, HashCode, HashKeyDe, HashKeySer, HeapNullBitmap, NullBitmap,
Expand Down
7 changes: 5 additions & 2 deletions src/common/src/hash/table_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_pb::plan_common::StorageTableDesc;

use crate::array::{Array, DataChunk, PrimitiveArray};
use crate::bitmap::Bitmap;
use crate::hash::VirtualNode;
use crate::hash::{IsSingleton, VirtualNode};
use crate::row::Row;
use crate::util::iter_util::ZipEqFast;

Expand Down Expand Up @@ -64,7 +64,10 @@ impl TableDistribution {
.map(|&k| k as usize)
.collect_vec();
let vnode_col_idx_in_pk = table_desc.vnode_col_idx_in_pk.map(|k| k as usize);
Self::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk)

let this = Self::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
assert_eq!(this.is_singleton(), table_desc.is_singleton());
this
}

pub fn new(
Expand Down
Loading

0 comments on commit 58ecec2

Please sign in to comment.