Skip to content

Commit

Permalink
refactor: allocate table ids in the procedure (#3293)
Browse files Browse the repository at this point in the history
* refactor: refactor the create logical tables

* test(create_logical_tables): add tests for on_prepare

* test(create_logical_tables): add tests for on_create_metadata

* refactor: rename to create_logical_tables_metadata

* chore: fmt toml

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored Feb 22, 2024
1 parent f26505b commit 1dc4fec
Show file tree
Hide file tree
Showing 7 changed files with 616 additions and 80 deletions.
157 changes: 82 additions & 75 deletions src/common/meta/src/ddl/create_logical_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,16 @@ impl CreateLogicalTablesProcedure {
Ok(Self { context, creator })
}

async fn on_prepare(&mut self) -> Result<Status> {
/// On the prepares step, it performs:
/// - Checks whether physical table exists.
/// - Checks whether logical tables exist.
/// - Allocates the table ids.
///
/// Abort(non-retry):
/// - The physical table does not exist.
/// - Failed to check whether tables exist.
/// - One of logical tables has existing, and the table creation task without setting `create_if_not_exists`.
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
let manager = &self.context.table_metadata_manager;

// Sets physical region numbers
Expand All @@ -80,7 +89,7 @@ impl CreateLogicalTablesProcedure {
.data
.set_physical_region_numbers(physical_region_numbers);

// Checks if the tables exists
// Checks if the tables exist
let table_name_keys = self
.creator
.data
Expand All @@ -96,24 +105,9 @@ impl CreateLogicalTablesProcedure {
.map(|x| x.map(|x| x.table_id()))
.collect::<Vec<_>>();

// Sets table ids already exists
self.creator
.data
.set_table_ids_already_exists(already_exists_tables_ids);

// If all tables do not exists, we can create them directly.
if self.creator.data.is_all_tables_not_exists() {
self.creator.data.state = CreateTablesState::DatanodeCreateRegions;
return Ok(Status::executing(true));
}

// Filter out the tables that already exist.
let tasks = &self.creator.data.tasks;
let mut filtered_tasks = Vec::with_capacity(tasks.len());
for (task, table_id) in tasks
.iter()
.zip(self.creator.data.table_ids_already_exists().iter())
{
// Validates the tasks
let tasks = &mut self.creator.data.tasks;
for (task, table_id) in tasks.iter().zip(already_exists_tables_ids.iter()) {
if table_id.is_some() {
// If a table already exists, we just ignore it.
ensure!(
Expand All @@ -124,17 +118,34 @@ impl CreateLogicalTablesProcedure {
);
continue;
}
filtered_tasks.push(task.clone());
}

// Resets tasks
self.creator.data.tasks = filtered_tasks;
if self.creator.data.tasks.is_empty() {
// If all tables already exist, we can skip the `DatanodeCreateRegions` stage.
self.creator.data.state = CreateTablesState::CreateMetadata;
return Ok(Status::executing(true));
// If all tables already exist, returns the table_ids.
if already_exists_tables_ids.iter().all(Option::is_some) {
return Ok(Status::done_with_output(
already_exists_tables_ids
.into_iter()
.flatten()
.collect::<Vec<_>>(),
));
}

// Allocates table ids
for (task, table_id) in tasks.iter_mut().zip(already_exists_tables_ids.iter()) {
let table_id = if let Some(table_id) = table_id {
*table_id
} else {
self.context
.table_metadata_allocator
.allocate_table_id(task)
.await?
};
task.set_table_id(table_id);
}

self.creator
.data
.set_table_ids_already_exists(already_exists_tables_ids);
self.creator.data.state = CreateTablesState::DatanodeCreateRegions;
Ok(Status::executing(true))
}
Expand All @@ -152,17 +163,20 @@ impl CreateLogicalTablesProcedure {
self.create_regions(region_routes).await
}

/// Creates table metadata
///
/// Abort(not-retry):
/// - Failed to create table metadata.
pub async fn on_create_metadata(&self) -> Result<Status> {
let manager = &self.context.table_metadata_manager;

let physical_table_id = self.creator.data.physical_table_id();
let tables_data = self.creator.data.all_tables_data();
let num_tables = tables_data.len();
let remaining_tasks = self.creator.data.remaining_tasks();
let num_tables = remaining_tasks.len();

if num_tables > 0 {
let chunk_size = manager.max_logical_tables_per_batch();
if num_tables > chunk_size {
let chunks = tables_data
let chunks = remaining_tasks
.into_iter()
.chunks(chunk_size)
.into_iter()
Expand All @@ -172,11 +186,21 @@ impl CreateLogicalTablesProcedure {
manager.create_logical_tables_metadata(chunk).await?;
}
} else {
manager.create_logical_tables_metadata(tables_data).await?;
manager
.create_logical_tables_metadata(remaining_tasks)
.await?;
}
}

let table_ids = self.creator.data.real_table_ids();
// The `table_id` MUST be collected after the [Prepare::Prepare],
// ensures the all `table_id`s have been allocated.
let table_ids = self
.creator
.data
.tasks
.iter()
.map(|task| task.table_info.ident.table_id)
.collect::<Vec<_>>();

info!("Created {num_tables} tables {table_ids:?} metadata for physical table {physical_table_id}");

Expand Down Expand Up @@ -238,10 +262,10 @@ impl CreateLogicalTablesProcedure {
body: Some(PbRegionRequest::Creates(creates)),
};
create_region_tasks.push(async move {
if let Err(err) = requester.handle(request).await {
return Err(add_peer_context_if_needed(datanode)(err));
}
Ok(())
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(datanode))
});
}

Expand Down Expand Up @@ -310,17 +334,13 @@ impl TablesCreator {
tasks: Vec<CreateTableTask>,
physical_table_id: TableId,
) -> Self {
let table_ids_from_tasks = tasks
.iter()
.map(|task| task.table_info.ident.table_id)
.collect::<Vec<_>>();
let len = table_ids_from_tasks.len();
let len = tasks.len();

Self {
data: CreateTablesData {
cluster_id,
state: CreateTablesState::Prepare,
tasks,
table_ids_from_tasks,
table_ids_already_exists: vec![None; len],
physical_table_id,
physical_region_numbers: vec![],
Expand All @@ -334,10 +354,6 @@ pub struct CreateTablesData {
cluster_id: ClusterId,
state: CreateTablesState,
tasks: Vec<CreateTableTask>,
table_ids_from_tasks: Vec<TableId>,
// Because the table_id is allocated before entering the distributed lock,
// it needs to recheck if the table exists when creating a table.
// If it does exist, then the table_id needs to be replaced with the existing one.
table_ids_already_exists: Vec<Option<TableId>>,
physical_table_id: TableId,
physical_region_numbers: Vec<RegionNumber>,
Expand All @@ -360,43 +376,34 @@ impl CreateTablesData {
self.table_ids_already_exists = table_ids_already_exists;
}

fn table_ids_already_exists(&self) -> &[Option<TableId>] {
&self.table_ids_already_exists
}

fn is_all_tables_not_exists(&self) -> bool {
self.table_ids_already_exists.iter().all(Option::is_none)
}

pub fn real_table_ids(&self) -> Vec<TableId> {
self.table_ids_from_tasks
.iter()
.zip(self.table_ids_already_exists.iter())
.map(|(table_id_from_task, table_id_already_exists)| {
table_id_already_exists.unwrap_or(*table_id_from_task)
})
.collect::<Vec<_>>()
}

fn all_create_table_exprs(&self) -> Vec<&CreateTableExpr> {
self.tasks
.iter()
.map(|task| &task.create_table)
.collect::<Vec<_>>()
}

fn all_tables_data(&self) -> Vec<(RawTableInfo, TableRouteValue)> {
/// Returns the remaining tasks.
/// The length of tasks must be greater than 0.
fn remaining_tasks(&self) -> Vec<(RawTableInfo, TableRouteValue)> {
self.tasks
.iter()
.map(|task| {
let table_info = task.table_info.clone();
let region_ids = self
.physical_region_numbers
.iter()
.map(|region_number| RegionId::new(table_info.ident.table_id, *region_number))
.collect();
let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
(table_info, table_route)
.zip(self.table_ids_already_exists.iter())
.flat_map(|(task, table_id)| {
if table_id.is_none() {
let table_info = task.table_info.clone();
let region_ids = self
.physical_region_numbers
.iter()
.map(|region_number| {
RegionId::new(table_info.ident.table_id, *region_number)
})
.collect();
let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
Some((table_info, table_route))
} else {
None
}
})
.collect::<Vec<_>>()
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/table_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl TableMetadataAllocator {
}
}

async fn allocate_table_id(&self, task: &CreateTableTask) -> Result<TableId> {
pub(crate) async fn allocate_table_id(&self, task: &CreateTableTask) -> Result<TableId> {
let table_id = if let Some(table_id) = &task.create_table.table_id {
let table_id = table_id.id;

Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/ddl/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod create_logical_tables;
mod create_table;
Loading

0 comments on commit 1dc4fec

Please sign in to comment.