diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 8d66cc3caabd..0dd4f6701744 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -94,6 +94,10 @@ impl FrontendCatalogManager { self.partition_manager.clone() } + pub fn table_metadata_manager_ref(&self) -> &TableMetadataManagerRef { + &self.table_metadata_manager + } + pub fn datanode_clients(&self) -> Arc { self.datanode_clients.clone() } diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 8f2da8de7954..bf98c95484a4 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -25,7 +25,6 @@ use api::v1::{ FlushTableExpr, InsertRequests, TruncateTableExpr, }; use async_trait::async_trait; -use catalog::remote::CachedMetaKvBackend; use catalog::{CatalogManager, DeregisterTableRequest, RegisterTableRequest}; use chrono::DateTime; use client::client_manager::DatanodeClients; @@ -34,7 +33,6 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_catalog::format_full_table_name; use common_error::ext::BoxedError; use common_meta::key::schema_name::SchemaNameKey; -use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::peer::Peer; use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::router::{Partition, Partition as MetaPartition, RouteRequest}; @@ -80,7 +78,6 @@ const MAX_VALUE: &str = "MAXVALUE"; pub struct DistInstance { meta_client: Arc, catalog_manager: Arc, - table_metadata_manager: TableMetadataManagerRef, datanode_clients: Arc, } @@ -90,13 +87,9 @@ impl DistInstance { catalog_manager: Arc, datanode_clients: Arc, ) -> Self { - let table_metadata_manager = Arc::new(TableMetadataManager::new(Arc::new( - CachedMetaKvBackend::new(meta_client.clone()), - ))); Self { meta_client, catalog_manager, - table_metadata_manager, datanode_clients, } } @@ -470,7 +463,8 @@ impl DistInstance { let schema = SchemaNameKey::new(catalog, &expr.database_name); let exist = self - .table_metadata_manager + .catalog_manager + .table_metadata_manager_ref() .schema_manager() .exist(schema) .await @@ -483,7 +477,8 @@ impl DistInstance { } ); - self.table_metadata_manager + self.catalog_manager + .table_metadata_manager_ref() .schema_manager() .create(schema) .await diff --git a/src/meta-srv/src/metadata_service.rs b/src/meta-srv/src/metadata_service.rs index 38e5e9b6af04..d17d7d99f462 100644 --- a/src/meta-srv/src/metadata_service.rs +++ b/src/meta-srv/src/metadata_service.rs @@ -17,14 +17,13 @@ use std::sync::Arc; use async_trait::async_trait; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::schema_name::SchemaNameKey; -use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; +use common_meta::key::TableMetadataManagerRef; use common_telemetry::{info, timer}; use metrics::increment_counter; use snafu::{ensure, ResultExt}; use crate::error; use crate::error::Result; -use crate::service::store::kv::{KvBackendAdapter, KvStoreRef}; /// This trait defines some methods of metadata #[async_trait] @@ -48,9 +47,7 @@ pub struct DefaultMetadataService { } impl DefaultMetadataService { - pub fn new(kv_store: KvStoreRef) -> Self { - let table_metadata_manager = - Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(kv_store))); + pub fn new(table_metadata_manager: TableMetadataManagerRef) -> Self { Self { table_metadata_manager, } @@ -114,16 +111,19 @@ mod tests { use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::schema_name::SchemaNameKey; - use common_meta::key::TableMetaKey; + use common_meta::key::{TableMetaKey, TableMetadataManager}; use super::{DefaultMetadataService, MetadataService}; - use crate::service::store::kv::KvStoreRef; + use crate::service::store::kv::{KvBackendAdapter, KvStoreRef}; use crate::service::store::memory::MemStore; #[tokio::test] async fn test_create_schema() { let kv_store = Arc::new(MemStore::default()); - let service = DefaultMetadataService::new(kv_store.clone()); + let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( + kv_store.clone(), + ))); + let service = DefaultMetadataService::new(table_metadata_manager); service .create_schema("catalog", "public", false) diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index ebfbb5da319e..1e4acb88747e 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -166,8 +166,11 @@ impl MetaSrvBuilder { let mailbox = build_mailbox(&kv_store, &pushers); let procedure_manager = build_procedure_manager(&options, &kv_store); let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_store.clone())); + let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( + kv_store.clone(), + ))); let metadata_service = metadata_service - .unwrap_or_else(|| Arc::new(DefaultMetadataService::new(kv_store.clone()))); + .unwrap_or_else(|| Arc::new(DefaultMetadataService::new(table_metadata_manager))); let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default())); let table_metadata_manager = build_table_metadata_manager(&kv_store); let ddl_manager = build_ddl_manager( diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index df901c5ea495..d7393ba04863 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -22,13 +22,14 @@ use api::v1::meta::store_server::StoreServer; use client::client_manager::DatanodeClients; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; +use common_meta::key::TableMetadataManager; use tower::service_fn; use crate::metadata_service::{DefaultMetadataService, MetadataService}; use crate::metasrv::builder::MetaSrvBuilder; use crate::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef}; use crate::service::store::etcd::EtcdStore; -use crate::service::store::kv::KvStoreRef; +use crate::service::store::kv::{KvBackendAdapter, KvStoreRef}; use crate::service::store::memory::MemStore; #[derive(Clone)] @@ -60,8 +61,10 @@ pub async fn mock( datanode_clients: Option>, ) -> MockInfo { let server_addr = opts.server_addr.clone(); - - let metadata_service = DefaultMetadataService::new(kv_store.clone()); + let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( + kv_store.clone(), + ))); + let metadata_service = DefaultMetadataService::new(table_metadata_manager); metadata_service .create_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, true) diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs index 87dbfa982426..e0366fa19d61 100644 --- a/src/meta-srv/src/service/admin.rs +++ b/src/meta-srv/src/service/admin.rs @@ -24,13 +24,10 @@ use std::convert::Infallible; use std::sync::Arc; use std::task::{Context, Poll}; -use common_meta::key::catalog_name::CatalogManager; -use common_meta::key::schema_name::SchemaManager; use tonic::body::BoxBody; use tonic::codegen::{empty_body, http, BoxFuture, Service}; use tonic::transport::NamedService; -use super::store::kv::KvBackendAdapter; use crate::metasrv::MetaSrv; pub fn make_admin_service(meta_srv: MetaSrv) -> Admin { @@ -53,16 +50,14 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin { let router = router.route( "/catalogs", meta::CatalogsHandler { - catalog_manager: CatalogManager::new(KvBackendAdapter::wrap( - meta_srv.kv_store().clone(), - )), + table_metadata_manager: meta_srv.table_metadata_manager().clone(), }, ); let router = router.route( "/schemas", meta::SchemasHandler { - schema_manager: SchemaManager::new(KvBackendAdapter::wrap(meta_srv.kv_store().clone())), + table_metadata_manager: meta_srv.table_metadata_manager().clone(), }, ); diff --git a/src/meta-srv/src/service/admin/meta.rs b/src/meta-srv/src/service/admin/meta.rs index 96845eaefb0d..fbcc918ae406 100644 --- a/src/meta-srv/src/service/admin/meta.rs +++ b/src/meta-srv/src/service/admin/meta.rs @@ -15,8 +15,6 @@ use std::collections::HashMap; use common_error::ext::BoxedError; -use common_meta::key::catalog_name::CatalogManager; -use common_meta::key::schema_name::SchemaManager; use common_meta::key::table_name::TableNameKey; use common_meta::key::TableMetadataManagerRef; use futures::TryStreamExt; @@ -28,11 +26,11 @@ use crate::error::{Result, TableMetadataManagerSnafu}; use crate::service::admin::HttpHandler; pub struct CatalogsHandler { - pub catalog_manager: CatalogManager, + pub table_metadata_manager: TableMetadataManagerRef, } pub struct SchemasHandler { - pub schema_manager: SchemaManager, + pub table_metadata_manager: TableMetadataManagerRef, } pub struct TablesHandler { @@ -46,7 +44,11 @@ pub struct TableHandler { #[async_trait::async_trait] impl HttpHandler for CatalogsHandler { async fn handle(&self, _: &str, _: &HashMap) -> Result> { - let stream = self.catalog_manager.catalog_names().await; + let stream = self + .table_metadata_manager + .catalog_manager() + .catalog_names() + .await; let keys = stream .try_collect::>() @@ -70,7 +72,11 @@ impl HttpHandler for SchemasHandler { .context(error::MissingRequiredParameterSnafu { param: "catalog_name", })?; - let stream = self.schema_manager.schema_names(catalog).await; + let stream = self + .table_metadata_manager + .schema_manager() + .schema_names(catalog) + .await; let keys = stream .try_collect::>() diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index d25ca64340ca..8c42f1ca81d0 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -21,6 +21,7 @@ use client::client_manager::DatanodeClients; use client::Client; use common_base::Plugins; use common_grpc::channel_manager::ChannelManager; +use common_meta::key::TableMetadataManager; use common_meta::peer::Peer; use common_meta::DatanodeId; use common_runtime::Builder as RuntimeBuilder; @@ -35,7 +36,7 @@ use meta_srv::cluster::MetaPeerClientRef; use meta_srv::metadata_service::{DefaultMetadataService, MetadataService}; use meta_srv::metasrv::{MetaSrv, MetaSrvOptions}; use meta_srv::mocks::MockInfo; -use meta_srv::service::store::kv::KvStoreRef; +use meta_srv::service::store::kv::{KvBackendAdapter, KvStoreRef}; use meta_srv::service::store::memory::MemStore; use servers::grpc::GrpcServer; use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; @@ -130,8 +131,10 @@ impl GreptimeDbClusterBuilder { let mock = meta_srv::mocks::mock(opt, self.kv_store.clone(), None, Some(datanode_clients)).await; - - let metadata_service = DefaultMetadataService::new(mock.meta_srv.kv_store().clone()); + let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( + mock.meta_srv.kv_store().clone(), + ))); + let metadata_service = DefaultMetadataService::new(table_metadata_manager); metadata_service .create_schema("another_catalog", "another_schema", true) .await