diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index 8641d5d7b2a6..b1bca028ecac 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -117,9 +117,10 @@ impl AlterTableProcedure { let catalog = &alter_expr.catalog_name; let schema = &alter_expr.schema_name; + let alter_kind = self.alter_kind()?; let manager = &self.context.table_metadata_manager; - if let Kind::RenameTable(RenameTable { new_table_name }) = self.alter_kind()? { + if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind { let new_table_name_key = TableNameKey::new(catalog, schema, new_table_name); let exist = manager @@ -146,7 +147,11 @@ impl AlterTableProcedure { } ); - self.data.state = AlterTableState::UpdateMetadata; + if matches!(alter_kind, Kind::RenameTable { .. }) { + self.data.state = AlterTableState::UpdateMetadata; + } else { + self.data.state = AlterTableState::SubmitAlterRegionRequests; + }; Ok(Status::executing(true)) } @@ -174,7 +179,7 @@ impl AlterTableProcedure { }) } - pub async fn submit_alter_region_requests(&self) -> Result { + pub async fn submit_alter_region_requests(&mut self) -> Result { let table_id = self.data.table_id(); let table_ref = self.data.table_ref(); @@ -192,30 +197,31 @@ impl AlterTableProcedure { let mut alter_region_tasks = Vec::with_capacity(leaders.len()); for datanode in leaders { - let datanode_manager = self.context.datanode_manager.clone(); - + let requester = self.context.datanode_manager.datanode(&datanode).await; let regions = find_leader_regions(®ion_routes, &datanode); - alter_region_tasks.push(async move { - for region in regions { - let region_id = RegionId::new(table_id, region); - let request = self.create_alter_region_request(region_id)?; - let request = RegionRequest { - header: Some(RegionRequestHeader { - trace_id: common_telemetry::trace_id().unwrap_or_default(), - ..Default::default() - }), - body: Some(region_request::Body::Alter(request)), - }; - debug!("Submitting {request:?} to {datanode}"); - - let requester = datanode_manager.datanode(&datanode).await; + for region in regions { + let region_id = RegionId::new(table_id, region); + let request = self.create_alter_region_request(region_id)?; + let request = RegionRequest { + header: Some(RegionRequestHeader { + trace_id: common_telemetry::trace_id().unwrap_or_default(), + ..Default::default() + }), + body: Some(region_request::Body::Alter(request)), + }; + debug!("Submitting {request:?} to {datanode}"); + + let datanode = datanode.clone(); + let requester = requester.clone(); + + alter_region_tasks.push(async move { if let Err(e) = requester.handle(request).await { return Err(handle_operate_region_error(datanode)(e)); } - } - Ok(()) - }); + Ok(()) + }); + } } future::join_all(alter_region_tasks) @@ -223,7 +229,9 @@ impl AlterTableProcedure { .into_iter() .collect::>>()?; - Ok(Status::Done) + self.data.state = AlterTableState::UpdateMetadata; + + Ok(Status::executing(true)) } /// Update table metadata for rename table operation. @@ -313,22 +321,17 @@ impl AlterTableProcedure { let alter_kind = self.alter_kind()?; let cache_invalidator = &self.context.cache_invalidator; - let status = if matches!(alter_kind, Kind::RenameTable { .. }) { + if matches!(alter_kind, Kind::RenameTable { .. }) { cache_invalidator .invalidate_table_name(&Context::default(), self.data.table_ref().into()) .await?; - - Status::Done } else { cache_invalidator .invalidate_table_id(&Context::default(), self.data.table_id()) .await?; - - self.data.state = AlterTableState::SubmitAlterRegionRequests; - - Status::executing(true) }; - Ok(status) + + Ok(Status::Done) } fn lock_key_inner(&self) -> Vec { @@ -376,9 +379,9 @@ impl Procedure for AlterTableProcedure { match state { AlterTableState::Prepare => self.on_prepare().await, + AlterTableState::SubmitAlterRegionRequests => self.submit_alter_region_requests().await, AlterTableState::UpdateMetadata => self.on_update_metadata().await, AlterTableState::InvalidateTableCache => self.on_broadcast().await, - AlterTableState::SubmitAlterRegionRequests => self.submit_alter_region_requests().await, } .map_err(error_handler) } @@ -398,11 +401,11 @@ impl Procedure for AlterTableProcedure { enum AlterTableState { /// Prepares to alter the table Prepare, + SubmitAlterRegionRequests, /// Updates table metadata. UpdateMetadata, /// Broadcasts the invalidating table cache instruction. InvalidateTableCache, - SubmitAlterRegionRequests, } #[derive(Debug, Serialize, Deserialize)] diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index ee955fdb9131..3712ce6ccbbb 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -182,7 +182,7 @@ impl CreateTableProcedure { let mut create_region_tasks = Vec::with_capacity(leaders.len()); for datanode in leaders { - let manager = self.context.datanode_manager.clone(); + let requester = self.context.datanode_manager.datanode(&datanode).await; let regions = find_leader_regions(region_routes, &datanode); let requests = regions @@ -197,23 +197,25 @@ impl CreateTableProcedure { }) .collect::>(); - create_region_tasks.push(async move { - for request in requests { - let requester = manager.datanode(&datanode).await; - - let request = RegionRequest { - header: Some(RegionRequestHeader { - trace_id: 0, - span_id: 0, - }), - body: Some(request), - }; + for request in requests { + let request = RegionRequest { + header: Some(RegionRequestHeader { + trace_id: 0, + span_id: 0, + }), + body: Some(request), + }; + + let datanode = datanode.clone(); + let requester = requester.clone(); + + create_region_tasks.push(async move { if let Err(err) = requester.handle(request).await { return Err(handle_operate_region_error(datanode)(err)); } - } - Ok(()) - }); + Ok(()) + }); + } } join_all(create_region_tasks) diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index 1de0f7b30636..929416673024 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -142,7 +142,7 @@ impl DropTableProcedure { let mut drop_region_tasks = Vec::with_capacity(leaders.len()); for datanode in leaders { - let clients = self.context.datanode_manager.clone(); + let requester = self.context.datanode_manager.datanode(&datanode).await; let regions = find_leader_regions(region_routes, &datanode); let region_ids = regions @@ -150,28 +150,31 @@ impl DropTableProcedure { .map(|region_number| RegionId::new(table_id, *region_number)) .collect::>(); - drop_region_tasks.push(async move { - for region_id in region_ids { - debug!("Dropping region {region_id} on Datanode {datanode:?}"); - - let request = RegionRequest { - header: Some(RegionRequestHeader { - trace_id: 0, - span_id: 0, - }), - body: Some(region_request::Body::Drop(PbDropRegionRequest { - region_id: region_id.as_u64(), - })), - }; - - if let Err(err) = clients.datanode(&datanode).await.handle(request).await { + for region_id in region_ids { + debug!("Dropping region {region_id} on Datanode {datanode:?}"); + + let request = RegionRequest { + header: Some(RegionRequestHeader { + trace_id: 0, + span_id: 0, + }), + body: Some(region_request::Body::Drop(PbDropRegionRequest { + region_id: region_id.as_u64(), + })), + }; + + let datanode = datanode.clone(); + let requester = requester.clone(); + + drop_region_tasks.push(async move { + if let Err(err) = requester.handle(request).await { if err.status_code() != StatusCode::RegionNotFound { return Err(handle_operate_region_error(datanode)(err)); } } - } - Ok(()) - }); + Ok(()) + }); + } } join_all(drop_region_tasks) diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index 3979e2876419..eac9dfd16b92 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -362,7 +362,7 @@ async fn test_submit_alter_region_requests() { .await .unwrap(); - let procedure = AlterTableProcedure::new( + let mut procedure = AlterTableProcedure::new( 1, alter_table_task, TableInfoValue::new(table_info), @@ -393,7 +393,7 @@ async fn test_submit_alter_region_requests() { }); let status = procedure.submit_alter_region_requests().await.unwrap(); - assert!(matches!(status, Status::Done)); + assert!(matches!(status, Status::Executing { persist: true })); handle.await.unwrap(); diff --git a/src/table/src/error.rs b/src/table/src/error.rs index 8028d53c60be..4146d43b4a30 100644 --- a/src/table/src/error.rs +++ b/src/table/src/error.rs @@ -122,6 +122,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Invalid alter table({}) request: {}", table, err))] + InvalidAlterRequest { + table: String, + location: Location, + err: String, + }, + #[snafu(display("Invalid table state: {}", table_id))] InvalidTable { table_id: TableId, @@ -141,9 +148,9 @@ impl ErrorExt for Error { Error::Datafusion { .. } | Error::SchemaConversion { .. } | Error::TableProjection { .. } => StatusCode::EngineExecuteQuery, - Error::RemoveColumnInIndex { .. } | Error::BuildColumnDescriptor { .. } => { - StatusCode::InvalidArguments - } + Error::RemoveColumnInIndex { .. } + | Error::BuildColumnDescriptor { .. } + | Error::InvalidAlterRequest { .. } => StatusCode::InvalidArguments, Error::TablesRecordBatch { .. } | Error::DuplicatedExecuteCall { .. } => { StatusCode::Unexpected } diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 0a0c598c5625..f27d4d1e6145 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -244,6 +244,41 @@ impl TableMeta { let original_primary_key_indices: HashSet<&usize> = self.primary_key_indices.iter().collect(); + let mut names = HashSet::with_capacity(requests.len()); + + for col_to_add in requests { + ensure!( + names.insert(&col_to_add.column_schema.name), + error::InvalidAlterRequestSnafu { + table: table_name, + err: format!( + "add column {} more than once", + col_to_add.column_schema.name + ), + } + ); + + ensure!( + !table_schema.contains_column(&col_to_add.column_schema.name), + error::ColumnExistsSnafu { + table_name, + column_name: col_to_add.column_schema.name.to_string() + }, + ); + + ensure!( + col_to_add.column_schema.is_nullable() + || col_to_add.column_schema.default_constraint().is_some(), + error::InvalidAlterRequestSnafu { + table: table_name, + err: format!( + "no default value for column {}", + col_to_add.column_schema.name + ), + }, + ); + } + let SplitResult { columns_at_first, columns_at_after, @@ -858,6 +893,36 @@ mod tests { assert_eq!(StatusCode::TableColumnExists, err.status_code()); } + #[test] + fn test_add_invalid_column() { + let schema = Arc::new(new_test_schema()); + let meta = TableMetaBuilder::default() + .schema(schema) + .primary_key_indices(vec![0]) + .engine("engine") + .next_column_id(3) + .build() + .unwrap(); + + let alter_kind = AlterKind::AddColumns { + columns: vec![AddColumnRequest { + column_schema: ColumnSchema::new( + "weny", + ConcreteDataType::string_datatype(), + false, + ), + is_key: false, + location: None, + }], + }; + + let err = meta + .builder_with_alter_kind("my_table", &alter_kind) + .err() + .unwrap(); + assert_eq!(StatusCode::InvalidArguments, err.status_code()); + } + #[test] fn test_remove_unknown_column() { let schema = Arc::new(new_test_schema());