Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: switch to new catalog/schema key #2140

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@ use crate::DeregisterTableRequest;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to list catalogs, source: {}", source))]
ListCatalogs {
location: Location,
source: BoxedError,
},

#[snafu(display("Failed to list {}'s schemas, source: {}", catalog, source))]
ListSchemas {
location: Location,
catalog: String,
source: BoxedError,
},

#[snafu(display(
"Failed to re-compile script due to internal error, source: {}",
source
Expand Down Expand Up @@ -284,6 +297,10 @@ impl ErrorExt for Error {
StatusCode::InvalidArguments
}

Error::ListCatalogs { source, .. } | Error::ListSchemas { source, .. } => {
source.status_code()
}

Error::OpenSystemCatalog { source, .. }
| Error::CreateSystemCatalog { source, .. }
| Error::InsertCatalogRecord { source, .. }
Expand Down
39 changes: 11 additions & 28 deletions src/catalog/src/remote/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use std::sync::Arc;

use async_trait::async_trait;
use common_catalog::consts::MITO_ENGINE;
use common_meta::helper::{CatalogKey, SchemaKey};
use common_meta::ident::TableIdent;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::datanode_table::DatanodeTableValue;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_telemetry::{error, info, warn};
use metrics::increment_gauge;
use snafu::{ensure, OptionExt, ResultExt};
Expand All @@ -45,7 +45,6 @@ use crate::{
/// Catalog manager based on metasrv.
pub struct RemoteCatalogManager {
node_id: u64,
backend: KvBackendRef,
engine_manager: TableEngineManagerRef,
system_table_requests: Mutex<Vec<RegisterSystemTableRequest>>,
region_alive_keepers: Arc<RegionAliveKeepers>,
Expand All @@ -57,14 +56,12 @@ impl RemoteCatalogManager {
pub fn new(
engine_manager: TableEngineManagerRef,
node_id: u64,
backend: KvBackendRef,
region_alive_keepers: Arc<RegionAliveKeepers>,
table_metadata_manager: TableMetadataManagerRef,
) -> Self {
Self {
engine_manager,
node_id,
backend,
system_table_requests: Default::default(),
region_alive_keepers,
memory_catalog_manager: Arc::new(MemoryCatalogManager::default()),
Expand Down Expand Up @@ -110,13 +107,6 @@ impl RemoteCatalogManager {
.context(ParallelOpenTableSnafu)?;
Ok(())
}

fn build_schema_key(&self, catalog_name: String, schema_name: String) -> SchemaKey {
SchemaKey {
catalog_name,
schema_name,
}
}
}

async fn open_and_register_table(
Expand Down Expand Up @@ -323,16 +313,12 @@ impl CatalogManager for RemoteCatalogManager {
return Ok(true);
}

let key = self
.build_schema_key(catalog.to_string(), schema.to_string())
.to_string();
let remote_schema_exists = self
.backend
.get(key.as_bytes())
.table_metadata_manager
.schema_manager()
.exist(SchemaNameKey::new(catalog, schema))
.await
.context(TableMetadataManagerSnafu)?
.is_some();

.context(TableMetadataManagerSnafu)?;
// Create schema locally if remote schema exists. Since local schema is managed by memory
// catalog manager, creating a local schema is relatively cheap (just a HashMap).
// Besides, if this method ("schema_exist) is called, it's very likely that someone wants to
Expand Down Expand Up @@ -368,16 +354,13 @@ impl CatalogManager for RemoteCatalogManager {
return Ok(true);
}

let key = CatalogKey {
catalog_name: catalog.to_string(),
};

let key = CatalogNameKey::new(catalog);
let remote_catalog_exists = self
.backend
.get(key.to_string().as_bytes())
.table_metadata_manager
.catalog_manager()
.exist(key)
.await
.context(TableMetadataManagerSnafu)?
.is_some();
.context(TableMetadataManagerSnafu)?;

// Create catalog locally if remote catalog exists. Since local catalog is managed by memory
// catalog manager, creating a local catalog is relatively cheap (just a HashMap).
Expand Down
71 changes: 17 additions & 54 deletions src/catalog/tests/remote_catalog_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@ mod tests {
use std::sync::Arc;
use std::time::Duration;

use catalog::error::Error;
use catalog::remote::mock::MockTableEngine;
use catalog::remote::region_alive_keeper::RegionAliveKeepers;
use catalog::remote::{CachedMetaKvBackend, RemoteCatalogManager};
use catalog::{CatalogManager, RegisterSchemaRequest, RegisterTableRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use common_meta::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use common_meta::helper::CatalogValue;
use common_meta::ident::TableIdent;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackend;
use common_meta::rpc::store::{CompareAndPutRequest, PutRequest, RangeRequest};
use common_meta::rpc::store::{CompareAndPutRequest, PutRequest};
use datatypes::schema::RawSchema;
use table::engine::manager::{MemoryTableEngineManager, TableEngineManagerRef};
use table::engine::{EngineContext, TableEngineRef};
Expand All @@ -52,92 +52,56 @@ mod tests {
}
}

#[tokio::test]
async fn test_backend() {
let backend = MemoryKvBackend::<Error>::default();

let default_catalog_key = CatalogKey {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
}
.to_string();
let req = PutRequest::new()
.with_key(default_catalog_key.as_bytes())
.with_value(CatalogValue.as_bytes().unwrap());
backend.put(req).await.unwrap();

let schema_key = SchemaKey {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
}
.to_string();
let req = PutRequest::new()
.with_key(schema_key.as_bytes())
.with_value(SchemaValue.as_bytes().unwrap());
backend.put(req).await.unwrap();

let req = RangeRequest::new().with_prefix(b"__c-".to_vec());
let res = backend
.range(req)
.await
.unwrap()
.kvs
.into_iter()
.map(|kv| String::from_utf8_lossy(kv.key()).to_string());
assert_eq!(
vec!["__c-greptime".to_string()],
res.into_iter().collect::<Vec<_>>()
);
}

#[tokio::test]
async fn test_cached_backend() {
let backend = CachedMetaKvBackend::wrap(Arc::new(MemoryKvBackend::default()));

let default_catalog_key = CatalogKey {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
}
.to_string();
let default_catalog_key = CatalogNameKey::new(DEFAULT_CATALOG_NAME).to_string();

let req = PutRequest::new()
.with_key(default_catalog_key.as_bytes())
.with_value(CatalogValue.as_bytes().unwrap());
backend.put(req).await.unwrap();

let ret = backend.get(b"__c-greptime").await.unwrap();
let ret = backend.get(b"__catalog_name/greptime").await.unwrap();
let _ = ret.unwrap();

let req = CompareAndPutRequest::new()
.with_key(b"__c-greptime".to_vec())
.with_key(b"__catalog_name/greptime".to_vec())
.with_expect(CatalogValue.as_bytes().unwrap())
.with_value(b"123".to_vec());
let _ = backend.compare_and_put(req).await.unwrap();

let ret = backend.get(b"__c-greptime").await.unwrap();
let ret = backend.get(b"__catalog_name/greptime").await.unwrap();
assert_eq!(b"123", ret.as_ref().unwrap().value.as_slice());

let req = PutRequest::new()
.with_key(b"__c-greptime".to_vec())
.with_key(b"__catalog_name/greptime".to_vec())
.with_value(b"1234".to_vec());
let _ = backend.put(req).await;

let ret = backend.get(b"__c-greptime").await.unwrap();
let ret = backend.get(b"__catalog_name/greptime").await.unwrap();
assert_eq!(b"1234", ret.unwrap().value.as_slice());

backend.delete(b"__c-greptime", false).await.unwrap();
backend
.delete(b"__catalog_name/greptime", false)
.await
.unwrap();

let ret = backend.get(b"__c-greptime").await.unwrap();
let ret = backend.get(b"__catalog_name/greptime").await.unwrap();
assert!(ret.is_none());
}

async fn prepare_components(node_id: u64) -> TestingComponents {
let backend = Arc::new(MemoryKvBackend::default());

let req = PutRequest::new()
.with_key(b"__c-greptime".to_vec())
.with_key(b"__catalog_name/greptime".to_vec())
.with_value(b"".to_vec());
backend.put(req).await.unwrap();

let req = PutRequest::new()
.with_key(b"__s-greptime-public".to_vec())
.with_key(b"__schema_name/greptime-public".to_vec())
.with_value(b"".to_vec());
backend.put(req).await.unwrap();

Expand All @@ -154,7 +118,6 @@ mod tests {
let catalog_manager = RemoteCatalogManager::new(
engine_manager.clone(),
node_id,
cached_backend.clone(),
region_alive_keepers.clone(),
Arc::new(TableMetadataManager::new(cached_backend)),
);
Expand Down
2 changes: 2 additions & 0 deletions src/cmd/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ mod bench;
mod cmd;
mod helper;
mod repl;
// TODO(weny): Removes it
#[allow(deprecated)]
mod upgrade;

use async_trait::async_trait;
Expand Down
2 changes: 2 additions & 0 deletions src/common/meta/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ impl TableGlobalValue {
}
}

#[deprecated(since = "0.4.0", note = "Please use the CatalogNameKey instead")]
pub struct CatalogKey {
pub catalog_name: String,
}
Expand Down Expand Up @@ -95,6 +96,7 @@ impl CatalogKey {
#[derive(Debug, Serialize, Deserialize)]
pub struct CatalogValue;

#[deprecated(since = "0.4.0", note = "Please use the SchemaNameKey instead")]
pub struct SchemaKey {
pub catalog_name: String,
pub schema_name: String,
Expand Down
18 changes: 15 additions & 3 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
use table_name::{TableNameKey, TableNameManager, TableNameValue};
use table_region::{TableRegionKey, TableRegionManager, TableRegionValue};

use self::catalog_name::CatalogNameValue;
use self::schema_name::SchemaNameValue;
use self::catalog_name::{CatalogManager, CatalogNameValue};
use self::schema_name::{SchemaManager, SchemaNameValue};
use crate::error::{InvalidTableMetadataSnafu, Result, SerdeJsonSnafu};
pub use crate::key::table_route::{TableRouteKey, TABLE_ROUTE_PREFIX};
use crate::kv_backend::KvBackendRef;
Expand Down Expand Up @@ -125,6 +125,8 @@ pub struct TableMetadataManager {
table_info_manager: TableInfoManager,
table_region_manager: TableRegionManager,
datanode_table_manager: DatanodeTableManager,
catalog_manager: CatalogManager,
schema_manager: SchemaManager,
}

impl TableMetadataManager {
Expand All @@ -133,7 +135,9 @@ impl TableMetadataManager {
table_name_manager: TableNameManager::new(kv_backend.clone()),
table_info_manager: TableInfoManager::new(kv_backend.clone()),
table_region_manager: TableRegionManager::new(kv_backend.clone()),
datanode_table_manager: DatanodeTableManager::new(kv_backend),
datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
catalog_manager: CatalogManager::new(kv_backend.clone()),
schema_manager: SchemaManager::new(kv_backend),
}
}

Expand All @@ -152,6 +156,14 @@ impl TableMetadataManager {
pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
&self.datanode_table_manager
}

pub fn catalog_manager(&self) -> &CatalogManager {
&self.catalog_manager
}

pub fn schema_manager(&self) -> &SchemaManager {
&self.schema_manager
}
}

macro_rules! impl_table_meta_key {
Expand Down
31 changes: 31 additions & 0 deletions src/common/meta/src/key/catalog_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::fmt::Display;
use std::sync::Arc;

use common_catalog::consts::DEFAULT_CATALOG_NAME;
use futures::stream::BoxStream;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
Expand All @@ -32,6 +33,14 @@ pub struct CatalogNameKey<'a> {
pub catalog: &'a str,
}

impl<'a> Default for CatalogNameKey<'a> {
fn default() -> Self {
Self {
catalog: DEFAULT_CATALOG_NAME,
}
}
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CatalogNameValue;

Expand Down Expand Up @@ -103,6 +112,12 @@ impl CatalogManager {
Ok(())
}

pub async fn exist(&self, catalog: CatalogNameKey<'_>) -> Result<bool> {
let raw_key = catalog.as_raw_key();

Ok(self.kv_backend.get(&raw_key).await?.is_some())
}

pub async fn catalog_names(&self) -> BoxStream<'static, Result<String>> {
let start_key = CatalogNameKey::range_start_key();
let req = RangeRequest::new().with_prefix(start_key.as_bytes());
Expand All @@ -121,6 +136,7 @@ impl CatalogManager {
#[cfg(test)]
mod tests {
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;

#[test]
fn test_serialization() {
Expand All @@ -132,4 +148,19 @@ mod tests {

assert_eq!(key, parsed);
}

#[tokio::test]
async fn test_key_exist() {
let manager = CatalogManager::new(Arc::new(MemoryKvBackend::default()));

let catalog_key = CatalogNameKey::new("my-catalog");

manager.create(catalog_key).await.unwrap();

assert!(manager.exist(catalog_key).await.unwrap());

let wrong_catalog_key = CatalogNameKey::new("my-wrong");

assert!(!manager.exist(wrong_catalog_key).await.unwrap());
}
}
Loading