From e15294db41e2f7649d5860bdf225869f5da9a2ca Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Mon, 13 May 2024 22:26:43 +0900 Subject: [PATCH] feat: introduce `TableRouteCache` to `PartitionRuleManager` (#3922) * chore: add `CompositeTableRouteCacheRef` to `PartitionRuleManager` * chore: update comments * fix: add metrics for `get` * chore: apply suggestions from CR * chore: correct cache name * feat: implement `LayeredCacheRegistry` * fix: invalidate logical tables by physical table id * refactor: replace `CacheRegistry` with `LayeredCacheRegistry` * chore: update comments * chore: apply suggestions from CR * chore: fix fmt * refactor: use `TableRouteCache` instead * chore: apply suggestions from CR * chore: fix clippy --- Cargo.lock | 4 + src/cache/Cargo.toml | 3 + src/cache/src/error.rs | 44 +++ src/cache/src/lib.rs | 54 +++- src/catalog/src/kvbackend/manager.rs | 7 +- src/cmd/src/error.rs | 9 +- src/cmd/src/frontend.rs | 47 ++- src/cmd/src/standalone.rs | 45 ++- src/common/meta/src/cache.rs | 12 +- src/common/meta/src/cache/container.rs | 12 + src/common/meta/src/cache/registry.rs | 129 ++++++++- src/common/meta/src/cache/table.rs | 5 - .../src/cache/table/composite_table_route.rs | 274 ------------------ src/common/meta/src/metrics.rs | 7 + src/frontend/src/instance/builder.rs | 24 +- src/operator/Cargo.toml | 1 + src/operator/src/statement.rs | 4 +- src/operator/src/tests/partition_manager.rs | 21 +- src/partition/src/error.rs | 16 + src/partition/src/manager.rs | 64 +++- tests-integration/src/cluster.rs | 22 +- tests-integration/src/standalone.rs | 16 +- 22 files changed, 459 insertions(+), 361 deletions(-) create mode 100644 src/cache/src/error.rs delete mode 100644 src/common/meta/src/cache/table/composite_table_route.rs diff --git a/Cargo.lock b/Cargo.lock index d8285185d110..0b183afc2600 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1223,8 +1223,11 @@ name = "cache" version = "0.7.2" dependencies = [ "catalog", + "common-error", + "common-macro", "common-meta", "moka", + "snafu 0.8.2", ] [[package]] @@ -6626,6 +6629,7 @@ dependencies = [ "meta-client", "meter-core", "meter-macros", + "moka", "object-store", "partition", "path-slash", diff --git a/src/cache/Cargo.toml b/src/cache/Cargo.toml index b372018c5822..07870fa904a5 100644 --- a/src/cache/Cargo.toml +++ b/src/cache/Cargo.toml @@ -6,5 +6,8 @@ license.workspace = true [dependencies] catalog.workspace = true +common-error.workspace = true +common-macro.workspace = true common-meta.workspace = true moka.workspace = true +snafu.workspace = true diff --git a/src/cache/src/error.rs b/src/cache/src/error.rs new file mode 100644 index 000000000000..646a82bc98ed --- /dev/null +++ b/src/cache/src/error.rs @@ -0,0 +1,44 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_macro::stack_trace_debug; +use snafu::{Location, Snafu}; + +#[derive(Snafu)] +#[snafu(visibility(pub))] +#[stack_trace_debug] +pub enum Error { + #[snafu(display("Failed to get cache from cache registry: {}", name))] + CacheRequired { + #[snafu(implicit)] + location: Location, + name: String, + }, +} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::CacheRequired { .. } => StatusCode::Internal, + } + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } +} diff --git a/src/cache/src/lib.rs b/src/cache/src/lib.rs index dcef9632490e..85dc9c05f1f3 100644 --- a/src/cache/src/lib.rs +++ b/src/cache/src/lib.rs @@ -12,15 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod error; + use std::sync::Arc; use std::time::Duration; use catalog::kvbackend::new_table_cache; use common_meta::cache::{ - new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache, CacheRegistryBuilder, + new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache, + new_table_route_cache, CacheRegistry, CacheRegistryBuilder, LayeredCacheRegistryBuilder, }; use common_meta::kv_backend::KvBackendRef; use moka::future::CacheBuilder; +use snafu::OptionExt; + +use crate::error::Result; const DEFAULT_CACHE_MAX_CAPACITY: u64 = 65536; const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(10 * 60); @@ -30,9 +36,9 @@ pub const TABLE_INFO_CACHE_NAME: &str = "table_info_cache"; pub const TABLE_NAME_CACHE_NAME: &str = "table_name_cache"; pub const TABLE_CACHE_NAME: &str = "table_cache"; pub const TABLE_FLOWNODE_SET_CACHE_NAME: &str = "table_flownode_set_cache"; +pub const TABLE_ROUTE_CACHE_NAME: &str = "table_route_cache"; -// TODO(weny): Make the cache configurable. -pub fn default_cache_registry_builder(kv_backend: KvBackendRef) -> CacheRegistryBuilder { +pub fn build_fundamental_cache_registry(kv_backend: KvBackendRef) -> CacheRegistry { // Builds table info cache let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY) .time_to_live(DEFAULT_CACHE_TTL) @@ -55,16 +61,15 @@ pub fn default_cache_registry_builder(kv_backend: KvBackendRef) -> CacheRegistry kv_backend.clone(), )); - // Builds table cache + // Builds table route cache let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY) .time_to_live(DEFAULT_CACHE_TTL) .time_to_idle(DEFAULT_CACHE_TTI) .build(); - let table_cache = Arc::new(new_table_cache( - TABLE_CACHE_NAME.to_string(), + let table_route_cache = Arc::new(new_table_route_cache( + TABLE_ROUTE_CACHE_NAME.to_string(), cache, - table_info_cache.clone(), - table_name_cache.clone(), + kv_backend.clone(), )); // Builds table flownode set cache @@ -81,6 +86,37 @@ pub fn default_cache_registry_builder(kv_backend: KvBackendRef) -> CacheRegistry CacheRegistryBuilder::default() .add_cache(table_info_cache) .add_cache(table_name_cache) - .add_cache(table_cache) + .add_cache(table_route_cache) .add_cache(table_flownode_set_cache) + .build() +} + +// TODO(weny): Make the cache configurable. +pub fn with_default_composite_cache_registry( + builder: LayeredCacheRegistryBuilder, +) -> Result { + let table_info_cache = builder.get().context(error::CacheRequiredSnafu { + name: TABLE_INFO_CACHE_NAME, + })?; + let table_name_cache = builder.get().context(error::CacheRequiredSnafu { + name: TABLE_NAME_CACHE_NAME, + })?; + + // Builds table cache + let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY) + .time_to_live(DEFAULT_CACHE_TTL) + .time_to_idle(DEFAULT_CACHE_TTI) + .build(); + let table_cache = Arc::new(new_table_cache( + TABLE_CACHE_NAME.to_string(), + cache, + table_info_cache, + table_name_cache, + )); + + let registry = CacheRegistryBuilder::default() + .add_cache(table_cache) + .build(); + + Ok(builder.add_cache_registry(registry)) } diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 69af7eeec498..e7a4ef4be39c 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -22,6 +22,7 @@ use common_catalog::consts::{ }; use common_config::Mode; use common_error::ext::BoxedError; +use common_meta::cache::TableRouteCacheRef; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::schema_name::SchemaNameKey; use common_meta::key::table_info::TableInfoValue; @@ -71,11 +72,15 @@ impl KvBackendCatalogManager { meta_client: Option>, backend: KvBackendRef, table_cache: TableCacheRef, + table_route_cache: TableRouteCacheRef, ) -> Arc { Arc::new_cyclic(|me| Self { mode, meta_client, - partition_manager: Arc::new(PartitionRuleManager::new(backend.clone())), + partition_manager: Arc::new(PartitionRuleManager::new( + backend.clone(), + table_route_cache, + )), table_metadata_manager: Arc::new(TableMetadataManager::new(backend)), system_catalog: SystemCatalog { catalog_manager: me.clone(), diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index 6c800455ace5..8a3e23eaa38e 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -310,6 +310,13 @@ pub enum Error { location: Location, name: String, }, + + #[snafu(display("Failed to build cache registry"))] + BuildCacheRegistry { + #[snafu(implicit)] + location: Location, + source: cache::error::Error, + }, } pub type Result = std::result::Result; @@ -359,7 +366,7 @@ impl ErrorExt for Error { Error::SerdeJson { .. } | Error::FileIo { .. } => StatusCode::Unexpected, - Error::CacheRequired { .. } => StatusCode::Internal, + Error::CacheRequired { .. } | Error::BuildCacheRegistry { .. } => StatusCode::Internal, Error::Other { source, .. } => source.status_code(), diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index b507e030b2f9..7cc2a2accabb 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -16,10 +16,14 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; -use cache::{default_cache_registry_builder, TABLE_CACHE_NAME}; +use cache::{ + build_fundamental_cache_registry, with_default_composite_cache_registry, TABLE_CACHE_NAME, + TABLE_ROUTE_CACHE_NAME, +}; use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; use clap::Parser; use client::client_manager::DatanodeClients; +use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_telemetry::info; @@ -248,27 +252,48 @@ impl StartCommand { .cache_tti(cache_tti) .build(); let cached_meta_backend = Arc::new(cached_meta_backend); - let cache_registry_builder = - default_cache_registry_builder(Arc::new(MetaKvBackend::new(meta_client.clone()))); - let cache_registry = Arc::new( - cache_registry_builder + + // Builds cache registry + let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry( + CacheRegistryBuilder::default() .add_cache(cached_meta_backend.clone()) .build(), ); - let table_cache = cache_registry.get().context(error::CacheRequiredSnafu { - name: TABLE_CACHE_NAME, - })?; + let fundamental_cache_registry = + build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone()))); + let layered_cache_registry = Arc::new( + with_default_composite_cache_registry( + layered_cache_builder.add_cache_registry(fundamental_cache_registry), + ) + .context(error::BuildCacheRegistrySnafu)? + .build(), + ); + + let table_cache = layered_cache_registry + .get() + .context(error::CacheRequiredSnafu { + name: TABLE_CACHE_NAME, + })?; + let table_route_cache = + layered_cache_registry + .get() + .context(error::CacheRequiredSnafu { + name: TABLE_ROUTE_CACHE_NAME, + })?; let catalog_manager = KvBackendCatalogManager::new( opts.mode, Some(meta_client.clone()), cached_meta_backend.clone(), table_cache, + table_route_cache, ) .await; let executor = HandlerGroupExecutor::new(vec![ Arc::new(ParseMailboxMessageHandler), - Arc::new(InvalidateTableCacheHandler::new(cache_registry.clone())), + Arc::new(InvalidateTableCacheHandler::new( + layered_cache_registry.clone(), + )), ]); let heartbeat_task = HeartbeatTask::new( @@ -280,13 +305,13 @@ impl StartCommand { let mut instance = FrontendBuilder::new( cached_meta_backend.clone(), - cache_registry.clone(), + layered_cache_registry.clone(), catalog_manager, Arc::new(DatanodeClients::default()), meta_client, ) .with_plugin(plugins.clone()) - .with_local_cache_invalidator(cache_registry) + .with_local_cache_invalidator(layered_cache_registry) .with_heartbeat_task(heartbeat_task) .try_build() .await diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index cdfa1b17bdc7..22220e553ef8 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -16,11 +16,15 @@ use std::sync::Arc; use std::{fs, path}; use async_trait::async_trait; -use cache::{default_cache_registry_builder, TABLE_CACHE_NAME}; +use cache::{ + build_fundamental_cache_registry, with_default_composite_cache_registry, TABLE_CACHE_NAME, + TABLE_ROUTE_CACHE_NAME, +}; use catalog::kvbackend::KvBackendCatalogManager; use clap::Parser; use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID}; use common_config::{metadata_store_dir, KvBackendConfig}; +use common_meta::cache::LayeredCacheRegistryBuilder; use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRef}; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; @@ -59,10 +63,10 @@ use servers::Mode; use snafu::{OptionExt, ResultExt}; use crate::error::{ - CacheRequiredSnafu, CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, - InitTimezoneSnafu, Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, - StartFrontendSnafu, StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu, - StopProcedureManagerSnafu, + BuildCacheRegistrySnafu, CacheRequiredSnafu, CreateDirSnafu, IllegalConfigSnafu, + InitDdlManagerSnafu, InitMetadataSnafu, InitTimezoneSnafu, Result, ShutdownDatanodeSnafu, + ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu, + StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu, }; use crate::options::{GlobalOptions, Options}; use crate::App; @@ -388,12 +392,31 @@ impl StartCommand { .await .context(StartFrontendSnafu)?; - let cache_registry = Arc::new(default_cache_registry_builder(kv_backend.clone()).build()); - let table_cache = cache_registry.get().context(CacheRequiredSnafu { + // Builds cache registry + let layered_cache_builder = LayeredCacheRegistryBuilder::default(); + let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone()); + let layered_cache_registry = Arc::new( + with_default_composite_cache_registry( + layered_cache_builder.add_cache_registry(fundamental_cache_registry), + ) + .context(BuildCacheRegistrySnafu)? + .build(), + ); + + let table_cache = layered_cache_registry.get().context(CacheRequiredSnafu { name: TABLE_CACHE_NAME, })?; - let catalog_manager = - KvBackendCatalogManager::new(dn_opts.mode, None, kv_backend.clone(), table_cache).await; + let table_route_cache = layered_cache_registry.get().context(CacheRequiredSnafu { + name: TABLE_ROUTE_CACHE_NAME, + })?; + let catalog_manager = KvBackendCatalogManager::new( + dn_opts.mode, + None, + kv_backend.clone(), + table_cache, + table_route_cache, + ) + .await; let builder = DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone()); @@ -431,7 +454,7 @@ impl StartCommand { let ddl_task_executor = Self::create_ddl_task_executor( procedure_manager.clone(), node_manager.clone(), - cache_registry.clone(), + layered_cache_registry.clone(), table_metadata_manager, table_meta_allocator, flow_metadata_manager, @@ -441,7 +464,7 @@ impl StartCommand { let mut frontend = FrontendBuilder::new( kv_backend, - cache_registry, + layered_cache_registry, catalog_manager, node_manager, ddl_task_executor, diff --git a/src/common/meta/src/cache.rs b/src/common/meta/src/cache.rs index ee8937e9b1be..b7d13a6f0ec0 100644 --- a/src/common/meta/src/cache.rs +++ b/src/common/meta/src/cache.rs @@ -19,10 +19,12 @@ mod table; pub use container::{CacheContainer, Initializer, Invalidator, TokenFilter}; pub use flow::{new_table_flownode_set_cache, TableFlownodeSetCache, TableFlownodeSetCacheRef}; -pub use registry::{CacheRegistry, CacheRegistryBuilder, CacheRegistryRef}; +pub use registry::{ + CacheRegistry, CacheRegistryBuilder, CacheRegistryRef, LayeredCacheRegistry, + LayeredCacheRegistryBuilder, LayeredCacheRegistryRef, +}; pub use table::{ - new_composite_table_route_cache, new_table_info_cache, new_table_name_cache, - new_table_route_cache, CompositeTableRoute, CompositeTableRouteCache, - CompositeTableRouteCacheRef, TableInfoCache, TableInfoCacheRef, TableNameCache, - TableNameCacheRef, TableRouteCache, TableRouteCacheRef, + new_table_info_cache, new_table_name_cache, new_table_route_cache, TableInfoCache, + TableInfoCacheRef, TableNameCache, TableNameCacheRef, TableRoute, TableRouteCache, + TableRouteCacheRef, }; diff --git a/src/common/meta/src/cache/container.rs b/src/common/meta/src/cache/container.rs index e0bf9c27043c..c32506534f90 100644 --- a/src/common/meta/src/cache/container.rs +++ b/src/common/meta/src/cache/container.rs @@ -101,9 +101,18 @@ where { /// Returns a _clone_ of the value corresponding to the key. pub async fn get(&self, key: K) -> Result> { + metrics::CACHE_CONTAINER_CACHE_GET + .with_label_values(&[&self.name]) + .inc(); let moved_init = self.initializer.clone(); let moved_key = key; let init = async move { + metrics::CACHE_CONTAINER_CACHE_MISS + .with_label_values(&[&self.name]) + .inc(); + let _timer = metrics::CACHE_CONTAINER_LOAD_CACHE + .with_label_values(&[&self.name]) + .start_timer(); moved_init(&moved_key) .await .transpose() @@ -163,6 +172,9 @@ where metrics::CACHE_CONTAINER_CACHE_MISS .with_label_values(&[&self.name]) .inc(); + let _timer = metrics::CACHE_CONTAINER_LOAD_CACHE + .with_label_values(&[&self.name]) + .start_timer(); moved_init(&moved_key) .await diff --git a/src/common/meta/src/cache/registry.rs b/src/common/meta/src/cache/registry.rs index 2d8940e70ed2..e51fb7e6732e 100644 --- a/src/common/meta/src/cache/registry.rs +++ b/src/common/meta/src/cache/registry.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use anymap2::SendSyncAnyMap; use futures::future::join_all; use crate::cache_invalidator::{CacheInvalidator, Context}; @@ -21,6 +22,65 @@ use crate::error::Result; use crate::instruction::CacheIdent; pub type CacheRegistryRef = Arc; +pub type LayeredCacheRegistryRef = Arc; + +/// [LayeredCacheRegistry] Builder. +#[derive(Default)] +pub struct LayeredCacheRegistryBuilder { + registry: LayeredCacheRegistry, +} + +impl LayeredCacheRegistryBuilder { + /// Adds [CacheRegistry] into the next layer. + /// + /// During cache invalidation, [LayeredCacheRegistry] ensures sequential invalidation + /// of each layer (after the previous layer). + pub fn add_cache_registry(mut self, registry: CacheRegistry) -> Self { + self.registry.layers.push(registry); + + self + } + + /// Returns __cloned__ the value stored in the collection for the type `T`, if it exists. + pub fn get(&self) -> Option { + self.registry.get() + } + + /// Builds the [LayeredCacheRegistry] + pub fn build(self) -> LayeredCacheRegistry { + self.registry + } +} + +/// [LayeredCacheRegistry] invalidate caches sequentially from the first layer. +#[derive(Default)] +pub struct LayeredCacheRegistry { + layers: Vec, +} + +#[async_trait::async_trait] +impl CacheInvalidator for LayeredCacheRegistry { + async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> Result<()> { + let mut results = Vec::with_capacity(self.layers.len()); + for registry in &self.layers { + results.push(registry.invalidate(ctx, caches).await); + } + results.into_iter().collect::>>().map(|_| ()) + } +} + +impl LayeredCacheRegistry { + /// Returns __cloned__ the value stored in the collection for the type `T`, if it exists. + pub fn get(&self) -> Option { + for registry in &self.layers { + if let Some(cache) = registry.get::() { + return Some(cache); + } + } + + None + } +} /// [CacheRegistryBuilder] provides ability of /// - Register the `cache` which implements the [CacheInvalidator] trait into [CacheRegistry]. @@ -31,11 +91,13 @@ pub struct CacheRegistryBuilder { } impl CacheRegistryBuilder { + /// Adds the cache. pub fn add_cache(mut self, cache: Arc) -> Self { self.registry.register(cache); self } + /// Builds [CacheRegistry]. pub fn build(self) -> CacheRegistry { self.registry } @@ -46,7 +108,7 @@ impl CacheRegistryBuilder { #[derive(Default)] pub struct CacheRegistry { indexes: Vec>, - registry: anymap2::SendSyncAnyMap, + registry: SendSyncAnyMap, } #[async_trait::async_trait] @@ -80,7 +142,7 @@ impl CacheRegistry { #[cfg(test)] mod tests { - use std::sync::atomic::{AtomicI32, Ordering}; + use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; use std::sync::Arc; use moka::future::{Cache, CacheBuilder}; @@ -89,7 +151,10 @@ mod tests { use crate::cache::*; use crate::instruction::CacheIdent; - fn test_cache(name: &str) -> CacheContainer { + fn test_cache( + name: &str, + invalidator: Invalidator, + ) -> CacheContainer { let cache: Cache = CacheBuilder::new(128).build(); let filter: TokenFilter = Box::new(|_| true); let counter = Arc::new(AtomicI32::new(0)); @@ -98,13 +163,14 @@ mod tests { moved_counter.fetch_add(1, Ordering::Relaxed); Box::pin(async { Ok(Some("hi".to_string())) }) }); - let invalidator: Invalidator = - Box::new(|_, _| Box::pin(async { Ok(()) })); CacheContainer::new(name.to_string(), cache, invalidator, init, filter) } - fn test_i32_cache(name: &str) -> CacheContainer { + fn test_i32_cache( + name: &str, + invalidator: Invalidator, + ) -> CacheContainer { let cache: Cache = CacheBuilder::new(128).build(); let filter: TokenFilter = Box::new(|_| true); let counter = Arc::new(AtomicI32::new(0)); @@ -113,8 +179,6 @@ mod tests { moved_counter.fetch_add(1, Ordering::Relaxed); Box::pin(async { Ok(Some("foo".to_string())) }) }); - let invalidator: Invalidator = - Box::new(|_, _| Box::pin(async { Ok(()) })); CacheContainer::new(name.to_string(), cache, invalidator, init, filter) } @@ -122,8 +186,12 @@ mod tests { #[tokio::test] async fn test_register() { let builder = CacheRegistryBuilder::default(); - let i32_cache = Arc::new(test_i32_cache("i32_cache")); - let cache = Arc::new(test_cache("string_cache")); + let invalidator: Invalidator<_, String, CacheIdent> = + Box::new(|_, _| Box::pin(async { Ok(()) })); + let i32_cache = Arc::new(test_i32_cache("i32_cache", invalidator)); + let invalidator: Invalidator<_, String, CacheIdent> = + Box::new(|_, _| Box::pin(async { Ok(()) })); + let cache = Arc::new(test_cache("string_cache", invalidator)); let registry = builder.add_cache(i32_cache).add_cache(cache).build(); let cache = registry @@ -136,4 +204,45 @@ mod tests { .unwrap(); assert_eq!(cache.name(), "string_cache"); } + + #[tokio::test] + async fn test_layered_registry() { + let builder = LayeredCacheRegistryBuilder::default(); + // 1st layer + let counter = Arc::new(AtomicBool::new(false)); + let moved_counter = counter.clone(); + let invalidator: Invalidator = Box::new(move |_, _| { + let counter = moved_counter.clone(); + Box::pin(async move { + assert!(!counter.load(Ordering::Relaxed)); + counter.store(true, Ordering::Relaxed); + Ok(()) + }) + }); + let cache = Arc::new(test_cache("string_cache", invalidator)); + let builder = + builder.add_cache_registry(CacheRegistryBuilder::default().add_cache(cache).build()); + // 2nd layer + let moved_counter = counter.clone(); + let invalidator: Invalidator = Box::new(move |_, _| { + let counter = moved_counter.clone(); + Box::pin(async move { + assert!(counter.load(Ordering::Relaxed)); + Ok(()) + }) + }); + let i32_cache = Arc::new(test_i32_cache("i32_cache", invalidator)); + let builder = builder + .add_cache_registry(CacheRegistryBuilder::default().add_cache(i32_cache).build()); + + let registry = builder.build(); + let cache = registry + .get::>>() + .unwrap(); + assert_eq!(cache.name(), "i32_cache"); + let cache = registry + .get::>>() + .unwrap(); + assert_eq!(cache.name(), "string_cache"); + } } diff --git a/src/common/meta/src/cache/table.rs b/src/common/meta/src/cache/table.rs index a595118c23f9..fa3bcbd30994 100644 --- a/src/common/meta/src/cache/table.rs +++ b/src/common/meta/src/cache/table.rs @@ -12,14 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod composite_table_route; mod table_info; mod table_name; mod table_route; -pub use composite_table_route::{ - new_composite_table_route_cache, CompositeTableRoute, CompositeTableRouteCache, - CompositeTableRouteCacheRef, -}; pub use table_info::{new_table_info_cache, TableInfoCache, TableInfoCacheRef}; pub use table_name::{new_table_name_cache, TableNameCache, TableNameCacheRef}; pub use table_route::{new_table_route_cache, TableRoute, TableRouteCache, TableRouteCacheRef}; diff --git a/src/common/meta/src/cache/table/composite_table_route.rs b/src/common/meta/src/cache/table/composite_table_route.rs deleted file mode 100644 index c213bdd42ae5..000000000000 --- a/src/common/meta/src/cache/table/composite_table_route.rs +++ /dev/null @@ -1,274 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use futures::future::BoxFuture; -use moka::future::Cache; -use snafu::OptionExt; -use store_api::storage::TableId; - -use crate::cache::table::{TableRoute, TableRouteCacheRef}; -use crate::cache::{CacheContainer, Initializer}; -use crate::error; -use crate::error::Result; -use crate::instruction::CacheIdent; -use crate::key::table_route::{LogicalTableRouteValue, PhysicalTableRouteValue}; - -/// [CompositeTableRoute] stores all level routes of a table. -/// - Stores [PhysicalTableRouteValue] for logical table. -/// - Stores [LogicalTableRouteValue], [PhysicalTableRouteValue] for the logical table. -#[derive(Clone)] -pub enum CompositeTableRoute { - Physical(Arc), - Logical(Arc, Arc), -} - -impl CompositeTableRoute { - /// Returns true if it's physical table. - pub fn is_physical(&self) -> bool { - matches!(self, CompositeTableRoute::Physical(_)) - } - - /// Returns [PhysicalTableRouteValue] reference. - pub fn as_physical_table_route_ref(&self) -> &Arc { - match self { - CompositeTableRoute::Physical(route) => route, - CompositeTableRoute::Logical(_, route) => route, - } - } - - /// Returns [LogicalTableRouteValue] reference if it's [CompositeTableRoute::Logical]; Otherwise returns [None]. - pub fn as_logical_table_route_ref(&self) -> Option<&Arc> { - match self { - CompositeTableRoute::Physical(_) => None, - CompositeTableRoute::Logical(route, _) => Some(route), - } - } -} - -/// [CompositeTableRouteCache] caches the [TableId] to [CompositeTableRoute] mapping. -pub type CompositeTableRouteCache = CacheContainer, CacheIdent>; - -pub type CompositeTableRouteCacheRef = Arc; - -/// Constructs a [CompositeTableRouteCache]. -pub fn new_composite_table_route_cache( - name: String, - cache: Cache>, - table_route_cache: TableRouteCacheRef, -) -> CompositeTableRouteCache { - let init = init_factory(table_route_cache); - - CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter)) -} - -fn init_factory( - table_route_cache: TableRouteCacheRef, -) -> Initializer> { - Arc::new(move |table_id| { - let table_route_cache = table_route_cache.clone(); - Box::pin(async move { - let table_route_value = table_route_cache - .get(*table_id) - .await? - .context(error::ValueNotExistSnafu)?; - match table_route_value.as_ref() { - TableRoute::Physical(physical_table_route) => Ok(Some(Arc::new( - CompositeTableRoute::Physical(physical_table_route.clone()), - ))), - TableRoute::Logical(logical_table_route) => { - let physical_table_id = logical_table_route.physical_table_id(); - let physical_table_route = table_route_cache - .get(physical_table_id) - .await? - .context(error::ValueNotExistSnafu)?; - - let physical_table_route = physical_table_route - .as_physical_table_route_ref() - .with_context(|| error::UnexpectedSnafu { - err_msg: format!( - "Expected the physical table route, but got logical table route, table: {table_id}" - ), - })?; - - Ok(Some(Arc::new(CompositeTableRoute::Logical( - logical_table_route.clone(), - physical_table_route.clone(), - )))) - } - } - }) - }) -} - -fn invalidator<'a>( - cache: &'a Cache>, - ident: &'a CacheIdent, -) -> BoxFuture<'a, Result<()>> { - Box::pin(async move { - if let CacheIdent::TableId(table_id) = ident { - cache.invalidate(table_id).await - } - Ok(()) - }) -} - -fn filter(ident: &CacheIdent) -> bool { - matches!(ident, CacheIdent::TableId(_)) -} - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - use std::sync::Arc; - - use moka::future::CacheBuilder; - use store_api::storage::RegionId; - - use super::*; - use crate::cache::new_table_route_cache; - use crate::ddl::test_util::create_table::test_create_table_task; - use crate::ddl::test_util::test_create_logical_table_task; - use crate::key::table_route::TableRouteValue; - use crate::key::TableMetadataManager; - use crate::kv_backend::memory::MemoryKvBackend; - use crate::peer::Peer; - use crate::rpc::router::{Region, RegionRoute}; - - #[tokio::test] - async fn test_cache_with_physical_table_route() { - let mem_kv = Arc::new(MemoryKvBackend::default()); - let table_metadata_manager = TableMetadataManager::new(mem_kv.clone()); - let cache = CacheBuilder::new(128).build(); - let table_route_cache = Arc::new(new_table_route_cache( - "test".to_string(), - cache, - mem_kv.clone(), - )); - let cache = CacheBuilder::new(128).build(); - let cache = - new_composite_table_route_cache("test".to_string(), cache, table_route_cache.clone()); - - let result = cache.get(1024).await.unwrap(); - assert!(result.is_none()); - let task = test_create_table_task("my_table", 1024); - let table_id = 10; - let region_id = RegionId::new(table_id, 1); - let peer = Peer::empty(1); - let region_routes = vec![RegionRoute { - region: Region::new_test(region_id), - leader_peer: Some(peer.clone()), - ..Default::default() - }]; - table_metadata_manager - .create_table_metadata( - task.table_info.clone(), - TableRouteValue::physical(region_routes.clone()), - HashMap::new(), - ) - .await - .unwrap(); - let table_route = cache.get(1024).await.unwrap().unwrap(); - assert_eq!( - (*table_route) - .clone() - .as_physical_table_route_ref() - .region_routes, - region_routes - ); - - assert!(table_route_cache.contains_key(&1024)); - assert!(cache.contains_key(&1024)); - cache - .invalidate(&[CacheIdent::TableId(1024)]) - .await - .unwrap(); - assert!(!cache.contains_key(&1024)); - } - - #[tokio::test] - async fn test_cache_with_logical_table_route() { - let mem_kv = Arc::new(MemoryKvBackend::default()); - let table_metadata_manager = TableMetadataManager::new(mem_kv.clone()); - let cache = CacheBuilder::new(128).build(); - let table_route_cache = Arc::new(new_table_route_cache( - "test".to_string(), - cache, - mem_kv.clone(), - )); - let cache = CacheBuilder::new(128).build(); - let cache = - new_composite_table_route_cache("test".to_string(), cache, table_route_cache.clone()); - - let result = cache.get(1024).await.unwrap(); - assert!(result.is_none()); - // Prepares table routes - let task = test_create_table_task("my_table", 1024); - let table_id = 10; - let region_id = RegionId::new(table_id, 1); - let peer = Peer::empty(1); - let region_routes = vec![RegionRoute { - region: Region::new_test(region_id), - leader_peer: Some(peer.clone()), - ..Default::default() - }]; - table_metadata_manager - .create_table_metadata( - task.table_info.clone(), - TableRouteValue::physical(region_routes.clone()), - HashMap::new(), - ) - .await - .unwrap(); - let mut task = test_create_logical_table_task("logical"); - task.table_info.ident.table_id = 1025; - table_metadata_manager - .create_logical_tables_metadata(vec![( - task.table_info, - TableRouteValue::logical(1024, vec![RegionId::new(1025, 0)]), - )]) - .await - .unwrap(); - - // Gets logical table route - let table_route = cache.get(1025).await.unwrap().unwrap(); - assert_eq!( - table_route - .as_logical_table_route_ref() - .unwrap() - .physical_table_id(), - 1024 - ); - assert_eq!( - table_route.as_physical_table_route_ref().region_routes, - region_routes - ); - - assert!(!cache.contains_key(&1024)); - // Gets physical table route - let table_route = cache.get(1024).await.unwrap().unwrap(); - assert_eq!( - table_route.as_physical_table_route_ref().region_routes, - region_routes - ); - assert!(table_route.is_physical()); - - cache - .invalidate(&[CacheIdent::TableId(1025)]) - .await - .unwrap(); - assert!(!cache.contains_key(&1025)); - } -} diff --git a/src/common/meta/src/metrics.rs b/src/common/meta/src/metrics.rs index 7ca4b190728b..726b4fd474c5 100644 --- a/src/common/meta/src/metrics.rs +++ b/src/common/meta/src/metrics.rs @@ -89,4 +89,11 @@ lazy_static! { &["name"] ) .unwrap(); + /// Cache container load cache timer + pub static ref CACHE_CONTAINER_LOAD_CACHE: HistogramVec = register_histogram_vec!( + "greptime_meta_cache_container_load_cache", + "cache container load cache", + &["name"] + ) + .unwrap(); } diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index e44ee680c5c6..f0993458a6aa 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -14,10 +14,10 @@ use std::sync::Arc; -use cache::TABLE_FLOWNODE_SET_CACHE_NAME; +use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME}; use catalog::CatalogManagerRef; use common_base::Plugins; -use common_meta::cache::CacheRegistryRef; +use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef}; use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator}; use common_meta::ddl::ProcedureExecutorRef; use common_meta::key::TableMetadataManager; @@ -43,7 +43,7 @@ use crate::script::ScriptExecutor; /// The frontend [`Instance`] builder. pub struct FrontendBuilder { kv_backend: KvBackendRef, - cache_registry: CacheRegistryRef, + layered_cache_registry: LayeredCacheRegistryRef, local_cache_invalidator: Option, catalog_manager: CatalogManagerRef, node_manager: NodeManagerRef, @@ -55,14 +55,14 @@ pub struct FrontendBuilder { impl FrontendBuilder { pub fn new( kv_backend: KvBackendRef, - cache_registry: CacheRegistryRef, + layered_cache_registry: LayeredCacheRegistryRef, catalog_manager: CatalogManagerRef, node_manager: NodeManagerRef, procedure_executor: ProcedureExecutorRef, ) -> Self { Self { kv_backend, - cache_registry, + layered_cache_registry, local_cache_invalidator: None, catalog_manager, node_manager, @@ -98,7 +98,16 @@ impl FrontendBuilder { let node_manager = self.node_manager; let plugins = self.plugins.unwrap_or_default(); - let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend.clone())); + let table_route_cache: TableRouteCacheRef = + self.layered_cache_registry + .get() + .context(error::CacheRequiredSnafu { + name: TABLE_ROUTE_CACHE_NAME, + })?; + let partition_manager = Arc::new(PartitionRuleManager::new( + kv_backend.clone(), + table_route_cache.clone(), + )); let local_cache_invalidator = self .local_cache_invalidator @@ -108,7 +117,7 @@ impl FrontendBuilder { FrontendRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone()); let table_flownode_cache = - self.cache_registry + self.layered_cache_registry .get() .context(error::CacheRequiredSnafu { name: TABLE_FLOWNODE_SET_CACHE_NAME, @@ -160,6 +169,7 @@ impl FrontendBuilder { kv_backend.clone(), local_cache_invalidator, inserter.clone(), + table_route_cache, )); plugins.insert::(statement_executor.clone()); diff --git a/src/operator/Cargo.toml b/src/operator/Cargo.toml index 83e0892a492c..aacbfeb685de 100644 --- a/src/operator/Cargo.toml +++ b/src/operator/Cargo.toml @@ -40,6 +40,7 @@ lazy_static.workspace = true meta-client.workspace = true meter-core.workspace = true meter-macros.workspace = true +moka.workspace = true object-store.workspace = true partition.workspace = true prometheus.workspace = true diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index a3cfe7fc41cf..beed26fc90be 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -26,6 +26,7 @@ use std::sync::Arc; use catalog::CatalogManagerRef; use common_error::ext::BoxedError; +use common_meta::cache::TableRouteCacheRef; use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::ddl::ProcedureExecutorRef; use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef}; @@ -80,6 +81,7 @@ impl StatementExecutor { kv_backend: KvBackendRef, cache_invalidator: CacheInvalidatorRef, inserter: InserterRef, + table_route_cache: TableRouteCacheRef, ) -> Self { Self { catalog_manager, @@ -87,7 +89,7 @@ impl StatementExecutor { procedure_executor, table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())), flow_metadata_manager: Arc::new(FlowMetadataManager::new(kv_backend.clone())), - partition_manager: Arc::new(PartitionRuleManager::new(kv_backend)), + partition_manager: Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)), cache_invalidator, inserter, } diff --git a/src/operator/src/tests/partition_manager.rs b/src/operator/src/tests/partition_manager.rs index 0980a914c7e0..4a30928dc8c5 100644 --- a/src/operator/src/tests/partition_manager.rs +++ b/src/operator/src/tests/partition_manager.rs @@ -16,6 +16,7 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use catalog::kvbackend::MetaKvBackend; +use common_meta::cache::{new_table_route_cache, TableRouteCacheRef}; use common_meta::key::table_route::TableRouteValue; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; @@ -28,6 +29,7 @@ use datafusion_expr::{lit, Operator}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaBuilder}; use meta_client::client::MetaClient; +use moka::future::CacheBuilder; use partition::columns::RangeColumnsPartitionRule; use partition::expr::{Operand, PartitionExpr, RestrictedOp}; use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; @@ -81,6 +83,15 @@ fn new_test_region_wal_options(regions: Vec) -> HashMap TableRouteCacheRef { + let cache = CacheBuilder::new(128).build(); + Arc::new(new_table_route_cache( + "table_route_cache".to_string(), + cache, + kv_backend.clone(), + )) +} + /// Create a partition rule manager with two tables, one is partitioned by single column, and /// the other one is two. The tables are under default catalog and schema. /// @@ -101,7 +112,8 @@ pub(crate) async fn create_partition_rule_manager( kv_backend: KvBackendRef, ) -> PartitionRuleManagerRef { let table_metadata_manager = TableMetadataManager::new(kv_backend.clone()); - let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend)); + let table_route_cache = test_new_table_route_cache(kv_backend.clone()); + let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)); let regions = vec![1u32, 2, 3]; let region_wal_options = new_test_region_wal_options(regions.clone()); @@ -244,10 +256,11 @@ async fn test_find_partition_rule() { #[tokio::test(flavor = "multi_thread")] async fn test_find_regions() { - let kv_backend = MetaKvBackend { + let kv_backend = Arc::new(MetaKvBackend { client: Arc::new(MetaClient::default()), - }; - let partition_manager = Arc::new(PartitionRuleManager::new(Arc::new(kv_backend))); + }); + let table_route_cache = test_new_table_route_cache(kv_backend.clone()); + let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)); // PARTITION BY RANGE (a) ( // PARTITION r1 VALUES LESS THAN (10), diff --git a/src/partition/src/error.rs b/src/partition/src/error.rs index 921c29429f1c..f5a245dba335 100644 --- a/src/partition/src/error.rs +++ b/src/partition/src/error.rs @@ -29,6 +29,13 @@ use crate::expr::PartitionExpr; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { + #[snafu(display("Failed to find table route: {}", table_id))] + TableRouteNotFound { + table_id: TableId, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Table route manager error"))] TableRouteManager { source: common_meta::error::Error, @@ -185,6 +192,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Unexpected: {err_msg}"))] + Unexpected { + err_msg: String, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { @@ -203,9 +217,11 @@ impl ErrorExt for Error { | Error::InvalidInsertRequest { .. } | Error::InvalidDeleteRequest { .. } => StatusCode::InvalidArguments, Error::SerializeJson { .. } | Error::DeserializeJson { .. } => StatusCode::Internal, + Error::Unexpected { .. } => StatusCode::Unexpected, Error::InvalidTableRouteData { .. } => StatusCode::Internal, Error::ConvertScalarValue { .. } => StatusCode::Internal, Error::FindDatanode { .. } => StatusCode::InvalidArguments, + Error::TableRouteNotFound { .. } => StatusCode::TableNotFound, Error::TableRouteManager { source, .. } => source.status_code(), Error::MissingDefaultValue { .. } => StatusCode::Internal, Error::UnexpectedLogicalRouteTable { source, .. } => source.status_code(), diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index 34f67c95a804..3d192fc97700 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -16,11 +16,11 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use api::v1::Rows; -use common_meta::key::table_route::TableRouteManager; +use common_meta::cache::{TableRoute, TableRouteCacheRef}; +use common_meta::key::table_route::{PhysicalTableRouteValue, TableRouteManager}; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; -use common_meta::rpc::router; -use common_meta::rpc::router::RegionRoute; +use common_meta::rpc::router::{self, RegionRoute}; use common_query::prelude::Expr; use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator}; use datatypes::prelude::Value; @@ -51,6 +51,7 @@ pub type PartitionRuleManagerRef = Arc; /// - filters (in case of select, deletion and update) pub struct PartitionRuleManager { table_route_manager: TableRouteManager, + table_route_cache: TableRouteCacheRef, } #[derive(Debug)] @@ -60,19 +61,46 @@ pub struct PartitionInfo { } impl PartitionRuleManager { - pub fn new(kv_backend: KvBackendRef) -> Self { + pub fn new(kv_backend: KvBackendRef, table_route_cache: TableRouteCacheRef) -> Self { Self { table_route_manager: TableRouteManager::new(kv_backend), + table_route_cache, } } - pub async fn find_region_routes(&self, table_id: TableId) -> Result> { - let (_, route) = self - .table_route_manager - .get_physical_table_route(table_id) + pub async fn find_physical_table_route( + &self, + table_id: TableId, + ) -> Result> { + match self + .table_route_cache + .get(table_id) .await - .context(error::TableRouteManagerSnafu)?; - Ok(route.region_routes) + .context(error::TableRouteManagerSnafu)? + .context(error::TableRouteNotFoundSnafu { table_id })? + .as_ref() + { + TableRoute::Physical(physical_table_route) => Ok(physical_table_route.clone()), + TableRoute::Logical(logical_table_route) => { + let physical_table_id = logical_table_route.physical_table_id(); + let physical_table_route = self + .table_route_cache + .get(physical_table_id) + .await + .context(error::TableRouteManagerSnafu)? + .context(error::TableRouteNotFoundSnafu { table_id })?; + + let physical_table_route = physical_table_route + .as_physical_table_route_ref() + .context(error::UnexpectedSnafu{ + err_msg: format!( + "Expected the physical table route, but got logical table route, table: {table_id}" + ), + })?; + + Ok(physical_table_route.clone()) + } + } } pub async fn batch_find_region_routes( @@ -96,7 +124,10 @@ impl PartitionRuleManager { } pub async fn find_table_partitions(&self, table_id: TableId) -> Result> { - let region_routes = self.find_region_routes(table_id).await?; + let region_routes = &self + .find_physical_table_route(table_id) + .await? + .region_routes; ensure!( !region_routes.is_empty(), error::FindTableRoutesSnafu { table_id } @@ -116,7 +147,7 @@ impl PartitionRuleManager { for (table_id, region_routes) in batch_region_routes { results.insert( table_id, - create_partitions_from_region_routes(table_id, region_routes)?, + create_partitions_from_region_routes(table_id, ®ion_routes)?, ); } @@ -228,9 +259,12 @@ impl PartitionRuleManager { } pub async fn find_region_leader(&self, region_id: RegionId) -> Result { - let region_routes = self.find_region_routes(region_id.table_id()).await?; + let region_routes = &self + .find_physical_table_route(region_id.table_id()) + .await? + .region_routes; - router::find_region_leader(®ion_routes, region_id.region_number()).context( + router::find_region_leader(region_routes, region_id.region_number()).context( FindLeaderSnafu { region_id, table_id: region_id.table_id(), @@ -250,7 +284,7 @@ impl PartitionRuleManager { fn create_partitions_from_region_routes( table_id: TableId, - region_routes: Vec, + region_routes: &[RegionRoute], ) -> Result> { let mut partitions = Vec::with_capacity(region_routes.len()); for r in region_routes { diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 0536cd1d12c9..4131d36b30d0 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -19,12 +19,13 @@ use std::time::Duration; use api::v1::region::region_server::RegionServer; use arrow_flight::flight_service_server::FlightServiceServer; -use cache::default_cache_registry_builder; +use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; use client::client_manager::DatanodeClients; use client::Client; use common_base::Plugins; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; +use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_meta::kv_backend::chroot::ChrootKvBackend; @@ -346,19 +347,30 @@ impl GreptimeDbClusterBuilder { let cached_meta_backend = Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build()); - let cache_registry_builder = - default_cache_registry_builder(Arc::new(MetaKvBackend::new(meta_client.clone()))); - let cache_registry = Arc::new( - cache_registry_builder + + let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry( + CacheRegistryBuilder::default() .add_cache(cached_meta_backend.clone()) .build(), ); + let fundamental_cache_registry = + build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone()))); + let cache_registry = Arc::new( + with_default_composite_cache_registry( + layered_cache_builder.add_cache_registry(fundamental_cache_registry), + ) + .unwrap() + .build(), + ); + let table_cache = cache_registry.get().unwrap(); + let table_route_cache = cache_registry.get().unwrap(); let catalog_manager = KvBackendCatalogManager::new( Mode::Distributed, Some(meta_client.clone()), cached_meta_backend.clone(), table_cache, + table_route_cache, ) .await; diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 661d4e037689..4d99c9744fab 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -14,12 +14,13 @@ use std::sync::Arc; -use cache::default_cache_registry_builder; +use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; use catalog::kvbackend::KvBackendCatalogManager; use cmd::standalone::StandaloneOptions; use common_base::Plugins; use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID}; use common_config::KvBackendConfig; +use common_meta::cache::LayeredCacheRegistryBuilder; use common_meta::ddl::flow_meta::FlowMetadataAllocator; use common_meta::ddl::table_meta::TableMetadataAllocator; use common_meta::ddl::DdlContext; @@ -128,12 +129,23 @@ impl GreptimeDbStandaloneBuilder { let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); table_metadata_manager.init().await.unwrap(); let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone())); - let cache_registry = Arc::new(default_cache_registry_builder(kv_backend.clone()).build()); + + let layered_cache_builder = LayeredCacheRegistryBuilder::default(); + let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone()); + let cache_registry = Arc::new( + with_default_composite_cache_registry( + layered_cache_builder.add_cache_registry(fundamental_cache_registry), + ) + .unwrap() + .build(), + ); + let catalog_manager = KvBackendCatalogManager::new( Mode::Standalone, None, kv_backend.clone(), cache_registry.get().unwrap(), + cache_registry.get().unwrap(), ) .await;