diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index d2cdcf9c4a4d..944f8623e017 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -21,7 +21,7 @@ use common_catalog::consts::MIN_USER_TABLE_ID; use common_config::{metadata_store_dir, KvBackendConfig}; use common_meta::cache_invalidator::DummyCacheInvalidator; use common_meta::datanode_manager::DatanodeManagerRef; -use common_meta::ddl::table_meta::TableMetadataAllocator; +use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; use common_meta::ddl::DdlTaskExecutorRef; use common_meta::ddl_manager::DdlManager; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; @@ -419,11 +419,11 @@ impl StartCommand { let table_metadata_manager = Self::create_table_metadata_manager(kv_backend.clone()).await?; - let table_meta_allocator = TableMetadataAllocator::new( + let table_meta_allocator = Arc::new(TableMetadataAllocator::new( table_id_sequence, wal_options_allocator.clone(), table_metadata_manager.table_name_manager().clone(), - ); + )); let ddl_task_executor = Self::create_ddl_task_executor( table_metadata_manager, @@ -458,7 +458,7 @@ impl StartCommand { table_metadata_manager: TableMetadataManagerRef, procedure_manager: ProcedureManagerRef, datanode_manager: DatanodeManagerRef, - table_meta_allocator: TableMetadataAllocator, + table_meta_allocator: TableMetadataAllocatorRef, ) -> Result { let ddl_task_executor: DdlTaskExecutorRef = Arc::new( DdlManager::try_new( diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 8ac4bb22b945..a1326be0b28f 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use common_telemetry::tracing_context::W3cTrace; use store_api::storage::{RegionNumber, TableId}; +use self::table_meta::TableMetadataAllocatorRef; use crate::cache_invalidator::CacheInvalidatorRef; use crate::datanode_manager::DatanodeManagerRef; use crate::error::Result; @@ -73,4 +74,5 @@ pub struct DdlContext { pub cache_invalidator: CacheInvalidatorRef, pub table_metadata_manager: TableMetadataManagerRef, pub memory_region_keeper: MemoryRegionKeeperRef, + pub table_metadata_allocator: TableMetadataAllocatorRef, } diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index c1c45c30911f..a0a717638ebb 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -34,7 +34,7 @@ use table::table_reference::TableReference; use crate::ddl::create_table_template::{build_template, CreateRequestBuilder}; use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path}; -use crate::ddl::DdlContext; +use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext}; use crate::error::{self, Result, TableRouteNotFoundSnafu}; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; @@ -54,16 +54,10 @@ pub struct CreateTableProcedure { impl CreateTableProcedure { pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable"; - pub fn new( - cluster_id: ClusterId, - task: CreateTableTask, - table_route: TableRouteValue, - region_wal_options: HashMap, - context: DdlContext, - ) -> Self { + pub fn new(cluster_id: ClusterId, task: CreateTableTask, context: DdlContext) -> Self { Self { context, - creator: TableCreator::new(cluster_id, task, table_route, region_wal_options), + creator: TableCreator::new(cluster_id, task), } } @@ -75,7 +69,8 @@ impl CreateTableProcedure { opening_regions: vec![], }; - if let TableRouteValue::Physical(x) = &creator.data.table_route { + // Only registers regions if the table route is allocated. + if let Some(TableRouteValue::Physical(x)) = &creator.data.table_route { creator.opening_regions = creator .register_opening_regions(&context, &x.region_routes) .map_err(BoxedError::new) @@ -85,7 +80,7 @@ impl CreateTableProcedure { Ok(CreateTableProcedure { context, creator }) } - pub fn table_info(&self) -> &RawTableInfo { + fn table_info(&self) -> &RawTableInfo { &self.creator.data.task.table_info } @@ -93,11 +88,28 @@ impl CreateTableProcedure { self.table_info().ident.table_id } - pub fn region_wal_options(&self) -> &HashMap { + fn region_wal_options(&self) -> &Option> { &self.creator.data.region_wal_options } - /// Checks whether the table exists. + fn table_route(&self) -> &Option { + &self.creator.data.table_route + } + + #[cfg(any(test, feature = "testing"))] + pub fn set_allocated_metadata( + &mut self, + table_id: TableId, + table_route: TableRouteValue, + region_wal_options: HashMap, + ) { + self.creator + .set_allocated_metadata(table_id, table_route, region_wal_options) + } + + /// On the prepare step, it performs: + /// - Checks whether the table exists. + /// - Allocates the table id. async fn on_prepare(&mut self) -> Result { let expr = &self.creator.data.task.create_table; let table_name_value = self @@ -124,6 +136,22 @@ impl CreateTableProcedure { } self.creator.data.state = CreateTableState::DatanodeCreateRegions; + let TableMetadata { + table_id, + table_route, + region_wal_options, + } = self + .context + .table_metadata_allocator + .create( + &TableMetadataAllocatorContext { + cluster_id: self.creator.data.cluster_id, + }, + &self.creator.data.task, + ) + .await?; + self.creator + .set_allocated_metadata(table_id, table_route, region_wal_options); Ok(Status::executing(true)) } @@ -138,7 +166,8 @@ impl CreateTableProcedure { } pub async fn on_datanode_create_regions(&mut self) -> Result { - match &self.creator.data.table_route { + // Safety: the table route must be allocated. + match &self.creator.data.table_route.clone().unwrap() { TableRouteValue::Physical(x) => { let region_routes = x.region_routes.clone(); let request_builder = self.new_region_request_builder(None)?; @@ -170,7 +199,8 @@ impl CreateTableProcedure { region_routes: &[RegionRoute], request_builder: CreateRequestBuilder, ) -> Result { - if self.creator.data.table_route.is_physical() { + // Safety: the table_route must be allocated. + if self.table_route().as_ref().unwrap().is_physical() { // Registers opening regions let guards = self .creator @@ -181,13 +211,12 @@ impl CreateTableProcedure { } let create_table_data = &self.creator.data; - let region_wal_options = &create_table_data.region_wal_options; - + // Safety: the region_wal_options must be allocated + let region_wal_options = self.region_wal_options().as_ref().unwrap(); let create_table_expr = &create_table_data.task.create_table; let catalog = &create_table_expr.catalog_name; let schema = &create_table_expr.schema_name; let storage_path = region_storage_path(catalog, schema); - let leaders = find_leaders(region_routes); let mut create_region_tasks = Vec::with_capacity(leaders.len()); @@ -203,7 +232,6 @@ impl CreateTableProcedure { storage_path.clone(), region_wal_options, )?; - requests.push(PbRegionRequest::Create(create_region_request)); } @@ -218,7 +246,6 @@ impl CreateTableProcedure { let datanode = datanode.clone(); let requester = requester.clone(); - create_region_tasks.push(async move { if let Err(err) = requester.handle(request).await { return Err(handle_operate_region_error(datanode)(err)); @@ -245,13 +272,12 @@ impl CreateTableProcedure { let manager = &self.context.table_metadata_manager; let raw_table_info = self.table_info().clone(); - let region_wal_options = self.region_wal_options().clone(); + // Safety: the region_wal_options must be allocated. + let region_wal_options = self.region_wal_options().clone().unwrap(); + // Safety: the table_route must be allocated. + let table_route = self.table_route().clone().unwrap(); manager - .create_table_metadata( - raw_table_info, - self.creator.data.table_route.clone(), - region_wal_options, - ) + .create_table_metadata(raw_table_info, table_route, region_wal_options) .await?; info!("Created table metadata for table {table_id}"); @@ -303,19 +329,14 @@ pub struct TableCreator { } impl TableCreator { - pub fn new( - cluster_id: u64, - task: CreateTableTask, - table_route: TableRouteValue, - region_wal_options: HashMap, - ) -> Self { + pub fn new(cluster_id: u64, task: CreateTableTask) -> Self { Self { data: CreateTableData { state: CreateTableState::Prepare, cluster_id, task, - table_route, - region_wal_options, + table_route: None, + region_wal_options: None, }, opening_regions: vec![], } @@ -347,6 +368,17 @@ impl TableCreator { } Ok(opening_region_guards) } + + fn set_allocated_metadata( + &mut self, + table_id: TableId, + table_route: TableRouteValue, + region_wal_options: HashMap, + ) { + self.data.task.table_info.ident.table_id = table_id; + self.data.table_route = Some(table_route); + self.data.region_wal_options = Some(region_wal_options); + } } #[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)] @@ -363,8 +395,10 @@ pub enum CreateTableState { pub struct CreateTableData { pub state: CreateTableState, pub task: CreateTableTask, - table_route: TableRouteValue, - pub region_wal_options: HashMap, + /// None stands for not allocated yet. + table_route: Option, + /// None stands for not allocated yet. + pub region_wal_options: Option>, pub cluster_id: ClusterId, } diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index f1bc5237f7db..efad73dae6a3 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -12,14 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::sync::Arc; use common_procedure::{watcher, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId}; use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::{info, tracing}; use snafu::{ensure, OptionExt, ResultExt}; -use store_api::storage::{RegionNumber, TableId}; +use store_api::storage::TableId; use crate::cache_invalidator::CacheInvalidatorRef; use crate::datanode_manager::DatanodeManagerRef; @@ -27,12 +26,9 @@ use crate::ddl::alter_table::AlterTableProcedure; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::drop_table::DropTableProcedure; -use crate::ddl::table_meta::TableMetadataAllocator; +use crate::ddl::table_meta::TableMetadataAllocatorRef; use crate::ddl::truncate_table::TruncateTableProcedure; -use crate::ddl::{ - utils, DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadata, - TableMetadataAllocatorContext, -}; +use crate::ddl::{utils, DdlContext, DdlTaskExecutor, ExecutorContext}; use crate::error::{ self, EmptyCreateTableTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu, TableNotFoundSnafu, WaitProcedureSnafu, @@ -62,7 +58,7 @@ pub struct DdlManager { datanode_manager: DatanodeManagerRef, cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, - table_metadata_allocator: TableMetadataAllocator, + table_metadata_allocator: TableMetadataAllocatorRef, memory_region_keeper: MemoryRegionKeeperRef, } @@ -73,7 +69,7 @@ impl DdlManager { datanode_clients: DatanodeManagerRef, cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, - table_metadata_allocator: TableMetadataAllocator, + table_metadata_allocator: TableMetadataAllocatorRef, memory_region_keeper: MemoryRegionKeeperRef, ) -> Result { let manager = Self { @@ -100,6 +96,7 @@ impl DdlManager { cache_invalidator: self.cache_invalidator.clone(), table_metadata_manager: self.table_metadata_manager.clone(), memory_region_keeper: self.memory_region_keeper.clone(), + table_metadata_allocator: self.table_metadata_allocator.clone(), } } @@ -205,18 +202,10 @@ impl DdlManager { &self, cluster_id: ClusterId, create_table_task: CreateTableTask, - table_route: TableRouteValue, - region_wal_options: HashMap, ) -> Result<(ProcedureId, Option)> { let context = self.create_context(); - let procedure = CreateTableProcedure::new( - cluster_id, - create_table_task, - table_route, - region_wal_options, - context, - ); + let procedure = CreateTableProcedure::new(cluster_id, create_table_task, context); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); @@ -470,31 +459,10 @@ async fn handle_drop_table_task( async fn handle_create_table_task( ddl_manager: &DdlManager, cluster_id: ClusterId, - mut create_table_task: CreateTableTask, + create_table_task: CreateTableTask, ) -> Result { - let table_meta = ddl_manager - .table_metadata_allocator - .create( - &TableMetadataAllocatorContext { cluster_id }, - &create_table_task, - ) - .await?; - - let TableMetadata { - table_id, - table_route, - region_wal_options, - } = table_meta; - - create_table_task.table_info.ident.table_id = table_id; - let (id, output) = ddl_manager - .submit_create_table_task( - cluster_id, - create_table_task, - table_route, - region_wal_options, - ) + .submit_create_table_task(cluster_id, create_table_task) .await?; let procedure_id = id.to_string(); @@ -645,11 +613,11 @@ mod tests { Arc::new(DummyDatanodeManager), Arc::new(DummyCacheInvalidator), table_metadata_manager.clone(), - TableMetadataAllocator::new( + Arc::new(TableMetadataAllocator::new( Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()), Arc::new(WalOptionsAllocator::default()), table_metadata_manager.table_name_manager().clone(), - ), + )), Arc::new(MemoryRegionKeeper::default()), ); diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index bd9e11d1abe9..0a38bc3f37be 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -21,7 +21,7 @@ use common_base::Plugins; use common_catalog::consts::MIN_USER_TABLE_ID; use common_grpc::channel_manager::ChannelConfig; use common_meta::datanode_manager::DatanodeManagerRef; -use common_meta::ddl::table_meta::TableMetadataAllocator; +use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; use common_meta::ddl_manager::{DdlManager, DdlManagerRef}; use common_meta::distributed_time_constants; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; @@ -211,7 +211,7 @@ impl MetaSrvBuilder { options.wal.clone(), kv_backend.clone(), )); - let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| { + let table_metadata_allocator = Arc::new(table_metadata_allocator.unwrap_or_else(|| { let sequence = Arc::new( SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone()) .initial(MIN_USER_TABLE_ID as u64) @@ -228,7 +228,7 @@ impl MetaSrvBuilder { table_metadata_manager.table_name_manager().clone(), peer_allocator, ) - }); + })); let opening_region_keeper = Arc::new(MemoryRegionKeeper::default()); @@ -238,7 +238,7 @@ impl MetaSrvBuilder { &procedure_manager, &mailbox, &table_metadata_manager, - table_metadata_allocator, + &table_metadata_allocator, &opening_region_keeper, )?; @@ -386,7 +386,7 @@ fn build_ddl_manager( procedure_manager: &ProcedureManagerRef, mailbox: &MailboxRef, table_metadata_manager: &TableMetadataManagerRef, - table_metadata_allocator: TableMetadataAllocator, + table_metadata_allocator: &TableMetadataAllocatorRef, memory_region_keeper: &MemoryRegionKeeperRef, ) -> Result { let datanode_clients = datanode_clients.unwrap_or_else(|| { @@ -413,7 +413,7 @@ fn build_ddl_manager( datanode_clients, cache_invalidator, table_metadata_manager.clone(), - table_metadata_allocator, + table_metadata_allocator.clone(), memory_region_keeper.clone(), ) .context(error::InitDdlManagerSnafu)?, diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index 827c8e866271..351dd9547b41 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -98,12 +98,16 @@ fn create_table_task(table_name: Option<&str>) -> CreateTableTask { #[test] fn test_region_request_builder() { - let procedure = CreateTableProcedure::new( + let mut procedure = CreateTableProcedure::new( 1, create_table_task(None), + test_data::new_ddl_context(Arc::new(DatanodeClients::default())), + ); + + procedure.set_allocated_metadata( + 1024, TableRouteValue::physical(test_data::new_region_routes()), HashMap::default(), - test_data::new_ddl_context(Arc::new(DatanodeClients::default())), ); let template = procedure.new_region_request_builder(None).unwrap(); @@ -192,11 +196,15 @@ async fn test_on_datanode_create_regions() { let mut procedure = CreateTableProcedure::new( 1, create_table_task(None), - TableRouteValue::physical(region_routes), - HashMap::default(), test_data::new_ddl_context(datanode_manager), ); + procedure.set_allocated_metadata( + 42, + TableRouteValue::physical(test_data::new_region_routes()), + HashMap::default(), + ); + let expected_created_regions = Arc::new(Mutex::new(HashSet::from([ RegionId::new(42, 1), RegionId::new(42, 2), diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index b8565c8dc742..0a9ace054c45 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -105,6 +105,7 @@ pub mod test_data { use chrono::DateTime; use common_catalog::consts::MITO2_ENGINE; use common_meta::datanode_manager::DatanodeManagerRef; + use common_meta::ddl::table_meta::TableMetadataAllocator; use common_meta::ddl::DdlContext; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; @@ -112,6 +113,7 @@ pub mod test_data { use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::rpc::router::RegionRoute; use common_meta::sequence::SequenceBuilder; + use common_meta::wal_options_allocator::WalOptionsAllocator; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema}; use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; @@ -192,6 +194,7 @@ pub mod test_data { SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build(); let mailbox = HeartbeatMailbox::create(Pushers::default(), mailbox_sequence); + let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); DdlContext { datanode_manager, cache_invalidator: Arc::new(MetasrvCacheInvalidator::new( @@ -200,8 +203,13 @@ pub mod test_data { server_addr: "127.0.0.1:4321".to_string(), }, )), - table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)), + table_metadata_manager: table_metadata_manager.clone(), memory_region_keeper: Arc::new(MemoryRegionKeeper::new()), + table_metadata_allocator: Arc::new(TableMetadataAllocator::new( + Arc::new(SequenceBuilder::new("test", kv_backend).build()), + Arc::new(WalOptionsAllocator::default()), + table_metadata_manager.table_name_manager().clone(), + )), } } } diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 155c2470669a..b0749c3a23f3 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -136,11 +136,11 @@ impl GreptimeDbStandaloneBuilder { mix_options.wal_meta.clone(), kv_backend.clone(), )); - let table_meta_allocator = TableMetadataAllocator::new( + let table_meta_allocator = Arc::new(TableMetadataAllocator::new( table_id_sequence, wal_options_allocator.clone(), table_metadata_manager.table_name_manager().clone(), - ); + )); let ddl_task_executor = Arc::new( DdlManager::try_new(