Skip to content

Commit

Permalink
chore: apply suggestions from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Aug 10, 2023
1 parent bef057b commit 3458a0f
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 37 deletions.
4 changes: 4 additions & 0 deletions src/frontend/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ impl FrontendCatalogManager {
self.partition_manager.clone()
}

pub fn table_metadata_manager_ref(&self) -> &TableMetadataManagerRef {
&self.table_metadata_manager
}

pub fn datanode_clients(&self) -> Arc<DatanodeClients> {
self.datanode_clients.clone()
}
Expand Down
13 changes: 4 additions & 9 deletions src/frontend/src/instance/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use api::v1::{
FlushTableExpr, InsertRequests, TruncateTableExpr,
};
use async_trait::async_trait;
use catalog::remote::CachedMetaKvBackend;
use catalog::{CatalogManager, DeregisterTableRequest, RegisterTableRequest};
use chrono::DateTime;
use client::client_manager::DatanodeClients;
Expand All @@ -34,7 +33,6 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::peer::Peer;
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::router::{Partition, Partition as MetaPartition, RouteRequest};
Expand Down Expand Up @@ -80,7 +78,6 @@ const MAX_VALUE: &str = "MAXVALUE";
pub struct DistInstance {
meta_client: Arc<MetaClient>,
catalog_manager: Arc<FrontendCatalogManager>,
table_metadata_manager: TableMetadataManagerRef,
datanode_clients: Arc<DatanodeClients>,
}

Expand All @@ -90,13 +87,9 @@ impl DistInstance {
catalog_manager: Arc<FrontendCatalogManager>,
datanode_clients: Arc<DatanodeClients>,
) -> Self {
let table_metadata_manager = Arc::new(TableMetadataManager::new(Arc::new(
CachedMetaKvBackend::new(meta_client.clone()),
)));
Self {
meta_client,
catalog_manager,
table_metadata_manager,
datanode_clients,
}
}
Expand Down Expand Up @@ -470,7 +463,8 @@ impl DistInstance {

let schema = SchemaNameKey::new(catalog, &expr.database_name);
let exist = self
.table_metadata_manager
.catalog_manager
.table_metadata_manager_ref()
.schema_manager()
.exist(schema)
.await
Expand All @@ -483,7 +477,8 @@ impl DistInstance {
}
);

self.table_metadata_manager
self.catalog_manager
.table_metadata_manager_ref()
.schema_manager()
.create(schema)
.await
Expand Down
16 changes: 8 additions & 8 deletions src/meta-srv/src/metadata_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::key::TableMetadataManagerRef;
use common_telemetry::{info, timer};
use metrics::increment_counter;
use snafu::{ensure, ResultExt};

use crate::error;
use crate::error::Result;
use crate::service::store::kv::{KvBackendAdapter, KvStoreRef};

/// This trait defines some methods of metadata
#[async_trait]
Expand All @@ -48,9 +47,7 @@ pub struct DefaultMetadataService {
}

impl DefaultMetadataService {
pub fn new(kv_store: KvStoreRef) -> Self {
let table_metadata_manager =
Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(kv_store)));
pub fn new(table_metadata_manager: TableMetadataManagerRef) -> Self {
Self {
table_metadata_manager,
}
Expand Down Expand Up @@ -114,16 +111,19 @@ mod tests {

use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::TableMetaKey;
use common_meta::key::{TableMetaKey, TableMetadataManager};

use super::{DefaultMetadataService, MetadataService};
use crate::service::store::kv::KvStoreRef;
use crate::service::store::kv::{KvBackendAdapter, KvStoreRef};
use crate::service::store::memory::MemStore;

#[tokio::test]
async fn test_create_schema() {
let kv_store = Arc::new(MemStore::default());
let service = DefaultMetadataService::new(kv_store.clone());
let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(
kv_store.clone(),
)));
let service = DefaultMetadataService::new(table_metadata_manager);

service
.create_schema("catalog", "public", false)
Expand Down
5 changes: 4 additions & 1 deletion src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,11 @@ impl MetaSrvBuilder {
let mailbox = build_mailbox(&kv_store, &pushers);
let procedure_manager = build_procedure_manager(&options, &kv_store);
let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_store.clone()));
let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(
kv_store.clone(),
)));
let metadata_service = metadata_service
.unwrap_or_else(|| Arc::new(DefaultMetadataService::new(kv_store.clone())));
.unwrap_or_else(|| Arc::new(DefaultMetadataService::new(table_metadata_manager)));
let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default()));
let table_metadata_manager = build_table_metadata_manager(&kv_store);
let ddl_manager = build_ddl_manager(
Expand Down
9 changes: 6 additions & 3 deletions src/meta-srv/src/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ use api::v1::meta::store_server::StoreServer;
use client::client_manager::DatanodeClients;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::key::TableMetadataManager;
use tower::service_fn;

use crate::metadata_service::{DefaultMetadataService, MetadataService};
use crate::metasrv::builder::MetaSrvBuilder;
use crate::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef};
use crate::service::store::etcd::EtcdStore;
use crate::service::store::kv::KvStoreRef;
use crate::service::store::kv::{KvBackendAdapter, KvStoreRef};
use crate::service::store::memory::MemStore;

#[derive(Clone)]
Expand Down Expand Up @@ -60,8 +61,10 @@ pub async fn mock(
datanode_clients: Option<Arc<DatanodeClients>>,
) -> MockInfo {
let server_addr = opts.server_addr.clone();

let metadata_service = DefaultMetadataService::new(kv_store.clone());
let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(
kv_store.clone(),
)));
let metadata_service = DefaultMetadataService::new(table_metadata_manager);

metadata_service
.create_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, true)
Expand Down
9 changes: 2 additions & 7 deletions src/meta-srv/src/service/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,10 @@ use std::convert::Infallible;
use std::sync::Arc;
use std::task::{Context, Poll};

use common_meta::key::catalog_name::CatalogManager;
use common_meta::key::schema_name::SchemaManager;
use tonic::body::BoxBody;
use tonic::codegen::{empty_body, http, BoxFuture, Service};
use tonic::transport::NamedService;

use super::store::kv::KvBackendAdapter;
use crate::metasrv::MetaSrv;

pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
Expand All @@ -53,16 +50,14 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
let router = router.route(
"/catalogs",
meta::CatalogsHandler {
catalog_manager: CatalogManager::new(KvBackendAdapter::wrap(
meta_srv.kv_store().clone(),
)),
table_metadata_manager: meta_srv.table_metadata_manager().clone(),
},
);

let router = router.route(
"/schemas",
meta::SchemasHandler {
schema_manager: SchemaManager::new(KvBackendAdapter::wrap(meta_srv.kv_store().clone())),
table_metadata_manager: meta_srv.table_metadata_manager().clone(),
},
);

Expand Down
18 changes: 12 additions & 6 deletions src/meta-srv/src/service/admin/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
use std::collections::HashMap;

use common_error::ext::BoxedError;
use common_meta::key::catalog_name::CatalogManager;
use common_meta::key::schema_name::SchemaManager;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::TableMetadataManagerRef;
use futures::TryStreamExt;
Expand All @@ -28,11 +26,11 @@ use crate::error::{Result, TableMetadataManagerSnafu};
use crate::service::admin::HttpHandler;

pub struct CatalogsHandler {
pub catalog_manager: CatalogManager,
pub table_metadata_manager: TableMetadataManagerRef,
}

pub struct SchemasHandler {
pub schema_manager: SchemaManager,
pub table_metadata_manager: TableMetadataManagerRef,
}

pub struct TablesHandler {
Expand All @@ -46,7 +44,11 @@ pub struct TableHandler {
#[async_trait::async_trait]
impl HttpHandler for CatalogsHandler {
async fn handle(&self, _: &str, _: &HashMap<String, String>) -> Result<http::Response<String>> {
let stream = self.catalog_manager.catalog_names().await;
let stream = self
.table_metadata_manager
.catalog_manager()
.catalog_names()
.await;

let keys = stream
.try_collect::<Vec<_>>()
Expand All @@ -70,7 +72,11 @@ impl HttpHandler for SchemasHandler {
.context(error::MissingRequiredParameterSnafu {
param: "catalog_name",
})?;
let stream = self.schema_manager.schema_names(catalog).await;
let stream = self
.table_metadata_manager
.schema_manager()
.schema_names(catalog)
.await;

let keys = stream
.try_collect::<Vec<_>>()
Expand Down
9 changes: 6 additions & 3 deletions tests-integration/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use client::client_manager::DatanodeClients;
use client::Client;
use common_base::Plugins;
use common_grpc::channel_manager::ChannelManager;
use common_meta::key::TableMetadataManager;
use common_meta::peer::Peer;
use common_meta::DatanodeId;
use common_runtime::Builder as RuntimeBuilder;
Expand All @@ -35,7 +36,7 @@ use meta_srv::cluster::MetaPeerClientRef;
use meta_srv::metadata_service::{DefaultMetadataService, MetadataService};
use meta_srv::metasrv::{MetaSrv, MetaSrvOptions};
use meta_srv::mocks::MockInfo;
use meta_srv::service::store::kv::KvStoreRef;
use meta_srv::service::store::kv::{KvBackendAdapter, KvStoreRef};
use meta_srv::service::store::memory::MemStore;
use servers::grpc::GrpcServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
Expand Down Expand Up @@ -130,8 +131,10 @@ impl GreptimeDbClusterBuilder {

let mock =
meta_srv::mocks::mock(opt, self.kv_store.clone(), None, Some(datanode_clients)).await;

let metadata_service = DefaultMetadataService::new(mock.meta_srv.kv_store().clone());
let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(
mock.meta_srv.kv_store().clone(),
)));
let metadata_service = DefaultMetadataService::new(table_metadata_manager);
metadata_service
.create_schema("another_catalog", "another_schema", true)
.await
Expand Down

0 comments on commit 3458a0f

Please sign in to comment.