Skip to content

Commit

Permalink
refactor: allocate table id in the procedure
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Feb 7, 2024
1 parent 1e2dad5 commit b7ace2c
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 96 deletions.
8 changes: 4 additions & 4 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use common_catalog::consts::MIN_USER_TABLE_ID;
use common_config::{metadata_store_dir, KvBackendConfig};
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::ddl_manager::DdlManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
Expand Down Expand Up @@ -419,11 +419,11 @@ impl StartCommand {
let table_metadata_manager =
Self::create_table_metadata_manager(kv_backend.clone()).await?;

let table_meta_allocator = TableMetadataAllocator::new(
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator.clone(),
table_metadata_manager.table_name_manager().clone(),
);
));

let ddl_task_executor = Self::create_ddl_task_executor(
table_metadata_manager,
Expand Down Expand Up @@ -458,7 +458,7 @@ impl StartCommand {
table_metadata_manager: TableMetadataManagerRef,
procedure_manager: ProcedureManagerRef,
datanode_manager: DatanodeManagerRef,
table_meta_allocator: TableMetadataAllocator,
table_meta_allocator: TableMetadataAllocatorRef,
) -> Result<DdlTaskExecutorRef> {
let ddl_task_executor: DdlTaskExecutorRef = Arc::new(
DdlManager::try_new(
Expand Down
2 changes: 2 additions & 0 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;
use common_telemetry::tracing_context::W3cTrace;
use store_api::storage::{RegionNumber, TableId};

use self::table_meta::TableMetadataAllocatorRef;
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::datanode_manager::DatanodeManagerRef;
use crate::error::Result;
Expand Down Expand Up @@ -73,4 +74,5 @@ pub struct DdlContext {
pub cache_invalidator: CacheInvalidatorRef,
pub table_metadata_manager: TableMetadataManagerRef,
pub memory_region_keeper: MemoryRegionKeeperRef,
pub table_metadata_allocator: TableMetadataAllocatorRef,
}
106 changes: 70 additions & 36 deletions src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use table::table_reference::TableReference;

use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path};
use crate::ddl::DdlContext;
use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext};
use crate::error::{self, Result, TableRouteNotFoundSnafu};
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
Expand All @@ -54,16 +54,10 @@ pub struct CreateTableProcedure {
impl CreateTableProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable";

pub fn new(
cluster_id: ClusterId,
task: CreateTableTask,
table_route: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
context: DdlContext,
) -> Self {
pub fn new(cluster_id: ClusterId, task: CreateTableTask, context: DdlContext) -> Self {
Self {
context,
creator: TableCreator::new(cluster_id, task, table_route, region_wal_options),
creator: TableCreator::new(cluster_id, task),
}
}

Expand All @@ -75,7 +69,8 @@ impl CreateTableProcedure {
opening_regions: vec![],
};

if let TableRouteValue::Physical(x) = &creator.data.table_route {
// Only registers regions if the table route is allocated.
if let Some(TableRouteValue::Physical(x)) = &creator.data.table_route {
creator.opening_regions = creator
.register_opening_regions(&context, &x.region_routes)
.map_err(BoxedError::new)
Expand All @@ -85,19 +80,36 @@ impl CreateTableProcedure {
Ok(CreateTableProcedure { context, creator })
}

pub fn table_info(&self) -> &RawTableInfo {
fn table_info(&self) -> &RawTableInfo {
&self.creator.data.task.table_info
}

fn table_id(&self) -> TableId {
self.table_info().ident.table_id
}

pub fn region_wal_options(&self) -> &HashMap<RegionNumber, String> {
fn region_wal_options(&self) -> &Option<HashMap<RegionNumber, String>> {
&self.creator.data.region_wal_options
}

/// Checks whether the table exists.
fn table_route(&self) -> &Option<TableRouteValue> {
&self.creator.data.table_route
}

#[cfg(any(test, feature = "testing"))]
pub fn set_allocated_metadata(
&mut self,
table_id: TableId,
table_route: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) {
self.creator
.set_allocated_metadata(table_id, table_route, region_wal_options)
}

/// On the prepare step, it performs:
/// - Checks whether the table exists.
/// - Allocates the table id.
async fn on_prepare(&mut self) -> Result<Status> {
let expr = &self.creator.data.task.create_table;
let table_name_value = self
Expand All @@ -124,6 +136,22 @@ impl CreateTableProcedure {
}

self.creator.data.state = CreateTableState::DatanodeCreateRegions;
let TableMetadata {
table_id,
table_route,
region_wal_options,
} = self
.context
.table_metadata_allocator
.create(
&TableMetadataAllocatorContext {
cluster_id: self.creator.data.cluster_id,
},
&self.creator.data.task,
)
.await?;
self.creator
.set_allocated_metadata(table_id, table_route, region_wal_options);

Ok(Status::executing(true))
}
Expand All @@ -138,7 +166,8 @@ impl CreateTableProcedure {
}

pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
match &self.creator.data.table_route {
// Safety: the table route must be allocated.
match &self.creator.data.table_route.clone().unwrap() {
TableRouteValue::Physical(x) => {
let region_routes = x.region_routes.clone();
let request_builder = self.new_region_request_builder(None)?;
Expand Down Expand Up @@ -170,7 +199,8 @@ impl CreateTableProcedure {
region_routes: &[RegionRoute],
request_builder: CreateRequestBuilder,
) -> Result<Status> {
if self.creator.data.table_route.is_physical() {
// Safety: the table_route must be allocated.
if self.table_route().as_ref().unwrap().is_physical() {
// Registers opening regions
let guards = self
.creator
Expand All @@ -181,13 +211,12 @@ impl CreateTableProcedure {
}

let create_table_data = &self.creator.data;
let region_wal_options = &create_table_data.region_wal_options;

// Safety: the region_wal_options must be allocated
let region_wal_options = self.region_wal_options().as_ref().unwrap();
let create_table_expr = &create_table_data.task.create_table;
let catalog = &create_table_expr.catalog_name;
let schema = &create_table_expr.schema_name;
let storage_path = region_storage_path(catalog, schema);

let leaders = find_leaders(region_routes);
let mut create_region_tasks = Vec::with_capacity(leaders.len());

Expand All @@ -203,7 +232,6 @@ impl CreateTableProcedure {
storage_path.clone(),
region_wal_options,
)?;

requests.push(PbRegionRequest::Create(create_region_request));
}

Expand All @@ -218,7 +246,6 @@ impl CreateTableProcedure {

let datanode = datanode.clone();
let requester = requester.clone();

create_region_tasks.push(async move {
if let Err(err) = requester.handle(request).await {
return Err(handle_operate_region_error(datanode)(err));
Expand All @@ -245,13 +272,12 @@ impl CreateTableProcedure {
let manager = &self.context.table_metadata_manager;

let raw_table_info = self.table_info().clone();
let region_wal_options = self.region_wal_options().clone();
// Safety: the region_wal_options must be allocated.
let region_wal_options = self.region_wal_options().clone().unwrap();
// Safety: the table_route must be allocated.
let table_route = self.table_route().clone().unwrap();
manager
.create_table_metadata(
raw_table_info,
self.creator.data.table_route.clone(),
region_wal_options,
)
.create_table_metadata(raw_table_info, table_route, region_wal_options)
.await?;
info!("Created table metadata for table {table_id}");

Expand Down Expand Up @@ -303,19 +329,14 @@ pub struct TableCreator {
}

impl TableCreator {
pub fn new(
cluster_id: u64,
task: CreateTableTask,
table_route: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) -> Self {
pub fn new(cluster_id: u64, task: CreateTableTask) -> Self {
Self {
data: CreateTableData {
state: CreateTableState::Prepare,
cluster_id,
task,
table_route,
region_wal_options,
table_route: None,
region_wal_options: None,
},
opening_regions: vec![],
}
Expand Down Expand Up @@ -347,6 +368,17 @@ impl TableCreator {
}
Ok(opening_region_guards)
}

fn set_allocated_metadata(
&mut self,
table_id: TableId,
table_route: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) {
self.data.task.table_info.ident.table_id = table_id;
self.data.table_route = Some(table_route);
self.data.region_wal_options = Some(region_wal_options);
}
}

#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
Expand All @@ -363,8 +395,10 @@ pub enum CreateTableState {
pub struct CreateTableData {
pub state: CreateTableState,
pub task: CreateTableTask,
table_route: TableRouteValue,
pub region_wal_options: HashMap<RegionNumber, String>,
/// None stands for not allocated yet.
table_route: Option<TableRouteValue>,
/// None stands for not allocated yet.
pub region_wal_options: Option<HashMap<RegionNumber, String>>,
pub cluster_id: ClusterId,
}

Expand Down
54 changes: 11 additions & 43 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use common_procedure::{watcher, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{info, tracing};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{RegionNumber, TableId};
use store_api::storage::TableId;

use crate::cache_invalidator::CacheInvalidatorRef;
use crate::datanode_manager::DatanodeManagerRef;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::table_meta::TableMetadataAllocator;
use crate::ddl::table_meta::TableMetadataAllocatorRef;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{
utils, DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadata,
TableMetadataAllocatorContext,
};
use crate::ddl::{utils, DdlContext, DdlTaskExecutor, ExecutorContext};
use crate::error::{
self, EmptyCreateTableTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result,
SubmitProcedureSnafu, TableNotFoundSnafu, WaitProcedureSnafu,
Expand Down Expand Up @@ -62,7 +58,7 @@ pub struct DdlManager {
datanode_manager: DatanodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_metadata_allocator: TableMetadataAllocator,
table_metadata_allocator: TableMetadataAllocatorRef,
memory_region_keeper: MemoryRegionKeeperRef,
}

Expand All @@ -73,7 +69,7 @@ impl DdlManager {
datanode_clients: DatanodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_metadata_allocator: TableMetadataAllocator,
table_metadata_allocator: TableMetadataAllocatorRef,
memory_region_keeper: MemoryRegionKeeperRef,
) -> Result<Self> {
let manager = Self {
Expand All @@ -100,6 +96,7 @@ impl DdlManager {
cache_invalidator: self.cache_invalidator.clone(),
table_metadata_manager: self.table_metadata_manager.clone(),
memory_region_keeper: self.memory_region_keeper.clone(),
table_metadata_allocator: self.table_metadata_allocator.clone(),
}
}

Expand Down Expand Up @@ -205,18 +202,10 @@ impl DdlManager {
&self,
cluster_id: ClusterId,
create_table_task: CreateTableTask,
table_route: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();

let procedure = CreateTableProcedure::new(
cluster_id,
create_table_task,
table_route,
region_wal_options,
context,
);
let procedure = CreateTableProcedure::new(cluster_id, create_table_task, context);

let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));

Expand Down Expand Up @@ -470,31 +459,10 @@ async fn handle_drop_table_task(
async fn handle_create_table_task(
ddl_manager: &DdlManager,
cluster_id: ClusterId,
mut create_table_task: CreateTableTask,
create_table_task: CreateTableTask,
) -> Result<SubmitDdlTaskResponse> {
let table_meta = ddl_manager
.table_metadata_allocator
.create(
&TableMetadataAllocatorContext { cluster_id },
&create_table_task,
)
.await?;

let TableMetadata {
table_id,
table_route,
region_wal_options,
} = table_meta;

create_table_task.table_info.ident.table_id = table_id;

let (id, output) = ddl_manager
.submit_create_table_task(
cluster_id,
create_table_task,
table_route,
region_wal_options,
)
.submit_create_table_task(cluster_id, create_table_task)
.await?;

let procedure_id = id.to_string();
Expand Down Expand Up @@ -645,11 +613,11 @@ mod tests {
Arc::new(DummyDatanodeManager),
Arc::new(DummyCacheInvalidator),
table_metadata_manager.clone(),
TableMetadataAllocator::new(
Arc::new(TableMetadataAllocator::new(
Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
Arc::new(WalOptionsAllocator::default()),
table_metadata_manager.table_name_manager().clone(),
),
)),
Arc::new(MemoryRegionKeeper::default()),
);

Expand Down
Loading

0 comments on commit b7ace2c

Please sign in to comment.