Skip to content

Commit

Permalink
refactor(create_logical_tables): allocate table ids in the procedure
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Feb 7, 2024
1 parent b7ace2c commit d3b03e5
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 56 deletions.
64 changes: 24 additions & 40 deletions src/common/meta/src/ddl/create_logical_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,24 +95,18 @@ impl CreateLogicalTablesProcedure {
.map(|x| x.map(|x| x.table_id()))
.collect::<Vec<_>>();

// Sets table ids already exists
self.creator
.data
.set_table_ids_already_exists(already_exists_tables_ids);

// If all tables do not exists, we can create them directly.
if self.creator.data.is_all_tables_not_exists() {
if already_exists_tables_ids.iter().all(Option::is_none) {
// Allocates table ids.
self.allocate_table_ids().await?;
self.creator.data.state = CreateTablesState::DatanodeCreateRegions;
return Ok(Status::executing(true));
}

// Filter out the tables that already exist.
// Filters out the tables that already exist.
let tasks = &self.creator.data.tasks;
let mut filtered_tasks = Vec::with_capacity(tasks.len());
for (task, table_id) in tasks
.iter()
.zip(self.creator.data.table_ids_already_exists().iter())
{
for (task, table_id) in tasks.iter().zip(already_exists_tables_ids.iter()) {
if table_id.is_some() {
// If a table already exists, we just ignore it.
ensure!(
Expand All @@ -134,10 +128,27 @@ impl CreateLogicalTablesProcedure {
return Ok(Status::executing(true));
}

// Allocates table ids.
self.allocate_table_ids().await?;
self.creator.data.state = CreateTablesState::DatanodeCreateRegions;
Ok(Status::executing(true))
}

// Allocates table ids.
async fn allocate_table_ids(&mut self) -> Result<()> {
for task in self.creator.data.tasks.iter_mut() {
let table_id = self
.context
.table_metadata_allocator
.allocate_table_id(task)
.await?;
// Sets allocates table id.
task.table_info.ident.table_id = table_id;
}

Ok(())
}

pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
let physical_table_id = self.creator.data.physical_table_id();
let (_, physical_table_route) = self
Expand Down Expand Up @@ -296,18 +307,11 @@ impl TablesCreator {
tasks: Vec<CreateTableTask>,
physical_table_id: TableId,
) -> Self {
let table_ids_from_tasks = tasks
.iter()
.map(|task| task.table_info.ident.table_id)
.collect::<Vec<_>>();
let len = table_ids_from_tasks.len();
Self {
data: CreateTablesData {
cluster_id,
state: CreateTablesState::Prepare,
tasks,
table_ids_from_tasks,
table_ids_already_exists: vec![None; len],
physical_table_id,
physical_region_numbers: vec![],
},
Expand All @@ -320,11 +324,6 @@ pub struct CreateTablesData {
cluster_id: ClusterId,
state: CreateTablesState,
tasks: Vec<CreateTableTask>,
table_ids_from_tasks: Vec<TableId>,
// Because the table_id is allocated before entering the distributed lock,
// it needs to recheck if the table exists when creating a table.
// If it does exist, then the table_id needs to be replaced with the existing one.
table_ids_already_exists: Vec<Option<TableId>>,
physical_table_id: TableId,
physical_region_numbers: Vec<RegionNumber>,
}
Expand All @@ -342,25 +341,10 @@ impl CreateTablesData {
self.physical_region_numbers = physical_region_numbers;
}

fn set_table_ids_already_exists(&mut self, table_ids_already_exists: Vec<Option<TableId>>) {
self.table_ids_already_exists = table_ids_already_exists;
}

fn table_ids_already_exists(&self) -> &[Option<TableId>] {
&self.table_ids_already_exists
}

fn is_all_tables_not_exists(&self) -> bool {
self.table_ids_already_exists.iter().all(Option::is_none)
}

pub fn real_table_ids(&self) -> Vec<TableId> {
self.table_ids_from_tasks
self.tasks
.iter()
.zip(self.table_ids_already_exists.iter())
.map(|(table_id_from_task, table_id_already_exists)| {
table_id_already_exists.unwrap_or(*table_id_from_task)
})
.map(|task| task.table_info.ident.table_id)
.collect::<Vec<_>>()
}

Expand Down
13 changes: 3 additions & 10 deletions src/common/meta/src/ddl/table_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ impl TableMetadataAllocator {
}
}

async fn allocate_table_id(&self, task: &CreateTableTask) -> Result<TableId> {
/// Allocates table id if needed; it has no effect if the table_id is already set.
pub(crate) async fn allocate_table_id(&self, task: &CreateTableTask) -> Result<TableId> {
let table_id = if let Some(table_id) = &task.create_table.table_id {
let table_id = table_id.id;

Expand Down Expand Up @@ -177,6 +178,7 @@ impl TableMetadataAllocator {
Ok(table_route)
}

/// Allocates [TableMetadata].
pub async fn create(
&self,
ctx: &TableMetadataAllocatorContext,
Expand All @@ -197,15 +199,6 @@ impl TableMetadataAllocator {
region_wal_options,
})
}

/// Sets table ids with all tasks.
pub async fn set_table_ids_on_logic_create(&self, tasks: &mut [CreateTableTask]) -> Result<()> {
for task in tasks {
let table_id = self.allocate_table_id(task).await?;
task.table_info.ident.table_id = table_id;
}
Ok(())
}
}

pub type PeerAllocatorRef = Arc<dyn PeerAllocator>;
Expand Down
7 changes: 1 addition & 6 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,19 +486,14 @@ async fn handle_create_table_task(
async fn handle_create_logical_table_tasks(
ddl_manager: &DdlManager,
cluster_id: ClusterId,
mut create_table_tasks: Vec<CreateTableTask>,
create_table_tasks: Vec<CreateTableTask>,
) -> Result<SubmitDdlTaskResponse> {
ensure!(!create_table_tasks.is_empty(), EmptyCreateTableTasksSnafu);
let physical_table_id = utils::check_and_get_physical_table_id(
&ddl_manager.table_metadata_manager,
&create_table_tasks,
)
.await?;
// Sets table_ids on create_table_tasks
ddl_manager
.table_metadata_allocator
.set_table_ids_on_logic_create(&mut create_table_tasks)
.await?;
let num_logical_tables = create_table_tasks.len();

let (id, output) = ddl_manager
Expand Down

0 comments on commit d3b03e5

Please sign in to comment.