diff --git a/Cargo.lock b/Cargo.lock
index a3214306b920..c9cbd2056adc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1785,8 +1785,10 @@ dependencies = [
  "arc-swap",
  "async-trait",
  "chrono-tz 0.6.3",
+ "common-catalog",
  "common-error",
  "common-macro",
+ "common-meta",
  "common-query",
  "common-runtime",
  "common-telemetry",
@@ -1801,6 +1803,7 @@ dependencies = [
  "paste",
  "ron",
  "serde",
+ "serde_json",
  "session",
  "snafu",
  "statrs",
diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs
index 11cb3df96b71..cf0008ad802b 100644
--- a/src/catalog/src/error.rs
+++ b/src/catalog/src/error.rs
@@ -164,11 +164,8 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Failed to find table partitions: #{table}"))]
-    FindPartitions {
-        source: partition::error::Error,
-        table: String,
-    },
+    #[snafu(display("Failed to find table partitions"))]
+    FindPartitions { source: partition::error::Error },
 
     #[snafu(display("Failed to find region routes"))]
     FindRegionRoutes { source: partition::error::Error },
diff --git a/src/catalog/src/information_schema/partitions.rs b/src/catalog/src/information_schema/partitions.rs
index ecf23f8cc9ce..e7b80e2342a1 100644
--- a/src/catalog/src/information_schema/partitions.rs
+++ b/src/catalog/src/information_schema/partitions.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use core::pin::pin;
 use std::sync::{Arc, Weak};
 
 use arrow_schema::SchemaRef as ArrowSchemaRef;
@@ -31,7 +32,7 @@ use datatypes::vectors::{
     ConstantVector, DateTimeVector, DateTimeVectorBuilder, Int64Vector, Int64VectorBuilder,
     MutableVector, StringVector, StringVectorBuilder, UInt64VectorBuilder,
 };
-use futures::TryStreamExt;
+use futures::{StreamExt, TryStreamExt};
 use partition::manager::PartitionInfo;
 use partition::partition::PartitionDef;
 use snafu::{OptionExt, ResultExt};
@@ -240,40 +241,64 @@ impl InformationSchemaPartitionsBuilder {
         let predicates = Predicates::from_scan_request(&request);
 
         for schema_name in catalog_manager.schema_names(&catalog_name).await? {
-            let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;
-
-            while let Some(table) = stream.try_next().await? {
-                let table_info = table.table_info();
-
-                if table_info.table_type == TableType::Temporary {
-                    continue;
-                }
-
-                let table_id = table_info.ident.table_id;
-                let partitions = if let Some(partition_manager) = &partition_manager {
+            let table_info_stream = catalog_manager
+                .tables(&catalog_name, &schema_name)
+                .await
+                .try_filter_map(|t| async move {
+                    let table_info = t.table_info();
+                    if table_info.table_type == TableType::Temporary {
+                        Ok(None)
+                    } else {
+                        Ok(Some(table_info))
+                    }
+                });
+
+            const BATCH_SIZE: usize = 128;
+
+            // Split table infos into chunks
+            let mut table_info_chunks = pin!(table_info_stream.ready_chunks(BATCH_SIZE));
+
+            while let Some(table_infos) = table_info_chunks.next().await {
+                let table_infos = table_infos.into_iter().collect::<Result<Vec<_>>>()?;
+                let table_ids: Vec<TableId> =
+                    table_infos.iter().map(|info| info.ident.table_id).collect();
+
+                let mut table_partitions = if let Some(partition_manager) = &partition_manager {
                     partition_manager
-                        .find_table_partitions(table_id)
+                        .batch_find_table_partitions(&table_ids)
                         .await
-                        .context(FindPartitionsSnafu {
-                            table: &table_info.name,
-                        })?
+                        .context(FindPartitionsSnafu)?
                 } else {
                     // Current node must be a standalone instance, contains only one partition by default.
                     // TODO(dennis): change it when we support multi-regions for standalone.
-                    vec![PartitionInfo {
-                        id: RegionId::new(table_id, 0),
-                        partition: PartitionDef::new(vec![], vec![]),
-                    }]
+                    table_ids
+                        .into_iter()
+                        .map(|table_id| {
+                            (
+                                table_id,
+                                vec![PartitionInfo {
+                                    id: RegionId::new(table_id, 0),
+                                    partition: PartitionDef::new(vec![], vec![]),
+                                }],
+                            )
+                        })
+                        .collect()
                 };
 
-                self.add_partitions(
-                    &predicates,
-                    &table_info,
-                    &catalog_name,
-                    &schema_name,
-                    &table_info.name,
-                    &partitions,
-                );
+                for table_info in table_infos {
+                    let partitions = table_partitions
+                        .remove(&table_info.ident.table_id)
+                        .unwrap_or(vec![]);
+
+                    self.add_partitions(
+                        &predicates,
+                        &table_info,
+                        &catalog_name,
+                        &schema_name,
+                        &table_info.name,
+                        &partitions,
+                    );
+                }
             }
         }
diff --git a/src/catalog/src/information_schema/region_peers.rs b/src/catalog/src/information_schema/region_peers.rs
index 882ad263092c..9a436ab7f7b7 100644
--- a/src/catalog/src/information_schema/region_peers.rs
+++ b/src/catalog/src/information_schema/region_peers.rs
@@ -199,7 +199,7 @@ impl InformationSchemaRegionPeersBuilder {
 
         let table_routes = if let Some(partition_manager) = &partition_manager {
             partition_manager
-                .find_region_routes_batch(&table_ids)
+                .batch_find_region_routes(&table_ids)
                 .await
                 .context(FindRegionRoutesSnafu)?
         } else {
diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs
index 6eba5059512b..a6c581122459 100644
--- a/src/cmd/src/cli/repl.rs
+++ b/src/cmd/src/cli/repl.rs
@@ -260,6 +260,7 @@ async fn create_query_engine(meta_addr: &str) -> Result {
         catalog_list,
         None,
         None,
+        None,
         false,
         plugins.clone(),
     ));
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index 944f8623e017..edd262e9c29a 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -22,7 +22,7 @@ use common_config::{metadata_store_dir, KvBackendConfig};
 use common_meta::cache_invalidator::DummyCacheInvalidator;
 use common_meta::datanode_manager::DatanodeManagerRef;
 use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
-use common_meta::ddl::DdlTaskExecutorRef;
+use common_meta::ddl::ProcedureExecutorRef;
 use common_meta::ddl_manager::DdlManager;
 use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
 use common_meta::kv_backend::KvBackendRef;
@@ -459,8 +459,8 @@ impl StartCommand {
         procedure_manager: ProcedureManagerRef,
         datanode_manager: DatanodeManagerRef,
         table_meta_allocator: TableMetadataAllocatorRef,
-    ) -> Result<DdlTaskExecutorRef> {
-        let ddl_task_executor: DdlTaskExecutorRef = Arc::new(
+    ) -> Result<ProcedureExecutorRef> {
+        let procedure_executor: ProcedureExecutorRef = Arc::new(
             DdlManager::try_new(
                 procedure_manager,
                 datanode_manager,
@@ -472,7 +472,7 @@
             .context(InitDdlManagerSnafu)?,
         );
 
-        Ok(ddl_task_executor)
+        Ok(procedure_executor)
     }
 
     pub async fn create_table_metadata_manager(
diff --git a/src/common/function/Cargo.toml b/src/common/function/Cargo.toml
index 7053d10771ad..a57eacefad4b 100644
--- a/src/common/function/Cargo.toml
+++ b/src/common/function/Cargo.toml
@@ -9,8 +9,10 @@ api.workspace = true
 arc-swap = "1.0"
 async-trait.workspace = true
 chrono-tz = "0.6"
+common-catalog.workspace = true
 common-error.workspace = true
 common-macro.workspace = true
+common-meta.workspace = true
 common-query.workspace = true
 common-runtime.workspace = true
 common-telemetry.workspace = true
@@ -23,6 +25,8 @@ num = "0.4"
 num-traits = "0.2"
 once_cell.workspace = true
 paste = "1.0"
+serde.workspace = true
+serde_json.workspace = true
 session.workspace = true
 snafu.workspace = true
 statrs = "0.16"
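The chunked scan in `information_schema/partitions.rs` above leans on `futures::StreamExt::ready_chunks`, which groups up to `BATCH_SIZE` already-ready stream items into one `Vec` so each batch needs only a single metadata round trip. A minimal, self-contained sketch of the pattern (assuming only the `futures` and `tokio` crates; the stream here is a stand-in for the catalog's table-info stream):

```rust
use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    const BATCH_SIZE: usize = 128;

    // Stand-in for the table-info stream returned by the catalog manager.
    let table_ids = stream::iter(0u32..300);

    // `ready_chunks(n)` yields Vec<_> batches of at most n items each.
    let mut chunks = std::pin::pin!(table_ids.ready_chunks(BATCH_SIZE));
    while let Some(batch) = chunks.next().await {
        // One batched lookup (e.g. batch_find_table_partitions) per chunk:
        // 300 items become 3 calls instead of 300.
        println!("resolving {} tables in one call", batch.len());
    }
}
```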
diff --git a/src/common/function/src/function.rs b/src/common/function/src/function.rs
index f47486da4502..8da7f344bbc3 100644
--- a/src/common/function/src/function.rs
+++ b/src/common/function/src/function.rs
@@ -30,6 +30,17 @@ pub struct FunctionContext {
     pub state: Arc<FunctionState>,
 }
 
+impl FunctionContext {
+    /// Create a mock [`FunctionContext`] for test.
+    #[cfg(any(test, feature = "testing"))]
+    pub fn mock() -> Self {
+        Self {
+            query_ctx: QueryContextBuilder::default().build(),
+            state: Arc::new(FunctionState::mock()),
+        }
+    }
+}
+
 impl Default for FunctionContext {
     fn default() -> Self {
         Self {
diff --git a/src/common/function/src/handlers.rs b/src/common/function/src/handlers.rs
index 352009fc78b0..629f55e32235 100644
--- a/src/common/function/src/handlers.rs
+++ b/src/common/function/src/handlers.rs
@@ -13,10 +13,9 @@
 // limitations under the License.
 
 use std::sync::Arc;
-use std::time::Duration;
 
-use api::v1::meta::ProcedureStateResponse;
 use async_trait::async_trait;
+use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
 use common_query::error::Result;
 use session::context::QueryContextRef;
 use table::requests::{DeleteRequest, InsertRequest};
@@ -31,24 +30,18 @@ pub trait TableMutationHandler: Send + Sync {
 
     /// Delete rows from the table.
     async fn delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result<AffectedRows>;
-
-    /// Migrate a region from source peer to target peer, returns the procedure id if success.
-    async fn migrate_region(
-        &self,
-        region_id: u64,
-        from_peer: u64,
-        to_peer: u64,
-        replay_timeout: Duration,
-    ) -> Result<String>;
 }
 
-/// A trait for handling meta service requests in `QueryEngine`.
+/// A trait for handling procedure service requests in `QueryEngine`.
 #[async_trait]
-pub trait MetaServiceHandler: Send + Sync {
+pub trait ProcedureServiceHandler: Send + Sync {
+    /// Migrate a region from source peer to target peer, returns the procedure id if success.
+    async fn migrate_region(&self, request: MigrateRegionRequest) -> Result<Option<String>>;
+
+    /// Query the procedure's state by its id.
     async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse>;
 }
 
 pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;
 
-pub type MetaServiceHandlerRef = Arc<dyn MetaServiceHandler>;
+pub type ProcedureServiceHandlerRef = Arc<dyn ProcedureServiceHandler>;
diff --git a/src/common/function/src/lib.rs b/src/common/function/src/lib.rs
index 10fbf13a7a05..1d37d7068c98 100644
--- a/src/common/function/src/lib.rs
+++ b/src/common/function/src/lib.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+mod macros;
 pub mod scalars;
 mod system;
 mod table;
diff --git a/src/common/function/src/macros.rs b/src/common/function/src/macros.rs
new file mode 100644
index 000000000000..c8b03e816301
--- /dev/null
+++ b/src/common/function/src/macros.rs
@@ -0,0 +1,27 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/// Ensure the current function is invoked under the `greptime` catalog.
+#[macro_export]
+macro_rules! ensure_greptime {
+    ($func_ctx: expr) => {{
+        use common_catalog::consts::DEFAULT_CATALOG_NAME;
+        snafu::ensure!(
+            $func_ctx.query_ctx.current_catalog() == DEFAULT_CATALOG_NAME,
+            common_query::error::PermissionDeniedSnafu {
+                err_msg: format!("current catalog is not {DEFAULT_CATALOG_NAME}")
+            }
+        );
+    }};
+}
diff --git a/src/common/function/src/state.rs b/src/common/function/src/state.rs
index a5a4935cddac..418509dc52e9 100644
--- a/src/common/function/src/state.rs
+++ b/src/common/function/src/state.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use crate::handlers::{MetaServiceHandlerRef, TableMutationHandlerRef};
+use crate::handlers::{ProcedureServiceHandlerRef, TableMutationHandlerRef};
 
 /// Shared state for SQL functions.
 /// The handlers in state may be `None` in cli command-line or test cases.
@@ -20,6 +20,45 @@ use crate::handlers::{MetaServiceHandlerRef, TableMutationHandlerRef};
 pub struct FunctionState {
     // The table mutation handler
     pub table_mutation_handler: Option<TableMutationHandlerRef>,
-    // The meta service handler
-    pub meta_service_handler: Option<MetaServiceHandlerRef>,
+    // The procedure service handler
+    pub procedure_service_handler: Option<ProcedureServiceHandlerRef>,
+}
+
+impl FunctionState {
+    /// Create a mock [`FunctionState`] for test.
+    #[cfg(any(test, feature = "testing"))]
+    pub fn mock() -> Self {
+        use std::sync::Arc;
+
+        use api::v1::meta::ProcedureStatus;
+        use async_trait::async_trait;
+        use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
+        use common_query::error::Result;
+
+        use crate::handlers::ProcedureServiceHandler;
+        struct MockProcedureServiceHandler;
+
+        #[async_trait]
+        impl ProcedureServiceHandler for MockProcedureServiceHandler {
+            async fn migrate_region(
+                &self,
+                _request: MigrateRegionRequest,
+            ) -> Result<Option<String>> {
+                Ok(Some("test_pid".to_string()))
+            }
+
+            async fn query_procedure_state(&self, _pid: &str) -> Result<ProcedureStateResponse> {
+                Ok(ProcedureStateResponse {
+                    status: ProcedureStatus::Done.into(),
+                    error: "OK".to_string(),
+                    ..Default::default()
+                })
+            }
+        }
+
+        Self {
+            table_mutation_handler: None,
+            procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
+        }
+    }
 }
diff --git a/src/common/function/src/system.rs b/src/common/function/src/system.rs
index 94beda6966f9..b50dbfba07b6 100644
--- a/src/common/function/src/system.rs
+++ b/src/common/function/src/system.rs
@@ -14,6 +14,7 @@
 
 mod build;
 mod database;
+mod procedure_state;
 mod timezone;
 mod version;
 
@@ -21,6 +22,7 @@ use std::sync::Arc;
 
 use build::BuildFunction;
 use database::DatabaseFunction;
+use procedure_state::ProcedureStateFunction;
 use timezone::TimezoneFunction;
 use version::VersionFunction;
 
@@ -34,5 +36,6 @@ impl SystemFunction {
         registry.register(Arc::new(VersionFunction));
         registry.register(Arc::new(DatabaseFunction));
         registry.register(Arc::new(TimezoneFunction));
+        registry.register(Arc::new(ProcedureStateFunction));
     }
 }
diff --git a/src/common/function/src/system/build.rs b/src/common/function/src/system/build.rs
index ce9e77fdfb7c..925b262bcdb6 100644
--- a/src/common/function/src/system/build.rs
+++ b/src/common/function/src/system/build.rs
@@ -22,7 +22,7 @@ use datatypes::vectors::{StringVector, VectorRef};
 
 use crate::function::{Function, FunctionContext};
 
-/// Generates build information
+/// Generates build information
 #[derive(Clone, Debug, Default)]
 pub struct BuildFunction;
@@ -42,11 +42,7 @@
     }
 
     fn signature(&self) -> Signature {
-        Signature::uniform(
-            0,
-            vec![ConcreteDataType::string_datatype()],
-            Volatility::Immutable,
-        )
+        Signature::uniform(0, vec![], Volatility::Immutable)
     }
 
     fn eval(&self, _func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
@@ -75,7 +71,7 @@ mod tests {
             Signature {
                 type_signature: TypeSignature::Uniform(0, valid_types),
                 volatility: Volatility::Immutable
-            } if valid_types == vec![ConcreteDataType::string_datatype()]
+            } if valid_types.is_empty()
         ));
         let build_info = common_version::build_info().to_string();
         let vector = build.eval(FunctionContext::default(), &[]).unwrap();
diff --git a/src/common/function/src/system/procedure_state.rs b/src/common/function/src/system/procedure_state.rs
new file mode 100644
index 000000000000..4f6305078465
--- /dev/null
+++ b/src/common/function/src/system/procedure_state.rs
@@ -0,0 +1,216 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::fmt;
+use std::sync::Arc;
+
+use api::v1::meta::ProcedureStatus;
+use common_meta::rpc::procedure::ProcedureStateResponse;
+use common_query::error::Error::ThreadJoin;
+use common_query::error::{
+    InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result,
+    UnsupportedInputDataTypeSnafu,
+};
+use common_query::prelude::{Signature, Volatility};
+use common_telemetry::error;
+use datatypes::prelude::*;
+use datatypes::vectors::{ConstantVector, Helper, StringVector, VectorRef};
+use serde::Serialize;
+use snafu::{ensure, Location, OptionExt};
+
+use crate::function::{Function, FunctionContext};
+
+const NAME: &str = "procedure_state";
+
+/// A function to query procedure state by its id.
+/// Such as `procedure_state(pid)`.
+#[derive(Clone, Debug, Default)]
+pub struct ProcedureStateFunction;
+
+impl fmt::Display for ProcedureStateFunction {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "PROCEDURE_STATE")
+    }
+}
+
+#[derive(Serialize)]
+struct ProcedureStateJson {
+    status: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    error: Option<String>,
+}
+
+impl Function for ProcedureStateFunction {
+    fn name(&self) -> &str {
+        NAME
+    }
+
+    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
+        Ok(ConcreteDataType::string_datatype())
+    }
+
+    fn signature(&self) -> Signature {
+        Signature::uniform(
+            1,
+            vec![ConcreteDataType::string_datatype()],
+            Volatility::Immutable,
+        )
+    }
+
+    fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
+        crate::ensure_greptime!(func_ctx);
+
+        ensure!(
+            columns.len() == 1,
+            InvalidFuncArgsSnafu {
+                err_msg: format!(
+                    "The length of the args is not correct, expect 1, have: {}",
+                    columns.len()
+                ),
+            }
+        );
+
+        let pids = columns[0].clone();
+        let expect_len = pids.len();
+        let is_const = pids.is_const();
+
+        match pids.data_type() {
+            ConcreteDataType::String(_) => {
+                // TODO(dennis): datafusion UDF doesn't support async function currently
+                std::thread::spawn(move || {
+                    let pids: &StringVector = if is_const {
+                        let pids: &ConstantVector = unsafe { Helper::static_cast(&pids) };
+                        unsafe { Helper::static_cast(pids.inner()) }
+                    } else {
+                        unsafe { Helper::static_cast(&pids) }
+                    };
+
+                    let procedure_service_handler = func_ctx
+                        .state
+                        .procedure_service_handler
+                        .as_ref()
+                        .context(MissingProcedureServiceHandlerSnafu)?;
+
+                    let states = pids
+                        .iter_data()
+                        .map(|pid| {
+                            if let Some(pid) = pid {
+                                let ProcedureStateResponse { status, error, .. } =
+                                    common_runtime::block_on_read(async move {
+                                        procedure_service_handler.query_procedure_state(pid).await
+                                    })?;
+
+                                let status = ProcedureStatus::try_from(status)
+                                    .map(|v| v.as_str_name())
+                                    .unwrap_or("Unknown");
+
+                                let state = ProcedureStateJson {
+                                    status: status.to_string(),
+                                    error: if error.is_empty() { None } else { Some(error) },
+                                };
+
+                                Ok(Some(serde_json::to_string(&state).unwrap_or_default()))
+                            } else {
+                                Ok(None)
+                            }
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+
+                    let results: VectorRef = Arc::new(StringVector::from(states));
+
+                    if is_const {
+                        Ok(Arc::new(ConstantVector::new(results, expect_len)) as _)
+                    } else {
+                        Ok(results)
+                    }
+                })
+                .join()
+                .map_err(|e| {
+                    error!(e; "Join thread error");
+                    ThreadJoin {
+                        location: Location::default(),
+                    }
+                })?
+            }
+            _ => UnsupportedInputDataTypeSnafu {
+                function: NAME,
+                datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
+            }
+            .fail(),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use common_query::prelude::TypeSignature;
+    use datatypes::vectors::StringVector;
+
+    use super::*;
+
+    #[test]
+    fn test_procedure_state_misc() {
+        let f = ProcedureStateFunction;
+        assert_eq!("procedure_state", f.name());
+        assert_eq!(
+            ConcreteDataType::string_datatype(),
+            f.return_type(&[]).unwrap()
+        );
+        assert!(matches!(f.signature(),
+            Signature {
+                type_signature: TypeSignature::Uniform(1, valid_types),
+                volatility: Volatility::Immutable
+            } if valid_types == vec![ConcreteDataType::string_datatype()]
+        ));
+    }
+
+    #[test]
+    fn test_missing_procedure_service() {
+        let f = ProcedureStateFunction;
+
+        let args = vec!["pid"];
+
+        let args = args
+            .into_iter()
+            .map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _)
+            .collect::<Vec<VectorRef>>();
+
+        let result = f.eval(FunctionContext::default(), &args).unwrap_err();
+        assert_eq!(
+            "Missing ProcedureServiceHandler, not expected",
+            result.to_string()
+        );
+    }
+
+    #[test]
+    fn test_procedure_state() {
+        let f = ProcedureStateFunction;
+
+        let args = vec!["pid"];
+
+        let args = args
+            .into_iter()
+            .map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _)
+            .collect::<Vec<VectorRef>>();
+
+        let result = f.eval(FunctionContext::mock(), &args).unwrap();
+
+        let expect: VectorRef = Arc::new(StringVector::from(vec![
+            "{\"status\":\"Done\",\"error\":\"OK\"}",
+        ]));
+        assert_eq!(expect, result);
+    }
+}
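The `ProcedureStateJson` struct above is what `procedure_state` serializes into its string result. A small standalone check of the JSON shape (requires only `serde` with the `derive` feature and `serde_json`):

```rust
use serde::Serialize;

#[derive(Serialize)]
struct ProcedureStateJson {
    status: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
}

fn main() {
    // `error` is omitted entirely when None, so a successful procedure
    // serializes to just {"status":"Done"}.
    let done = ProcedureStateJson {
        status: "Done".to_string(),
        error: None,
    };
    assert_eq!(serde_json::to_string(&done).unwrap(), r#"{"status":"Done"}"#);

    let failed = ProcedureStateJson {
        status: "Failed".to_string(),
        error: Some("region not found".to_string()),
    };
    assert_eq!(
        serde_json::to_string(&failed).unwrap(),
        r#"{"status":"Failed","error":"region not found"}"#
    );
}
```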
diff --git a/src/common/function/src/table/migrate_region.rs b/src/common/function/src/table/migrate_region.rs
index f969bada02d3..6447c6de6b3d 100644
--- a/src/common/function/src/table/migrate_region.rs
+++ b/src/common/function/src/table/migrate_region.rs
@@ -15,9 +15,10 @@
 use std::fmt::{self};
 use std::time::Duration;
 
+use common_meta::rpc::procedure::MigrateRegionRequest;
 use common_query::error::Error::ThreadJoin;
 use common_query::error::{
-    InvalidFuncArgsSnafu, InvalidInputTypeSnafu, MissingTableMutationHandlerSnafu, Result,
+    InvalidFuncArgsSnafu, InvalidInputTypeSnafu, MissingProcedureServiceHandlerSnafu, Result,
 };
 use common_query::prelude::{Signature, TypeSignature, Volatility};
 use common_telemetry::logging::error;
@@ -77,6 +78,8 @@ impl Function for MigrateRegionFunction {
     }
 
     fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
+        crate::ensure_greptime!(func_ctx);
+
         let (region_ids, from_peers, to_peers, replay_timeouts) = match columns.len() {
             3 => {
                 let region_ids = cast_u64_vector(&columns[0])?;
@@ -106,9 +109,15 @@ impl Function for MigrateRegionFunction {
             }
         };
 
+        // TODO(dennis): datafusion UDF doesn't support async function currently
         std::thread::spawn(move || {
             let len = region_ids.len();
             let mut results = StringVectorBuilder::with_capacity(len);
+            let procedure_service_handler = func_ctx
+                .state
+                .procedure_service_handler
+                .as_ref()
+                .context(MissingProcedureServiceHandlerSnafu)?;
 
             for index in 0..len {
                 let region_id = region_ids.get(index);
@@ -126,24 +135,18 @@ impl Function for MigrateRegionFunction {
                     Value::UInt64(to_peer),
                     Value::UInt64(replay_timeout),
                 ) => {
-                    let func_ctx = func_ctx.clone();
-
                     let pid = common_runtime::block_on_read(async move {
-                        func_ctx
-                            .state
-                            .table_mutation_handler
-                            .as_ref()
-                            .context(MissingTableMutationHandlerSnafu)?
-                            .migrate_region(
+                        procedure_service_handler
+                            .migrate_region(MigrateRegionRequest {
                                 region_id,
                                 from_peer,
                                 to_peer,
-                                Duration::from_secs(replay_timeout),
-                            )
+                                replay_timeout: Duration::from_secs(replay_timeout),
+                            })
                             .await
                     })?;
 
-                    results.push(Some(&pid));
+                    results.push(pid.as_deref())
                 }
                 _ => {
                     results.push(None);
@@ -171,5 +174,60 @@ impl fmt::Display for MigrateRegionFunction {
 
 #[cfg(test)]
 mod tests {
-    // FIXME(dennis): test in the following PR.
+    use std::sync::Arc;
+
+    use common_query::prelude::TypeSignature;
+    use datatypes::vectors::{StringVector, UInt64Vector};
+
+    use super::*;
+
+    #[test]
+    fn test_migrate_region_misc() {
+        let f = MigrateRegionFunction;
+        assert_eq!("migrate_region", f.name());
+        assert_eq!(
+            ConcreteDataType::string_datatype(),
+            f.return_type(&[]).unwrap()
+        );
+        assert!(matches!(f.signature(),
+            Signature {
+                type_signature: TypeSignature::OneOf(sigs),
+                volatility: Volatility::Immutable
+            } if sigs.len() == 2));
+    }
+
+    #[test]
+    fn test_missing_procedure_service() {
+        let f = MigrateRegionFunction;
+
+        let args = vec![1, 1, 1];
+
+        let args = args
+            .into_iter()
+            .map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
+            .collect::<Vec<VectorRef>>();
+
+        let result = f.eval(FunctionContext::default(), &args).unwrap_err();
+        assert_eq!(
+            "Missing ProcedureServiceHandler, not expected",
+            result.to_string()
+        );
+    }
+
+    #[test]
+    fn test_migrate_region() {
+        let f = MigrateRegionFunction;
+
+        let args = vec![1, 1, 1];
+
+        let args = args
+            .into_iter()
+            .map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
+            .collect::<Vec<VectorRef>>();
+
+        let result = f.eval(FunctionContext::mock(), &args).unwrap();
+
+        let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"]));
+        assert_eq!(expect, result);
+    }
 }
diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs
index 4a8335ef3087..d5d790f95838 100644
--- a/src/common/meta/src/ddl.rs
+++ b/src/common/meta/src/ddl.rs
@@ -26,6 +26,7 @@ use crate::key::table_route::TableRouteValue;
 use crate::key::TableMetadataManagerRef;
 use crate::region_keeper::MemoryRegionKeeperRef;
 use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
+use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
 
 pub mod alter_table;
 pub mod create_logical_tables;
@@ -46,16 +47,32 @@ pub struct ExecutorContext {
     pub tracing_context: Option<W3cTrace>,
 }
 
+/// The procedure executor that accepts DDL tasks, region migration tasks, etc.
 #[async_trait::async_trait]
-pub trait DdlTaskExecutor: Send + Sync {
+pub trait ProcedureExecutor: Send + Sync {
+    /// Submit a ddl task
     async fn submit_ddl_task(
         &self,
         ctx: &ExecutorContext,
         request: SubmitDdlTaskRequest,
     ) -> Result<SubmitDdlTaskResponse>;
+
+    /// Submit a region migration task
+    async fn migrate_region(
+        &self,
+        ctx: &ExecutorContext,
+        request: MigrateRegionRequest,
+    ) -> Result<MigrateRegionResponse>;
+
+    /// Query the procedure state by its id
+    async fn query_procedure_state(
+        &self,
+        ctx: &ExecutorContext,
+        pid: &str,
+    ) -> Result<ProcedureStateResponse>;
 }
 
-pub type DdlTaskExecutorRef = Arc<dyn DdlTaskExecutor>;
+pub type ProcedureExecutorRef = Arc<dyn ProcedureExecutor>;
 
 pub struct TableMetadataAllocatorContext {
     pub cluster_id: u64,
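Both `migrate_region` and `procedure_state` above share the work-around flagged in their TODOs: DataFusion UDFs are synchronous, so the async handler call is driven to completion on a separate thread and joined. A minimal sketch of that shape, using a plain tokio runtime in place of the crate's `common_runtime::block_on_read`:

```rust
use std::thread;

// Run an async operation to completion from a synchronous context by
// hopping to a helper thread that owns its own runtime, then joining.
fn blocking_call() -> Result<String, String> {
    thread::spawn(|| {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .map_err(|e| e.to_string())?;
        // Stand-in for `procedure_service_handler.migrate_region(...).await`.
        Ok(rt.block_on(async { "test_pid".to_string() }))
    })
    .join()
    .map_err(|_| "join error".to_string())?
}

fn main() {
    assert_eq!(blocking_call().unwrap(), "test_pid");
}
```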
diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs
index efad73dae6a3..4a760cafcee3 100644
--- a/src/common/meta/src/ddl_manager.rs
+++ b/src/common/meta/src/ddl_manager.rs
@@ -28,10 +28,10 @@ use crate::ddl::create_table::CreateTableProcedure;
 use crate::ddl::drop_table::DropTableProcedure;
 use crate::ddl::table_meta::TableMetadataAllocatorRef;
 use crate::ddl::truncate_table::TruncateTableProcedure;
-use crate::ddl::{utils, DdlContext, DdlTaskExecutor, ExecutorContext};
+use crate::ddl::{utils, DdlContext, ExecutorContext, ProcedureExecutor};
 use crate::error::{
     self, EmptyCreateTableTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result,
-    SubmitProcedureSnafu, TableNotFoundSnafu, WaitProcedureSnafu,
+    SubmitProcedureSnafu, TableNotFoundSnafu, UnsupportedSnafu, WaitProcedureSnafu,
 };
 use crate::key::table_info::TableInfoValue;
 use crate::key::table_name::TableNameKey;
@@ -46,6 +46,8 @@ use crate::rpc::ddl::{
     AlterTableTask, CreateTableTask, DropTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse,
     TruncateTableTask,
 };
+use crate::rpc::procedure;
+use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
 use crate::rpc::router::RegionRoute;
 use crate::table_name::TableName;
 use crate::ClusterId;
@@ -527,8 +529,9 @@ async fn handle_create_logical_table_tasks(
     })
 }
 
+/// TODO(dennis): letting [`DdlManager`] implement [`ProcedureExecutor`] looks weird; find a way to refactor it.
 #[async_trait::async_trait]
-impl DdlTaskExecutor for DdlManager {
+impl ProcedureExecutor for DdlManager {
     async fn submit_ddl_task(
         &self,
         ctx: &ExecutorContext,
@@ -566,6 +569,37 @@ impl DdlTaskExecutor for DdlManager {
             .trace(span)
             .await
     }
+
+    async fn migrate_region(
+        &self,
+        _ctx: &ExecutorContext,
+        _request: MigrateRegionRequest,
+    ) -> Result<MigrateRegionResponse> {
+        UnsupportedSnafu {
+            operation: "migrate_region",
+        }
+        .fail()
+    }
+
+    async fn query_procedure_state(
+        &self,
+        _ctx: &ExecutorContext,
+        pid: &str,
+    ) -> Result<ProcedureStateResponse> {
+        let pid = ProcedureId::parse_str(pid)
+            .with_context(|_| error::ParseProcedureIdSnafu { key: pid })?;
+
+        let state = self
+            .procedure_manager
+            .procedure_state(pid)
+            .await
+            .context(error::QueryProcedureSnafu)?
+            .context(error::ProcedureNotFoundSnafu {
+                pid: pid.to_string(),
+            })?;
+
+        Ok(procedure::procedure_state_to_pb_response(&state))
+    }
 }
 
 #[cfg(test)]
diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs
index dc4c0cf51cec..32af562e30f8 100644
--- a/src/common/meta/src/error.rs
+++ b/src/common/meta/src/error.rs
@@ -100,6 +100,15 @@ pub enum Error {
         source: common_procedure::Error,
     },
 
+    #[snafu(display("Failed to query procedure"))]
+    QueryProcedure {
+        location: Location,
+        source: common_procedure::Error,
+    },
+
+    #[snafu(display("Procedure not found: {pid}"))]
+    ProcedureNotFound { location: Location, pid: String },
+
     #[snafu(display("Failed to parse procedure id: {key}"))]
     ParseProcedureId {
         location: Location,
@@ -431,14 +440,17 @@ impl ErrorExt for Error {
             | RenameTable { .. }
             | Unsupported { .. } => StatusCode::Internal,
 
-            PrimaryKeyNotFound { .. } | EmptyKey { .. } | InvalidEngineType { .. } => {
-                StatusCode::InvalidArguments
-            }
+            ProcedureNotFound { .. }
+            | PrimaryKeyNotFound { .. }
+            | EmptyKey { .. }
+            | InvalidEngineType { .. } => StatusCode::InvalidArguments,
 
             TableNotFound { .. } => StatusCode::TableNotFound,
             TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
 
-            SubmitProcedure { source, .. } | WaitProcedure { source, .. } => source.status_code(),
+            SubmitProcedure { source, .. }
+            | QueryProcedure { source, .. }
+            | WaitProcedure { source, .. } => source.status_code(),
             RegisterProcedureLoader { source, .. } => source.status_code(),
             External { source, .. } => source.status_code(),
             OperateDatanode { source, .. } => source.status_code(),
diff --git a/src/common/meta/src/rpc/procedure.rs b/src/common/meta/src/rpc/procedure.rs
index 9e64edb715c8..b4de8747df21 100644
--- a/src/common/meta/src/rpc/procedure.rs
+++ b/src/common/meta/src/rpc/procedure.rs
@@ -12,6 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::time::Duration;
+
+pub use api::v1::meta::{MigrateRegionResponse, ProcedureStateResponse};
 use api::v1::meta::{
     ProcedureId as PbProcedureId, ProcedureStateResponse as PbProcedureStateResponse,
     ProcedureStatus as PbProcedureStatus,
@@ -21,6 +24,15 @@ use snafu::ResultExt;
 
 use crate::error::{ParseProcedureIdSnafu, Result};
 
+/// A request to migrate region.
+#[derive(Clone)]
+pub struct MigrateRegionRequest {
+    pub region_id: u64,
+    pub from_peer: u64,
+    pub to_peer: u64,
+    pub replay_timeout: Duration,
+}
+
 /// Cast the protobuf [`ProcedureId`] to common [`ProcedureId`].
 pub fn pb_pid_to_pid(pid: &PbProcedureId) -> Result<ProcedureId> {
     ProcedureId::parse_str(&String::from_utf8_lossy(&pid.key)).with_context(|_| {
diff --git a/src/common/query/src/error.rs b/src/common/query/src/error.rs
index 49d8f35e39f1..758ec214b6a1 100644
--- a/src/common/query/src/error.rs
+++ b/src/common/query/src/error.rs
@@ -178,14 +178,23 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Failed to do procedure task"))]
+    ProcedureService {
+        source: BoxedError,
+        location: Location,
+    },
+
     #[snafu(display("Missing TableMutationHandler, not expected"))]
     MissingTableMutationHandler { location: Location },
 
-    #[snafu(display("Missing MetaServiceHandler, not expected"))]
-    MissingMetaServiceHandler { location: Location },
+    #[snafu(display("Missing ProcedureServiceHandler, not expected"))]
+    MissingProcedureServiceHandler { location: Location },
 
     #[snafu(display("Invalid function args: {}", err_msg))]
     InvalidFuncArgs { err_msg: String, location: Location },
+
+    #[snafu(display("Permission denied: {}", err_msg))]
+    PermissionDenied { err_msg: String, location: Location },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -213,7 +222,7 @@ impl ErrorExt for Error {
             | Error::FromArrowArray { source, .. } => source.status_code(),
 
             Error::MissingTableMutationHandler { .. }
-            | Error::MissingMetaServiceHandler { .. }
+            | Error::MissingProcedureServiceHandler { .. }
             | Error::ExecuteRepeatedly { .. }
             | Error::ThreadJoin { .. }
            | Error::GeneralDataFusion { .. } => StatusCode::Unexpected,
@@ -225,7 +234,11 @@ impl ErrorExt for Error {
             Error::ConvertDfRecordBatchStream { source, .. } => source.status_code(),
             Error::ExecutePhysicalPlan { source, .. } => source.status_code(),
             Error::Execute { source, .. } => source.status_code(),
-            Error::TableMutation { source, .. } => source.status_code(),
+            Error::ProcedureService { source, .. } | Error::TableMutation { source, .. } => {
+                source.status_code()
+            }
+
+            Error::PermissionDenied { .. } => StatusCode::PermissionDenied,
         }
     }
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 7480b2cfa73f..2d36e53eed8b 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -310,6 +310,7 @@ impl DatanodeBuilder {
             MemoryCatalogManager::with_default_setup(),
             None,
             None,
+            None,
             false,
             self.plugins.clone(),
         );
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index 4dcb28ae06dc..cf58b741652b 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -147,7 +147,7 @@ impl Instance {
             .enable_router()
             .enable_store()
             .enable_heartbeat()
-            .enable_ddl()
+            .enable_procedure()
             .channel_manager(channel_manager)
             .ddl_channel_manager(ddl_channel_manager)
             .build();
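The new `MigrateRegionRequest` carries everything the SQL `migrate_region(region_id, from_peer, to_peer [, replay_timeout])` function collects from its arguments. An illustrative construction (the struct mirrors the one added in `src/common/meta/src/rpc/procedure.rs`; the concrete values are made-up examples):

```rust
use std::time::Duration;

// Mirror of the request added in src/common/meta/src/rpc/procedure.rs.
#[derive(Clone, Debug)]
pub struct MigrateRegionRequest {
    pub region_id: u64,
    pub from_peer: u64,
    pub to_peer: u64,
    pub replay_timeout: Duration,
}

fn main() {
    let request = MigrateRegionRequest {
        region_id: 4398046511104, // e.g. RegionId::new(1024, 0).as_u64()
        from_peer: 1,
        to_peer: 2,
        replay_timeout: Duration::from_secs(10), // from the SQL replay_timeout argument
    };
    println!("{request:?}");
}
```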
diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs
index 8e891f41faeb..c890eeba71fa 100644
--- a/src/frontend/src/instance/builder.rs
+++ b/src/frontend/src/instance/builder.rs
@@ -19,11 +19,12 @@ use catalog::kvbackend::KvBackendCatalogManager;
 use common_base::Plugins;
 use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
 use common_meta::datanode_manager::DatanodeManagerRef;
-use common_meta::ddl::DdlTaskExecutorRef;
+use common_meta::ddl::ProcedureExecutorRef;
 use common_meta::key::TableMetadataManager;
 use common_meta::kv_backend::KvBackendRef;
 use operator::delete::Deleter;
 use operator::insert::Inserter;
+use operator::procedure::ProcedureServiceOperator;
 use operator::statement::StatementExecutor;
 use operator::table::TableMutationOperator;
 use partition::manager::PartitionRuleManager;
@@ -35,12 +36,13 @@ use crate::instance::region_query::FrontendRegionQueryHandler;
 use crate::instance::{Instance, StatementExecutorRef};
 use crate::script::ScriptExecutor;
 
+/// The frontend [`Instance`] builder.
 pub struct FrontendBuilder {
     kv_backend: KvBackendRef,
     cache_invalidator: Option<CacheInvalidatorRef>,
     datanode_manager: DatanodeManagerRef,
     plugins: Option<Plugins>,
-    ddl_task_executor: DdlTaskExecutorRef,
+    procedure_executor: ProcedureExecutorRef,
     heartbeat_task: Option<HeartbeatTask>,
 }
 
@@ -48,14 +50,14 @@ impl FrontendBuilder {
     pub fn new(
         kv_backend: KvBackendRef,
         datanode_manager: DatanodeManagerRef,
-        ddl_task_executor: DdlTaskExecutorRef,
+        procedure_executor: ProcedureExecutorRef,
     ) -> Self {
         Self {
             kv_backend,
             cache_invalidator: None,
             datanode_manager,
             plugins: None,
-            ddl_task_executor,
+            procedure_executor,
             heartbeat_task: None,
         }
     }
@@ -112,10 +114,15 @@ impl FrontendBuilder {
             deleter.clone(),
         ));
 
+        let procedure_service_handler = Arc::new(ProcedureServiceOperator::new(
+            self.procedure_executor.clone(),
+        ));
+
         let query_engine = QueryEngineFactory::new_with_plugins(
             catalog_manager.clone(),
             Some(region_query_handler.clone()),
             Some(table_mutation_handler),
+            Some(procedure_service_handler),
             true,
             plugins.clone(),
         )
@@ -127,7 +134,7 @@ impl FrontendBuilder {
         let statement_executor = Arc::new(StatementExecutor::new(
             catalog_manager.clone(),
             query_engine.clone(),
-            self.ddl_task_executor,
+            self.procedure_executor,
             kv_backend.clone(),
             catalog_manager.clone(),
             inserter.clone(),
diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs
index 4620b92bea4a..94122b81a7c6 100644
--- a/src/meta-client/src/client.rs
+++ b/src/meta-client/src/client.rs
@@ -23,10 +23,13 @@ mod store;
 use api::v1::meta::Role;
 use common_error::ext::BoxedError;
 use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
-use common_meta::ddl::{DdlTaskExecutor, ExecutorContext};
+use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
 use common_meta::error::{self as meta_error, Result as MetaResult};
 use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
 use common_meta::rpc::lock::{LockRequest, LockResponse, UnlockRequest};
+use common_meta::rpc::procedure::{
+    MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
+};
 use common_meta::rpc::store::{
     BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
     BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
@@ -56,7 +59,7 @@ pub struct MetaClientBuilder {
     enable_router: bool,
     enable_store: bool,
     enable_lock: bool,
-    enable_ddl: bool,
+    enable_procedure: bool,
     channel_manager: Option<ChannelManager>,
     ddl_channel_manager: Option<ChannelManager>,
     heartbeat_channel_manager: Option<ChannelManager>,
@@ -99,9 +102,9 @@ impl MetaClientBuilder {
         }
     }
 
-    pub fn enable_ddl(self) -> Self {
+    pub fn enable_procedure(self) -> Self {
         Self {
-            enable_ddl: true,
+            enable_procedure: true,
             ..self
         }
     }
@@ -155,9 +158,9 @@ impl MetaClientBuilder {
         if self.enable_lock {
             client.lock = Some(LockClient::new(self.id, self.role, mgr.clone()));
         }
-        if self.enable_ddl {
+        if self.enable_procedure {
             let mgr = self.ddl_channel_manager.unwrap_or(mgr);
-            client.ddl = Some(ProcedureClient::new(
+            client.procedure = Some(ProcedureClient::new(
                 self.id,
                 self.role,
                 mgr,
@@ -176,11 +179,11 @@ pub struct MetaClient {
     heartbeat: Option<HeartbeatClient>,
     store: Option<StoreClient>,
     lock: Option<LockClient>,
-    ddl: Option<ProcedureClient>,
+    procedure: Option<ProcedureClient>,
 }
 
 #[async_trait::async_trait]
-impl DdlTaskExecutor for MetaClient {
+impl ProcedureExecutor for MetaClient {
     async fn submit_ddl_task(
         &self,
         _ctx: &ExecutorContext,
         request: SubmitDdlTaskRequest,
     ) -> MetaResult<SubmitDdlTaskResponse> {
         self.submit_ddl_task(request)
             .await
             .map_err(BoxedError::new)
             .context(meta_error::ExternalSnafu)
     }
+
+    async fn migrate_region(
+        &self,
+        _ctx: &ExecutorContext,
+        request: MigrateRegionRequest,
+    ) -> MetaResult<MigrateRegionResponse> {
+        self.migrate_region(request)
+            .await
+            .map_err(BoxedError::new)
+            .context(meta_error::ExternalSnafu)
+    }
+
+    async fn query_procedure_state(
+        &self,
+        _ctx: &ExecutorContext,
+        pid: &str,
+    ) -> MetaResult<ProcedureStateResponse> {
+        self.query_procedure_state(pid)
+            .await
+            .map_err(BoxedError::new)
+            .context(meta_error::ExternalSnafu)
+    }
 }
 
 impl MetaClient {
@@ -228,7 +253,7 @@ impl MetaClient {
             client.start(urls.clone()).await?;
             info!("Lock client started");
         }
-        if let Some(client) = &mut self.ddl {
+        if let Some(client) = &mut self.procedure {
             client.start(urls).await?;
             info!("DDL client started");
         }
@@ -328,13 +353,33 @@ impl MetaClient {
         Ok(())
     }
 
+    /// Query the procedure state by its id.
+    pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
+        self.procedure_client()?.query_procedure_state(pid).await
+    }
+
+    /// Submit a region migration task.
+    pub async fn migrate_region(
+        &self,
+        request: MigrateRegionRequest,
+    ) -> Result<MigrateRegionResponse> {
+        self.procedure_client()?
+            .migrate_region(
+                request.region_id,
+                request.from_peer,
+                request.to_peer,
+                request.replay_timeout,
+            )
+            .await
+    }
+
     /// Submit a DDL task
     pub async fn submit_ddl_task(
         &self,
         req: SubmitDdlTaskRequest,
     ) -> Result<SubmitDdlTaskResponse> {
         let res = self
-            .ddl_client()?
+            .procedure_client()?
             .submit_ddl_task(req.try_into().context(error::ConvertMetaRequestSnafu)?)
             .await?
             .try_into()
@@ -365,8 +410,8 @@ impl MetaClient {
     }
 
     #[inline]
-    pub fn ddl_client(&self) -> Result<ProcedureClient> {
-        self.ddl
+    pub fn procedure_client(&self) -> Result<ProcedureClient> {
+        self.procedure
             .clone()
             .context(error::NotStartedSnafu { name: "ddl_client" })
     }
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index cddcd06885d7..2aafe519bff0 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -21,7 +21,7 @@ use std::time::Duration;
 use common_base::Plugins;
 use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
 use common_grpc::channel_manager;
-use common_meta::ddl::DdlTaskExecutorRef;
+use common_meta::ddl::ProcedureExecutorRef;
 use common_meta::key::TableMetadataManagerRef;
 use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
 use common_meta::peer::Peer;
@@ -253,7 +253,7 @@ pub struct MetaSrv {
     lock: DistLockRef,
     procedure_manager: ProcedureManagerRef,
     mailbox: MailboxRef,
-    ddl_executor: DdlTaskExecutorRef,
+    procedure_executor: ProcedureExecutorRef,
     wal_options_allocator: WalOptionsAllocatorRef,
     table_metadata_manager: TableMetadataManagerRef,
     memory_region_keeper: MemoryRegionKeeperRef,
@@ -423,8 +423,8 @@ impl MetaSrv {
         &self.mailbox
     }
 
-    pub fn ddl_executor(&self) -> &DdlTaskExecutorRef {
-        &self.ddl_executor
+    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
+        &self.procedure_executor
     }
 
     pub fn procedure_manager(&self) -> &ProcedureManagerRef {
diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs
index 0a38bc3f37be..dc007a81fe01 100644
--- a/src/meta-srv/src/metasrv/builder.rs
+++ b/src/meta-srv/src/metasrv/builder.rs
@@ -329,7 +329,7 @@ impl MetaSrvBuilder {
             lock,
             procedure_manager,
             mailbox,
-            ddl_executor: ddl_manager,
+            procedure_executor: ddl_manager,
             wal_options_allocator,
             table_metadata_manager,
             greptimedb_telemetry_task: get_greptimedb_telemetry_task(
diff --git a/src/meta-srv/src/service/procedure.rs b/src/meta-srv/src/service/procedure.rs
index 8181ee5e4214..a45e538a3693 100644
--- a/src/meta-srv/src/service/procedure.rs
+++ b/src/meta-srv/src/service/procedure.rs
@@ -66,7 +66,7 @@ impl procedure_service_server::ProcedureService for MetaSrv {
             .context(error::ConvertProtoDataSnafu)?;
 
         let resp = self
-            .ddl_executor()
+            .procedure_executor()
            .submit_ddl_task(
                 &ExecutorContext {
                     cluster_id: Some(cluster_id),
diff --git a/src/operator/src/lib.rs b/src/operator/src/lib.rs
index 53f931a87324..e672b488a9ac 100644
--- a/src/operator/src/lib.rs
+++ b/src/operator/src/lib.rs
@@ -17,6 +17,7 @@ pub mod error;
 pub mod expr_factory;
 pub mod insert;
 pub mod metrics;
+pub mod procedure;
 pub mod region_req_factory;
 pub mod req_convert;
 pub mod statement;
diff --git a/src/operator/src/procedure.rs b/src/operator/src/procedure.rs
new file mode 100644
index 000000000000..d36e6b6b8579
--- /dev/null
+++ b/src/operator/src/procedure.rs
@@ -0,0 +1,56 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use async_trait::async_trait;
+use common_error::ext::BoxedError;
+use common_function::handlers::ProcedureServiceHandler;
+use common_meta::ddl::{ExecutorContext, ProcedureExecutorRef};
+use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
+use common_query::error as query_error;
+use common_query::error::Result as QueryResult;
+use snafu::ResultExt;
+
+/// The operator for procedures which implements [`ProcedureServiceHandler`].
+#[derive(Clone)]
+pub struct ProcedureServiceOperator {
+    procedure_executor: ProcedureExecutorRef,
+}
+
+impl ProcedureServiceOperator {
+    pub fn new(procedure_executor: ProcedureExecutorRef) -> Self {
+        Self { procedure_executor }
+    }
+}
+
+#[async_trait]
+impl ProcedureServiceHandler for ProcedureServiceOperator {
+    async fn migrate_region(&self, request: MigrateRegionRequest) -> QueryResult<Option<String>> {
+        Ok(self
+            .procedure_executor
+            .migrate_region(&ExecutorContext::default(), request)
+            .await
+            .map_err(BoxedError::new)
+            .context(query_error::ProcedureServiceSnafu)?
+            .pid
+            .map(|pid| String::from_utf8_lossy(&pid.key).to_string()))
+    }
+
+    async fn query_procedure_state(&self, pid: &str) -> QueryResult<ProcedureStateResponse> {
+        self.procedure_executor
+            .query_procedure_state(&ExecutorContext::default(), pid)
+            .await
+            .map_err(BoxedError::new)
+            .context(query_error::ProcedureServiceSnafu)
+    }
+}
diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs
index f76823df9694..b0cd2c773b08 100644
--- a/src/operator/src/statement.rs
+++ b/src/operator/src/statement.rs
@@ -26,7 +26,7 @@ use std::sync::Arc;
 use catalog::CatalogManagerRef;
 use common_error::ext::BoxedError;
 use common_meta::cache_invalidator::CacheInvalidatorRef;
-use common_meta::ddl::DdlTaskExecutorRef;
+use common_meta::ddl::ProcedureExecutorRef;
 use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
 use common_meta::kv_backend::KvBackendRef;
 use common_meta::table_name::TableName;
@@ -61,7 +61,7 @@ use crate::table::table_idents_to_full_name;
 pub struct StatementExecutor {
     catalog_manager: CatalogManagerRef,
     query_engine: QueryEngineRef,
-    ddl_executor: DdlTaskExecutorRef,
+    procedure_executor: ProcedureExecutorRef,
     table_metadata_manager: TableMetadataManagerRef,
     partition_manager: PartitionRuleManagerRef,
     cache_invalidator: CacheInvalidatorRef,
@@ -72,7 +72,7 @@ impl StatementExecutor {
     pub fn new(
         catalog_manager: CatalogManagerRef,
         query_engine: QueryEngineRef,
-        ddl_task_executor: DdlTaskExecutorRef,
+        procedure_executor: ProcedureExecutorRef,
         kv_backend: KvBackendRef,
         cache_invalidator: CacheInvalidatorRef,
         inserter: InserterRef,
@@ -80,7 +80,7 @@ impl StatementExecutor {
         Self {
             catalog_manager,
             query_engine,
-            ddl_executor: ddl_task_executor,
+            procedure_executor,
             table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
             partition_manager: Arc::new(PartitionRuleManager::new(kv_backend)),
             cache_invalidator,
diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs
index 2a20f34cd667..9231ba8d5c0e 100644
--- a/src/operator/src/statement/ddl.rs
+++ b/src/operator/src/statement/ddl.rs
@@ -404,7 +404,7 @@ impl StatementExecutor {
             task: DdlTask::new_alter_table(expr.clone()),
         };
 
-        self.ddl_executor
+        self.procedure_executor
             .submit_ddl_task(&ExecutorContext::default(), req)
             .await
             .context(error::ExecuteDdlSnafu)?;
@@ -438,7 +438,7 @@ impl StatementExecutor {
             task: DdlTask::new_create_table(create_table, partitions, table_info),
         };
 
-        self.ddl_executor
+        self.procedure_executor
             .submit_ddl_task(&ExecutorContext::default(), request)
             .await
             .context(error::ExecuteDdlSnafu)
@@ -452,7 +452,7 @@ impl StatementExecutor {
             task: DdlTask::new_create_logical_tables(tables_data),
         };
 
-        self.ddl_executor
+        self.procedure_executor
             .submit_ddl_task(&ExecutorContext::default(), request)
             .await
             .context(error::ExecuteDdlSnafu)
@@ -474,7 +474,7 @@ impl StatementExecutor {
             ),
         };
 
-        self.ddl_executor
+        self.procedure_executor
             .submit_ddl_task(&ExecutorContext::default(), request)
             .await
             .context(error::ExecuteDdlSnafu)
@@ -494,7 +494,7 @@ impl StatementExecutor {
             ),
         };
 
-        self.ddl_executor
+        self.procedure_executor
             .submit_ddl_task(&ExecutorContext::default(), request)
             .await
             .context(error::ExecuteDdlSnafu)
diff --git a/src/operator/src/table.rs b/src/operator/src/table.rs
index 8d53a39c7c49..38271abb87af 100644
--- a/src/operator/src/table.rs
+++ b/src/operator/src/table.rs
@@ -12,8 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::time::Duration;
-
 use async_trait::async_trait;
 use common_error::ext::BoxedError;
 use common_function::handlers::{AffectedRows, TableMutationHandler};
@@ -95,15 +93,4 @@ impl TableMutationHandler for TableMutationOperator {
             .map_err(BoxedError::new)
             .context(query_error::TableMutationSnafu)
     }
-
-    async fn migrate_region(
-        &self,
-        _region_id: u64,
-        _from_peer: u64,
-        _to_peer: u64,
-        _replay_timeout: Duration,
-    ) -> QueryResult<String> {
-        // FIXME(dennis): implemented in the following PR.
-        todo!();
-    }
 }
diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs
index ab757a3d2edf..cc20e09d2a5f 100644
--- a/src/partition/src/manager.rs
+++ b/src/partition/src/manager.rs
@@ -74,23 +74,20 @@ impl PartitionRuleManager {
         Ok(route.region_routes)
     }
 
-    pub async fn find_region_routes_batch(
+    pub async fn batch_find_region_routes(
         &self,
         table_ids: &[TableId],
     ) -> Result<HashMap<TableId, Vec<RegionRoute>>> {
         let table_routes = self
             .table_route_manager
-            .batch_get(table_ids)
+            .batch_get_physical_table_routes(table_ids)
             .await
             .context(error::TableRouteManagerSnafu)?;
 
         let mut table_region_routes = HashMap::with_capacity(table_routes.len());
 
         for (table_id, table_route) in table_routes {
-            let region_routes = table_route
-                .region_routes()
-                .context(error::TableRouteManagerSnafu)?
-                .clone();
+            let region_routes = table_route.region_routes;
             table_region_routes.insert(table_id, region_routes);
         }
 
@@ -104,40 +101,25 @@ impl PartitionRuleManager {
             error::FindTableRoutesSnafu { table_id }
         );
 
-        let mut partitions = Vec::with_capacity(region_routes.len());
-        for r in region_routes {
-            let partition = r
-                .region
-                .partition
-                .clone()
-                .context(error::FindRegionRoutesSnafu {
-                    region_id: r.region.id,
-                    table_id,
-                })?;
-            let partition_def = PartitionDef::try_from(partition)?;
+        create_partitions_from_region_routes(table_id, region_routes)
+    }
 
-            partitions.push(PartitionInfo {
-                id: r.region.id,
-                partition: partition_def,
-            });
-        }
-        partitions.sort_by(|a, b| {
-            a.partition
-                .partition_bounds()
-                .cmp(b.partition.partition_bounds())
-        });
+    pub async fn batch_find_table_partitions(
+        &self,
+        table_ids: &[TableId],
+    ) -> Result<HashMap<TableId, Vec<PartitionInfo>>> {
+        let batch_region_routes = self.batch_find_region_routes(table_ids).await?;
 
-        ensure!(
-            partitions
-                .windows(2)
-                .all(|w| w[0].partition.partition_columns() == w[1].partition.partition_columns()),
-            error::InvalidTableRouteDataSnafu {
+        let mut results = HashMap::with_capacity(table_ids.len());
+
+        for (table_id, region_routes) in batch_region_routes {
+            results.insert(
                 table_id,
-                err_msg: "partition columns of all regions are not the same"
-            }
-        );
+                create_partitions_from_region_routes(table_id, region_routes)?,
+            );
+        }
 
-        Ok(partitions)
+        Ok(results)
     }
 
     /// Get partition rule of given table.
@@ -237,6 +219,46 @@ impl PartitionRuleManager {
     }
 }
 
+fn create_partitions_from_region_routes(
+    table_id: TableId,
+    region_routes: Vec<RegionRoute>,
+) -> Result<Vec<PartitionInfo>> {
+    let mut partitions = Vec::with_capacity(region_routes.len());
+    for r in region_routes {
+        let partition = r
+            .region
+            .partition
+            .clone()
+            .context(error::FindRegionRoutesSnafu {
+                region_id: r.region.id,
+                table_id,
+            })?;
+        let partition_def = PartitionDef::try_from(partition)?;
+
+        partitions.push(PartitionInfo {
+            id: r.region.id,
+            partition: partition_def,
+        });
+    }
+    partitions.sort_by(|a, b| {
+        a.partition
+            .partition_bounds()
+            .cmp(b.partition.partition_bounds())
+    });
+
+    ensure!(
+        partitions
+            .windows(2)
+            .all(|w| w[0].partition.partition_columns() == w[1].partition.partition_columns()),
+        error::InvalidTableRouteDataSnafu {
+            table_id,
+            err_msg: "partition columns of all regions are not the same"
+        }
+    );
+
+    Ok(partitions)
+}
+
 fn find_regions0(partition_rule: PartitionRuleRef, filter: &Expr) -> Result<Vec<RegionNumber>> {
     let expr = filter.df_expr();
     match expr {
diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs
index 458d2e8708e6..03945da66c9d 100644
--- a/src/query/src/datafusion.rs
+++ b/src/query/src/datafusion.rs
@@ -564,7 +564,7 @@ mod tests {
         };
         catalog_manager.register_table_sync(req).unwrap();
 
-        QueryEngineFactory::new(catalog_manager, None, None, false).query_engine()
+        QueryEngineFactory::new(catalog_manager, None, None, None, false).query_engine()
     }
 
     #[tokio::test]
diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs
index 8a4edffc4aef..18923f3b96ad 100644
--- a/src/query/src/query_engine.rs
+++ b/src/query/src/query_engine.rs
@@ -24,7 +24,7 @@ use catalog::CatalogManagerRef;
 use common_base::Plugins;
 use common_function::function::FunctionRef;
 use common_function::function_registry::FUNCTION_REGISTRY;
-use common_function::handlers::TableMutationHandlerRef;
+use common_function::handlers::{ProcedureServiceHandlerRef, TableMutationHandlerRef};
 use common_function::scalars::aggregate::AggregateFunctionMetaRef;
 use common_query::prelude::ScalarUdf;
 use common_query::Output;
@@ -101,12 +101,14 @@ impl QueryEngineFactory {
         catalog_manager: CatalogManagerRef,
         region_query_handler: Option<RegionQueryHandlerRef>,
         table_mutation_handler: Option<TableMutationHandlerRef>,
+        procedure_service_handler: Option<ProcedureServiceHandlerRef>,
         with_dist_planner: bool,
     ) -> Self {
         Self::new_with_plugins(
             catalog_manager,
             region_query_handler,
             table_mutation_handler,
+            procedure_service_handler,
             with_dist_planner,
             Default::default(),
         )
@@ -116,6 +118,7 @@ impl QueryEngineFactory {
         catalog_manager: CatalogManagerRef,
         region_query_handler: Option<RegionQueryHandlerRef>,
         table_mutation_handler: Option<TableMutationHandlerRef>,
+        procedure_service_handler: Option<ProcedureServiceHandlerRef>,
         with_dist_planner: bool,
         plugins: Plugins,
     ) -> Self {
@@ -123,6 +126,7 @@ impl QueryEngineFactory {
             catalog_manager,
             region_query_handler,
             table_mutation_handler,
+            procedure_service_handler,
             with_dist_planner,
             plugins.clone(),
         ));
@@ -156,7 +160,7 @@ mod tests {
     #[test]
     fn test_query_engine_factory() {
         let catalog_list = catalog::memory::new_memory_catalog_manager().unwrap();
-        let factory = QueryEngineFactory::new(catalog_list, None, None, false);
+        let factory = QueryEngineFactory::new(catalog_list, None, None, None, false);
 
         let engine = factory.query_engine();
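`create_partitions_from_region_routes` above ends with an invariant check worth calling out: after sorting, `windows(2)` walks adjacent pairs to verify that every region uses the same partition columns. The same validation pattern in isolation:

```rust
// Returns true when all adjacent pairs agree, i.e. every element carries
// the same key; empty and single-element slices pass vacuously, which is
// exactly how `partitions.windows(2).all(...)` behaves above.
fn all_same_key<T: PartialEq>(keys: &[T]) -> bool {
    keys.windows(2).all(|pair| pair[0] == pair[1])
}

fn main() {
    assert!(all_same_key(&["region", "region", "region"]));
    assert!(!all_same_key(&["region", "ts"]));
    assert!(all_same_key::<&str>(&[]));
}
```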
diff --git a/src/query/src/query_engine/context.rs b/src/query/src/query_engine/context.rs
index b90b7d41f613..f76332cde2b4 100644
--- a/src/query/src/query_engine/context.rs
+++ b/src/query/src/query_engine/context.rs
@@ -70,6 +70,7 @@ impl QueryEngineContext {
             catalog::memory::new_memory_catalog_manager().unwrap(),
             None,
             None,
+            None,
             false,
             Plugins::default(),
         ));
diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs
index f5a6a828a420..18af09973e57 100644
--- a/src/query/src/query_engine/state.rs
+++ b/src/query/src/query_engine/state.rs
@@ -20,7 +20,7 @@ use async_trait::async_trait;
 use catalog::CatalogManagerRef;
 use common_base::Plugins;
 use common_function::function::FunctionRef;
-use common_function::handlers::{MetaServiceHandlerRef, TableMutationHandlerRef};
+use common_function::handlers::{ProcedureServiceHandlerRef, TableMutationHandlerRef};
 use common_function::scalars::aggregate::AggregateFunctionMetaRef;
 use common_function::state::FunctionState;
 use common_query::physical_plan::SessionContext;
@@ -80,6 +80,7 @@ impl QueryEngineState {
         catalog_list: CatalogManagerRef,
         region_query_handler: Option<RegionQueryHandlerRef>,
         table_mutation_handler: Option<TableMutationHandlerRef>,
+        procedure_service_handler: Option<ProcedureServiceHandlerRef>,
         with_dist_planner: bool,
         plugins: Plugins,
     ) -> Self {
@@ -120,8 +121,7 @@ impl QueryEngineState {
             catalog_manager: catalog_list,
             function_state: Arc::new(FunctionState {
                 table_mutation_handler,
-                // FIXME(dennis): implemented in the following PR.
-                meta_service_handler: None,
+                procedure_service_handler,
             }),
             aggregate_functions: Arc::new(RwLock::new(HashMap::new())),
             extension_rules,
@@ -219,9 +219,9 @@ impl QueryEngineState {
         self.function_state.table_mutation_handler.as_ref()
     }
 
-    /// Returns the [`MetaServiceHandlerRef`] in state.
-    pub fn meta_service_handler(&self) -> Option<&MetaServiceHandlerRef> {
-        self.function_state.meta_service_handler.as_ref()
+    /// Returns the [`ProcedureServiceHandlerRef`] in state.
+    pub fn procedure_service_handler(&self) -> Option<&ProcedureServiceHandlerRef> {
+        self.function_state.procedure_service_handler.as_ref()
    }
 
     pub(crate) fn disallow_cross_catalog_query(&self) -> bool {
diff --git a/src/query/src/range_select/plan_rewrite.rs b/src/query/src/range_select/plan_rewrite.rs
index 36c007a4d9e6..d55fe65786fb 100644
--- a/src/query/src/range_select/plan_rewrite.rs
+++ b/src/query/src/range_select/plan_rewrite.rs
@@ -533,7 +533,7 @@ mod test {
             table,
         })
         .is_ok());
-        QueryEngineFactory::new(catalog_list, None, None, false).query_engine()
+        QueryEngineFactory::new(catalog_list, None, None, None, false).query_engine()
     }
 
     async fn do_query(sql: &str) -> Result {
diff --git a/src/query/src/tests.rs b/src/query/src/tests.rs
index ebf135d06e71..c2c8ace323bc 100644
--- a/src/query/src/tests.rs
+++ b/src/query/src/tests.rs
@@ -52,5 +52,5 @@ async fn exec_selection(engine: QueryEngineRef, sql: &str) -> Vec {
 pub fn new_query_engine_with_table(table: TableRef) -> QueryEngineRef {
     let catalog_manager = MemoryCatalogManager::new_with_table(table);
 
-    QueryEngineFactory::new(catalog_manager, None, None, false).query_engine()
+    QueryEngineFactory::new(catalog_manager, None, None, None, false).query_engine()
 }
diff --git a/src/query/src/tests/query_engine_test.rs b/src/query/src/tests/query_engine_test.rs
index 067747f2636a..3fcddd504322 100644
--- a/src/query/src/tests/query_engine_test.rs
+++ b/src/query/src/tests/query_engine_test.rs
@@ -47,7 +47,7 @@ async fn test_datafusion_query_engine() -> Result<()> {
     let catalog_list = catalog::memory::new_memory_catalog_manager()
         .map_err(BoxedError::new)
         .context(QueryExecutionSnafu)?;
-    let factory = QueryEngineFactory::new(catalog_list, None, None, false);
+    let factory = QueryEngineFactory::new(catalog_list, None, None, None, false);
     let engine = factory.query_engine();
 
     let column_schemas = vec![ColumnSchema::new(
@@ -128,7 +128,8 @@ async fn test_query_validate() -> Result<()> {
         disallow_cross_catalog_query: true,
     });
 
-    let factory = QueryEngineFactory::new_with_plugins(catalog_list, None, None, false, plugins);
+    let factory =
+        QueryEngineFactory::new_with_plugins(catalog_list, None, None, None, false, plugins);
     let engine = factory.query_engine();
 
     let stmt =
@@ -158,7 +159,7 @@ async fn test_udf() -> Result<()> {
     common_telemetry::init_default_ut_logging();
 
     let catalog_list = catalog_manager()?;
-    let factory = QueryEngineFactory::new(catalog_list, None, None, false);
+    let factory = QueryEngineFactory::new(catalog_list, None, None, None, false);
     let engine = factory.query_engine();
 
     let pow = make_scalar_function(pow);
diff --git a/src/query/src/tests/time_range_filter_test.rs b/src/query/src/tests/time_range_filter_test.rs
index 1f5a926d79c5..c47b4e817c0a 100644
--- a/src/query/src/tests/time_range_filter_test.rs
+++ b/src/query/src/tests/time_range_filter_test.rs
@@ -106,7 +106,7 @@ fn create_test_engine() -> TimeRangeTester {
     };
     let _ = catalog_manager.register_table_sync(req).unwrap();
 
-    let engine = QueryEngineFactory::new(catalog_manager, None, None, false).query_engine();
+    let engine = QueryEngineFactory::new(catalog_manager, None, None, None, false).query_engine();
 
     TimeRangeTester { engine, filter }
 }
diff --git a/src/script/benches/py_benchmark.rs b/src/script/benches/py_benchmark.rs
index a4ead66f1685..6568b21a2287 100644
--- a/src/script/benches/py_benchmark.rs
+++ b/src/script/benches/py_benchmark.rs
@@ -52,7 +52,8 @@ where
 pub(crate) fn sample_script_engine() -> PyEngine {
     let catalog_manager =
         MemoryCatalogManager::new_with_table(NumbersTable::table(NUMBERS_TABLE_ID));
-    let query_engine = QueryEngineFactory::new(catalog_manager, None, None, false).query_engine();
+    let query_engine =
+        QueryEngineFactory::new(catalog_manager, None, None, None, false).query_engine();
 
     PyEngine::new(query_engine.clone())
 }
diff --git a/src/script/src/python/engine.rs b/src/script/src/python/engine.rs
index c2e8eccb45a2..a0991565acfd 100644
--- a/src/script/src/python/engine.rs
+++ b/src/script/src/python/engine.rs
@@ -385,7 +385,7 @@ mod tests {
         let catalog_manager =
             MemoryCatalogManager::new_with_table(NumbersTable::table(NUMBERS_TABLE_ID));
         let query_engine =
-            QueryEngineFactory::new(catalog_manager, None, None, false).query_engine();
+            QueryEngineFactory::new(catalog_manager, None, None, None, false).query_engine();
 
         PyEngine::new(query_engine.clone())
     }
diff --git a/src/script/src/test.rs b/src/script/src/test.rs
index 55ba73f582a1..b2beb799a7f9 100644
--- a/src/script/src/test.rs
+++ b/src/script/src/test.rs
@@ -56,7 +56,7 @@ pub async fn setup_scripts_manager(
     let catalog_manager = MemoryCatalogManager::new_with_table(table.clone());
 
-    let factory = QueryEngineFactory::new(catalog_manager.clone(), None, None, false);
+    let factory = QueryEngineFactory::new(catalog_manager.clone(), None, None, None, false);
     let query_engine = factory.query_engine();
 
     let mgr = ScriptManager::new(Arc::new(MockGrpcQueryHandler {}) as _, query_engine)
         .await
diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs
index ff60ae007fa6..284935673650 100644
--- a/src/servers/tests/mod.rs
+++ b/src/servers/tests/mod.rs
@@ -214,7 +214,8 @@ impl GrpcQueryHandler for DummyInstance {
 
 fn create_testing_instance(table: TableRef) -> DummyInstance {
     let catalog_manager = MemoryCatalogManager::new_with_table(table);
-    let query_engine = QueryEngineFactory::new(catalog_manager, None, None, false).query_engine();
+    let query_engine =
+        QueryEngineFactory::new(catalog_manager, None, None, None, false).query_engine();
     DummyInstance::new(query_engine)
 }
 
diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs
index cace9345ca83..026261551204 100644
--- a/tests-integration/src/cluster.rs
+++ b/tests-integration/src/cluster.rs
@@ -345,7 +345,7 @@ impl GreptimeDbClusterBuilder {
             .enable_store()
             .enable_heartbeat()
             .channel_manager(meta_srv.channel_manager)
-            .enable_ddl()
+            .enable_procedure()
             .build();
         meta_client.start(&[&meta_srv.server_addr]).await.unwrap();
         let meta_client = Arc::new(meta_client);
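The builder hunk above is a pure rename (`enable_ddl` to `enable_procedure`), tracking the DdlTaskExecutor-to-ProcedureExecutor rename elsewhere in this PR. A hedged sketch of the resulting call shape; `new_meta_client_builder()` and `channel_manager` stand in for construction details not shown in this diff:

    // Sketch only; the builder construction is elided because it is not part
    // of this hunk. Every method in the chain appears verbatim in the diff.
    let mut meta_client = new_meta_client_builder()
        .enable_store()
        .enable_heartbeat()
        .channel_manager(channel_manager)
        .enable_procedure() // formerly enable_ddl()
        .build();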
diff --git a/tests-integration/tests/region_migration.rs b/tests-integration/tests/region_migration.rs
index e1aa74b6bcc2..9831eca35939 100644
--- a/tests-integration/tests/region_migration.rs
+++ b/tests-integration/tests/region_migration.rs
@@ -19,11 +19,15 @@ use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_meta::key::{RegionDistribution, TableMetadataManagerRef};
 use common_meta::peer::Peer;
 use common_query::Output;
+use common_recordbatch::RecordBatches;
 use common_telemetry::info;
 use common_test_util::recordbatch::check_output_stream;
 use common_test_util::temp_dir::create_temp_dir;
 use common_wal::config::kafka::{DatanodeKafkaConfig, MetaSrvKafkaConfig};
 use common_wal::config::{DatanodeWalConfig, MetaSrvWalConfig};
+use datatypes::prelude::ScalarVector;
+use datatypes::value::Value;
+use datatypes::vectors::{Helper, UInt64Vector};
 use frontend::error::Result as FrontendResult;
 use frontend::instance::Instance;
 use futures::future::BoxFuture;
@@ -76,6 +80,7 @@ macro_rules! region_migration_tests {
         $service,
 
         test_region_migration,
+        test_region_migration_by_sql,
         test_region_migration_multiple_regions,
         test_region_migration_all_regions,
         test_region_migration_incorrect_from_peer,
@@ -212,6 +217,125 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<String>) {
+pub async fn test_region_migration_by_sql(store_type: StorageType, endpoints: Vec<String>) {
+    let cluster_name = "test_region_migration";
+    let peer_factory = |id| Peer {
+        id,
+        addr: PEER_PLACEHOLDER_ADDR.to_string(),
+    };
+
+    // Prepares test cluster.
+    let (store_config, _guard) = get_test_store_config(&store_type);
+    let home_dir = create_temp_dir("test_migration_data_home");
+    let datanodes = 5u64;
+    let builder = GreptimeDbClusterBuilder::new(cluster_name).await;
+    let const_selector = Arc::new(ConstNodeSelector::new(vec![
+        peer_factory(1),
+        peer_factory(2),
+        peer_factory(3),
+    ]));
+    let cluster = builder
+        .with_datanodes(datanodes as u32)
+        .with_store_config(store_config)
+        .with_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
+            broker_endpoints: endpoints.clone(),
+            linger: Duration::from_millis(25),
+            ..Default::default()
+        }))
+        .with_meta_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
+            broker_endpoints: endpoints,
+            num_topics: 3,
+            topic_name_prefix: Uuid::new_v4().to_string(),
+            ..Default::default()
+        }))
+        .with_shared_home_dir(Arc::new(home_dir))
+        .with_meta_selector(const_selector.clone())
+        .build()
+        .await;
+    let mut logical_timer = 1685508715000;
+
+    // Prepares test table.
+    let table_id = prepare_testing_table(&cluster).await;
+
+    // Inserts data.
+    let results = insert_values(&cluster.frontend, logical_timer).await;
+    logical_timer += 1000;
+    for result in results {
+        assert!(matches!(result.unwrap(), Output::AffectedRows(1)));
+    }
+
+    // Fetches the region distribution.
+    let mut distribution = find_region_distribution_by_sql(&cluster).await;
+
+    let old_distribution = distribution.clone();
+
+    // Selects the target of the region migration.
+    let region_migration_manager = cluster.meta_srv.region_migration_manager();
+    let (from_peer_id, from_regions) = distribution.pop_first().unwrap();
+    info!(
+        "Selecting from peer: {from_peer_id}, and regions: {:?}",
+        from_regions
+    );
+    let (to_peer_id, to_regions) = distribution.pop_first().unwrap();
+    info!(
+        "Selecting to peer: {to_peer_id}, and regions: {:?}",
+        to_regions
+    );
+
+    let region_id = RegionId::new(table_id, from_regions[0]);
+    // Triggers the region migration.
+    let procedure_id =
+        trigger_migration_by_sql(&cluster, region_id.as_u64(), from_peer_id, to_peer_id).await;
+
+    info!("Started region procedure: {}!", procedure_id);
+
+    // Waits for completion by polling the procedure state.
+    let frontend = cluster.frontend.clone();
+    wait_condition(
+        Duration::from_secs(10),
+        Box::pin(async move {
+            loop {
+                let state = query_procedure_by_sql(&frontend, &procedure_id).await;
+                if state == "{\"status\":\"Done\"}" {
+                    info!("Migration done: {state}");
+                    break;
+                } else {
+                    info!("Migration not finished: {state}");
+                    tokio::time::sleep(Duration::from_millis(200)).await;
+                }
+            }
+        }),
+    )
+    .await;
+
+    // Inserts more data.
+    let results = insert_values(&cluster.frontend, logical_timer).await;
+    for result in results {
+        assert!(matches!(result.unwrap(), Output::AffectedRows(1)));
+    }
+
+    // Asserts the writes.
+    assert_values(&cluster.frontend).await;
+
+    // Triggers the migration again; the region has already moved, so no new
+    // procedure should be submitted.
+    let procedure = region_migration_manager
+        .submit_procedure(RegionMigrationProcedureTask::new(
+            0,
+            region_id,
+            peer_factory(from_peer_id),
+            peer_factory(to_peer_id),
+            Duration::from_millis(1000),
+        ))
+        .await
+        .unwrap();
+    assert!(procedure.is_none());
+
+    let new_distribution = find_region_distribution_by_sql(&cluster).await;
+
+    assert_ne!(old_distribution, new_distribution);
+}
+
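The test above polls `procedure_state(...)` until the procedure reports Done. A standalone restatement of that wait loop, under the assumption that a `poll` closure stands in for `query_procedure_by_sql`; the real test drives the same pattern through `wait_condition`:

    use std::time::Duration;

    // Generic poll-until-done helper mirroring the test's wait loop.
    async fn wait_until_done<F, Fut>(mut poll: F, timeout: Duration)
    where
        F: FnMut() -> Fut,
        Fut: std::future::Future<Output = String>,
    {
        tokio::time::timeout(timeout, async {
            loop {
                // The procedure framework reports its state as a JSON string.
                if poll().await == r#"{"status":"Done"}"# {
                    break;
                }
                tokio::time::sleep(Duration::from_millis(200)).await;
            }
        })
        .await
        .expect("procedure did not reach Done state in time");
    }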
 /// A region migration test for a region server contains multiple regions of the table.
 pub async fn test_region_migration_multiple_regions(
     store_type: StorageType,
@@ -724,12 +848,103 @@ async fn find_region_distribution(
     .unwrap()
 }
 
+/// Finds the region distribution via SQL query.
+async fn find_region_distribution_by_sql(cluster: &GreptimeDbCluster) -> RegionDistribution {
+    let query_ctx = QueryContext::arc();
+
+    let Output::Stream(stream, _) = run_sql(
+        &cluster.frontend,
+        &format!(
+            r#"select b.peer_id as datanode_id,
+                      a.greptime_partition_id as region_id
+               from information_schema.partitions a left join information_schema.greptime_region_peers b
+               on a.greptime_partition_id = b.region_id
+               where a.table_name='{TEST_TABLE_NAME}' order by datanode_id asc"#
+        ),
+        query_ctx.clone(),
+    )
+    .await.unwrap() else {
+        unreachable!();
+    };
+
+    let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
+
+    let mut distribution = RegionDistribution::new();
+
+    for batch in recordbatches.take() {
+        let datanode_ids: &UInt64Vector =
+            unsafe { Helper::static_cast(batch.column_by_name("datanode_id").unwrap()) };
+        let region_ids: &UInt64Vector =
+            unsafe { Helper::static_cast(batch.column_by_name("region_id").unwrap()) };
+
+        for (datanode_id, region_id) in datanode_ids.iter_data().zip(region_ids.iter_data()) {
+            let (Some(datanode_id), Some(region_id)) = (datanode_id, region_id) else {
+                unreachable!();
+            };
+
+            let region_id = RegionId::from_u64(region_id);
+            distribution
+                .entry(datanode_id)
+                .or_default()
+                .push(region_id.region_number());
+        }
+    }
+
+    distribution
+}
+
+/// Triggers the region migration by SQL; returns the procedure id on success.
+async fn trigger_migration_by_sql(
+    cluster: &GreptimeDbCluster,
+    region_id: u64,
+    from_peer_id: u64,
+    to_peer_id: u64,
+) -> String {
+    let Output::Stream(stream, _) = run_sql(
+        &cluster.frontend,
+        &format!("select migrate_region({region_id}, {from_peer_id}, {to_peer_id})"),
+        QueryContext::arc(),
+    )
+    .await
+    .unwrap() else {
+        unreachable!();
+    };
+
+    let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
+
+    let Value::String(procedure_id) = recordbatches.take()[0].column(0).get(0) else {
+        unreachable!();
+    };
+
+    procedure_id.as_utf8().to_string()
+}
+
+/// Queries the procedure state by SQL.
+async fn query_procedure_by_sql(instance: &Arc<Instance>, pid: &str) -> String {
+    let Output::Stream(stream, _) = run_sql(
+        instance,
+        &format!("select procedure_state('{pid}')"),
+        QueryContext::arc(),
+    )
+    .await
+    .unwrap() else {
+        unreachable!();
+    };
+
+    let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
+
+    let Value::String(state) = recordbatches.take()[0].column(0).get(0) else {
+        unreachable!();
+    };
+
+    state.as_utf8().to_string()
+}
+
 async fn insert_values(instance: &Arc<Instance>, ts: u64) -> Vec<FrontendResult<Output>> {
     let query_ctx = QueryContext::arc();
 
     let mut results = Vec::new();
     for range in [5, 15, 55] {
-        let result = insert_value(
+        let result = run_sql(
             instance,
             &format!("INSERT INTO {TEST_TABLE_NAME} VALUES ({},{})", range, ts),
             query_ctx.clone(),
@@ -741,10 +956,11 @@ async fn insert_values(instance: &Arc<Instance>, ts: u64) -> Vec<FrontendResult<Output>> {
     results
 }
 
-async fn insert_value(
+async fn run_sql(
     instance: &Arc<Instance>,
     sql: &str,
     query_ctx: QueryContextRef,
 ) -> FrontendResult<Output> {
+    info!("Run SQL: {sql}");
     instance.do_query(sql, query_ctx).await.remove(0)
 }
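Taken together, the new helpers give the SQL-driven migration path a compact shape. A usage recap under the same assumptions as the test above (a running GreptimeDbCluster, plus ids selected from the distribution); the helper names come from this diff, while the surrounding setup is elided:

    // Hedged usage sketch tying the helpers together.
    let pid = trigger_migration_by_sql(&cluster, region_id.as_u64(), from_peer_id, to_peer_id).await;
    let state = query_procedure_by_sql(&cluster.frontend, &pid).await;
    assert_eq!(state, r#"{"status":"Done"}"#); // once the migration has completed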