Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce FlownodePeerCache #4254

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 33 additions & 5 deletions src/cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ use std::time::Duration;

use catalog::kvbackend::new_table_cache;
use common_meta::cache::{
new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache,
new_table_route_cache, new_view_info_cache, CacheRegistry, CacheRegistryBuilder,
LayeredCacheRegistryBuilder,
new_flownode_peer_cache, new_table_flownode_set_cache, new_table_info_cache,
new_table_name_cache, new_table_route_cache, new_view_info_cache, CacheRegistry,
CacheRegistryBuilder, FlownodePeerCacheRef, LayeredCacheRegistryBuilder,
};
use common_meta::cluster::ClusterInfoRef;
use common_meta::kv_backend::KvBackendRef;
use moka::future::CacheBuilder;
use snafu::OptionExt;
Expand All @@ -39,8 +40,9 @@ pub const TABLE_NAME_CACHE_NAME: &str = "table_name_cache";
pub const TABLE_CACHE_NAME: &str = "table_cache";
pub const TABLE_FLOWNODE_SET_CACHE_NAME: &str = "table_flownode_set_cache";
pub const TABLE_ROUTE_CACHE_NAME: &str = "table_route_cache";
pub const FLOWNODE_PEER_CACHE_NAME: &str = "flownode_peer_cache";

pub fn build_fundamental_cache_registry(kv_backend: KvBackendRef) -> CacheRegistry {
fn default_fundamental_cache_registry(kv_backend: KvBackendRef) -> CacheRegistryBuilder {
// Builds table info cache
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
Expand Down Expand Up @@ -101,7 +103,33 @@ pub fn build_fundamental_cache_registry(kv_backend: KvBackendRef) -> CacheRegist
.add_cache(table_route_cache)
.add_cache(view_info_cache)
.add_cache(table_flownode_set_cache)
.build()
}

pub fn build_flownode_peer_cache(cluster_info: ClusterInfoRef) -> FlownodePeerCacheRef {
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();
Arc::new(new_flownode_peer_cache(
FLOWNODE_PEER_CACHE_NAME.to_string(),
cache,
cluster_info,
))
}

pub fn build_fundamental_cache_registry_with<F>(
kv_backend: KvBackendRef,
applier: F,
) -> CacheRegistry
where
F: FnOnce(CacheRegistryBuilder) -> CacheRegistryBuilder,
{
let builder = default_fundamental_cache_registry(kv_backend);
applier(builder).build()
}

pub fn build_fundamental_cache_registry(kv_backend: KvBackendRef) -> CacheRegistry {
default_fundamental_cache_registry(kv_backend).build()
}

// TODO(weny): Make the cache configurable.
Expand Down
12 changes: 9 additions & 3 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use cache::{
build_flownode_peer_cache, build_fundamental_cache_registry_with,
with_default_composite_cache_registry,
};
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use clap::Parser;
use client::client_manager::NodeClients;
Expand Down Expand Up @@ -297,8 +300,11 @@ impl StartCommand {
.add_cache(cached_meta_backend.clone())
.build(),
);
let fundamental_cache_registry =
build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
let cluster_info = meta_client.clone();
let fundamental_cache_registry = build_fundamental_cache_registry_with(
Arc::new(MetaKvBackend::new(meta_client.clone())),
move |builder| builder.add_cache(build_flownode_peer_cache(cluster_info)),
);
let layered_cache_registry = Arc::new(
with_default_composite_cache_registry(
layered_cache_builder.add_cache_registry(fundamental_cache_registry),
Expand Down
2 changes: 2 additions & 0 deletions src/common/meta/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod cluster;
mod container;
mod flow;
mod registry;
mod table;

pub use cluster::{new_flownode_peer_cache, FlownodePeerCache, FlownodePeerCacheRef};
pub use container::{CacheContainer, Initializer, Invalidator, TokenFilter};
pub use flow::{new_table_flownode_set_cache, TableFlownodeSetCache, TableFlownodeSetCacheRef};
pub use registry::{
Expand Down
17 changes: 17 additions & 0 deletions src/common/meta/src/cache/cluster.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod flownode;

pub use flownode::{new_flownode_peer_cache, FlownodePeerCache, FlownodePeerCacheRef};
139 changes: 139 additions & 0 deletions src/common/meta/src/cache/cluster/flownode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use futures::future::BoxFuture;
use moka::future::Cache;
use snafu::{OptionExt, ResultExt};

use crate::cache::{CacheContainer, Initializer};
use crate::cluster::ClusterInfoRef;
use crate::error::Result;
use crate::instruction::CacheIdent;
use crate::peer::Peer;
use crate::{error, FlownodeId};

/// [FlownodePeerCache] caches the [FlownodeId] to [Peer] mapping.
pub type FlownodePeerCache = CacheContainer<FlownodeId, Arc<Peer>, CacheIdent>;

pub type FlownodePeerCacheRef = Arc<FlownodePeerCache>;

/// Constructs a [FlownodePeerCache].
pub fn new_flownode_peer_cache(
name: String,
cache: Cache<FlownodeId, Arc<Peer>>,
cluster_info: ClusterInfoRef,
) -> FlownodePeerCache {
let init = init_factory(cluster_info);

CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
}

fn init_factory(cluster_info: ClusterInfoRef) -> Initializer<FlownodeId, Arc<Peer>> {
Arc::new(move |flownode_id| {
let cluster_info = cluster_info.clone();
Box::pin(async move {
let peer = cluster_info
.get_flownode(*flownode_id)
.await
.context(error::GetClusterInfoSnafu)?
.context(error::ValueNotExistSnafu {})?;

Ok(Some(Arc::new(peer)))
})
})
}

fn invalidator<'a>(
cache: &'a Cache<FlownodeId, Arc<Peer>>,
ident: &'a CacheIdent,
) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
if let CacheIdent::FlownodeId(flownode_id) = ident {
cache.invalidate(flownode_id).await
}
Ok(())
})
}

fn filter(ident: &CacheIdent) -> bool {
matches!(ident, CacheIdent::FlownodeId(_))
}

#[cfg(test)]
mod tests {

use std::sync::Arc;

use common_error::ext::BoxedError;
use moka::future::CacheBuilder;

use crate::cache::new_flownode_peer_cache;
use crate::cluster::{ClusterInfo, NodeInfo, Role};
use crate::instruction::CacheIdent;
use crate::peer::Peer;
use crate::{error, FlownodeId};

struct MockClusterInfo {
error: bool,
}

#[async_trait::async_trait]
impl ClusterInfo for MockClusterInfo {
async fn list_nodes(
&self,
_role: Option<Role>,
) -> std::result::Result<Vec<NodeInfo>, BoxedError> {
unimplemented!()
}

async fn get_flownode(
&self,
id: FlownodeId,
) -> std::result::Result<Option<Peer>, BoxedError> {
if self.error {
return Err(BoxedError::new(
error::UnexpectedSnafu {
err_msg: "mock error".to_string(),
}
.build(),
));
}
Ok(Some(Peer {
id,
addr: format!("{}.flownode", id),
}))
}
}

#[tokio::test]
async fn test_get() {
let mock_cluster_info = Arc::new(MockClusterInfo { error: false });
let cache = CacheBuilder::new(128).build();
let cache = new_flownode_peer_cache("test".to_string(), cache, mock_cluster_info);
let peer_1 = cache.get(1).await.unwrap().unwrap();
assert_eq!(peer_1.id, 1);
let peer_2 = cache.get(2).await.unwrap().unwrap();
assert_eq!(peer_2.id, 2);
assert!(cache.contains_key(&1));
assert!(cache.contains_key(&2));
cache
.invalidate(&[CacheIdent::FlownodeId(1)])
.await
.unwrap();
assert!(!cache.contains_key(&1));
assert!(cache.contains_key(&2));
}
}
4 changes: 4 additions & 0 deletions src/common/meta/src/cache_invalidator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ where
let key = FlowInfoKey::new(*flow_id);
self.invalidate_key(&key.to_bytes()).await;
}
CacheIdent::FlownodeId(_) => {
// Do nothing.
// The Flownode peer info won't be cached in the KvBackend.
}
}
}
Ok(())
Expand Down
32 changes: 18 additions & 14 deletions src/common/meta/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
// limitations under the License.

use std::str::FromStr;
use std::sync::Arc;

use common_error::ext::ErrorExt;
use common_error::ext::BoxedError;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
Expand All @@ -25,7 +26,7 @@ use crate::error::{
InvalidRoleSnafu, ParseNumSnafu, Result,
};
use crate::peer::Peer;
use crate::ClusterId;
use crate::{ClusterId, FlownodeId};

const CLUSTER_NODE_INFO_PREFIX: &str = "__meta_cluster_node_info";

Expand All @@ -36,16 +37,19 @@ lazy_static! {
.unwrap();
}

pub type ClusterInfoRef = Arc<dyn ClusterInfo>;

/// [ClusterInfo] provides information about the cluster.
#[async_trait::async_trait]
pub trait ClusterInfo {
type Error: ErrorExt;

pub trait ClusterInfo: Sync + Send {
/// List all nodes by role in the cluster. If `role` is `None`, list all nodes.
async fn list_nodes(
&self,
role: Option<Role>,
) -> std::result::Result<Vec<NodeInfo>, Self::Error>;
) -> std::result::Result<Vec<NodeInfo>, BoxedError>;

/// Retrieves the [`Peer`] information of a flownode.
async fn get_flownode(&self, id: FlownodeId) -> std::result::Result<Option<Peer>, BoxedError>;

// TODO(jeremy): Other info, like region status, etc.
}
Expand Down Expand Up @@ -184,11 +188,11 @@ impl FromStr for NodeInfoKey {
}
}

impl TryFrom<Vec<u8>> for NodeInfoKey {
impl TryFrom<&[u8]> for NodeInfoKey {
type Error = Error;

fn try_from(bytes: Vec<u8>) -> Result<Self> {
String::from_utf8(bytes)
fn try_from(bytes: &[u8]) -> Result<Self> {
std::str::from_utf8(bytes)
.context(FromUtf8Snafu {
name: "NodeInfoKey",
})
Expand Down Expand Up @@ -217,11 +221,11 @@ impl FromStr for NodeInfo {
}
}

impl TryFrom<Vec<u8>> for NodeInfo {
impl TryFrom<&[u8]> for NodeInfo {
type Error = Error;

fn try_from(bytes: Vec<u8>) -> Result<Self> {
String::from_utf8(bytes)
fn try_from(bytes: &[u8]) -> Result<Self> {
std::str::from_utf8(bytes)
.context(FromUtf8Snafu { name: "NodeInfo" })
.map(|x| x.parse())?
}
Expand Down Expand Up @@ -279,7 +283,7 @@ mod tests {
};

let key_bytes: Vec<u8> = key.into();
let new_key: NodeInfoKey = key_bytes.try_into().unwrap();
let new_key: NodeInfoKey = key_bytes.as_slice().try_into().unwrap();

assert_eq!(1, new_key.cluster_id);
assert_eq!(Datanode, new_key.role);
Expand All @@ -306,7 +310,7 @@ mod tests {
};

let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
let new_node_info: NodeInfo = node_info_bytes.try_into().unwrap();
let new_node_info: NodeInfo = node_info_bytes.as_slice().try_into().unwrap();

assert_matches!(
new_node_info,
Expand Down
16 changes: 14 additions & 2 deletions src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ pub enum Error {
FromUtf8 {
name: String,
#[snafu(source)]
error: std::string::FromUtf8Error,
error: std::str::Utf8Error,
#[snafu(implicit)]
location: Location,
},
Expand All @@ -636,7 +636,18 @@ pub enum Error {
},

#[snafu(display("Failed to get cache"))]
GetCache { source: Arc<Error> },
GetCache {
source: Arc<Error>,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to get cluster info"))]
GetClusterInfo {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -723,6 +734,7 @@ impl ErrorExt for Error {
RetryLater { source, .. } => source.status_code(),
InvalidCatalogValue { source, .. } => source.status_code(),
ConvertAlterTableRequest { source, .. } => source.status_code(),
GetClusterInfo { source, .. } => source.status_code(),

ParseProcedureId { .. }
| InvalidNumTopics { .. }
Expand Down
Loading
Loading