diff --git a/Cargo.lock b/Cargo.lock
index 37fd88cb4613..fea962c7e5d0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1228,6 +1228,7 @@ dependencies = [
  "common-meta",
  "moka",
  "snafu 0.8.2",
+ "substrait 0.7.2",
 ]

 [[package]]
@@ -1261,6 +1262,7 @@ dependencies = [
  "arrow-schema",
  "async-stream",
  "async-trait",
+ "bytes",
  "catalog",
  "chrono",
  "common-catalog",
@@ -2030,6 +2032,7 @@ version = "0.7.2"
 dependencies = [
  "api",
  "async-trait",
+ "bytes",
  "common-base",
  "common-error",
  "common-macro",
@@ -3201,7 +3204,6 @@ dependencies = [
  "session",
  "snafu 0.8.2",
  "store-api",
- "substrait 0.7.2",
  "table",
  "tokio",
  "toml 0.8.12",
@@ -4227,7 +4229,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"

 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a11db14b8502f55ca5348917fd18e6fcf140f55e#a11db14b8502f55ca5348917fd18e6fcf140f55e"
+source = "git+https://github.com/killme2008/greptime-proto.git?rev=9aa33273cf5ab449e687e9aa2bb10af3067984f3#9aa33273cf5ab449e687e9aa2bb10af3067984f3"
 dependencies = [
  "prost 0.12.4",
  "serde",
@@ -7541,23 +7543,17 @@ name = "promql"
 version = "0.7.2"
 dependencies = [
  "ahash 0.8.11",
- "async-recursion",
  "async-trait",
  "bytemuck",
- "catalog",
- "common-catalog",
  "common-error",
  "common-macro",
- "common-query",
  "common-recordbatch",
  "common-telemetry",
  "datafusion 37.0.0",
  "datafusion-expr 37.0.0",
- "datafusion-functions 37.0.0",
  "datatypes",
  "futures",
  "greptime-proto",
- "itertools 0.10.5",
  "lazy_static",
  "prometheus",
  "promql-parser",
@@ -7565,7 +7561,6 @@ dependencies = [
  "query",
  "session",
  "snafu 0.8.2",
- "table",
  "tokio",
 ]
@@ -7873,6 +7868,7 @@ dependencies = [
  "async-recursion",
  "async-stream",
  "async-trait",
+ "bytes",
  "catalog",
  "chrono",
  "common-base",
@@ -7885,11 +7881,13 @@ dependencies = [
  "common-plugins",
  "common-query",
  "common-recordbatch",
+ "common-runtime",
  "common-telemetry",
  "common-time",
  "datafusion 37.0.0",
  "datafusion-common 37.0.0",
  "datafusion-expr 37.0.0",
+ "datafusion-functions 37.0.0",
  "datafusion-optimizer 37.0.0",
  "datafusion-physical-expr 37.0.0",
  "datafusion-sql 37.0.0",
@@ -7899,6 +7897,7 @@ dependencies = [
  "futures-util",
  "greptime-proto",
  "humantime",
+ "itertools 0.10.5",
  "lazy_static",
  "meter-core",
  "meter-macros",
@@ -7910,6 +7909,7 @@ dependencies = [
  "prometheus",
  "promql",
  "promql-parser",
+ "prost 0.12.4",
  "rand",
  "regex",
  "session",
@@ -10322,9 +10322,7 @@ version = "0.7.2"
 dependencies = [
  "async-trait",
  "bytes",
- "catalog",
  "common-error",
- "common-function",
  "common-macro",
  "common-telemetry",
  "datafusion 37.0.0",
@@ -10334,7 +10332,6 @@ dependencies = [
  "datatypes",
  "promql",
  "prost 0.12.4",
- "session",
  "snafu 0.8.2",
  "substrait 0.17.1",
  "tokio",
@@ -10511,6 +10508,7 @@ dependencies = [
 name = "table"
 version = "0.7.2"
 dependencies = [
+ "api",
  "async-trait",
  "chrono",
  "common-base",
diff --git a/Cargo.toml b/Cargo.toml
index 1ece9e77fabc..f146a4ffc228 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -118,7 +118,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a11db14b8502f55ca5348917fd18e6fcf140f55e" }
+greptime-proto = { git = "https://github.com/killme2008/greptime-proto.git", rev = "9aa33273cf5ab449e687e9aa2bb10af3067984f3" }
 humantime = "2.1"
 humantime-serde = "1.1"
 itertools = "0.10"
diff --git a/src/cache/Cargo.toml b/src/cache/Cargo.toml
index 07870fa904a5..ea433a5ceedf 100644
--- a/src/cache/Cargo.toml
+++ b/src/cache/Cargo.toml
@@ -5,6 +5,7 @@ edition.workspace = true
 license.workspace = true

 [dependencies]
+substrait.workspace = true
 catalog.workspace = true
 common-error.workspace = true
 common-macro.workspace = true
diff --git a/src/cache/src/lib.rs b/src/cache/src/lib.rs
index 85dc9c05f1f3..4adf0ff1ff33 100644
--- a/src/cache/src/lib.rs
+++ b/src/cache/src/lib.rs
@@ -20,7 +20,8 @@ use std::time::Duration;
 use catalog::kvbackend::new_table_cache;
 use common_meta::cache::{
     new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache,
-    new_table_route_cache, CacheRegistry, CacheRegistryBuilder, LayeredCacheRegistryBuilder,
+    new_table_route_cache, new_view_info_cache, CacheRegistry, CacheRegistryBuilder,
+    LayeredCacheRegistryBuilder,
 };
 use common_meta::kv_backend::KvBackendRef;
 use moka::future::CacheBuilder;
@@ -33,6 +34,7 @@ const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
 const DEFAULT_CACHE_TTI: Duration = Duration::from_secs(5 * 60);

 pub const TABLE_INFO_CACHE_NAME: &str = "table_info_cache";
+pub const VIEW_INFO_CACHE_NAME: &str = "view_info_cache";
 pub const TABLE_NAME_CACHE_NAME: &str = "table_name_cache";
 pub const TABLE_CACHE_NAME: &str = "table_cache";
 pub const TABLE_FLOWNODE_SET_CACHE_NAME: &str = "table_flownode_set_cache";
@@ -82,11 +84,22 @@ pub fn build_fundamental_cache_registry(kv_backend: KvBackendRef) -> CacheRegist
         cache,
         kv_backend.clone(),
     ));
+    // Builds the view info cache
+    let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
+        .time_to_live(DEFAULT_CACHE_TTL)
+        .time_to_idle(DEFAULT_CACHE_TTI)
+        .build();
+    let view_info_cache = Arc::new(new_view_info_cache(
+        VIEW_INFO_CACHE_NAME.to_string(),
+        cache,
+        kv_backend.clone(),
+    ));

     CacheRegistryBuilder::default()
         .add_cache(table_info_cache)
         .add_cache(table_name_cache)
         .add_cache(table_route_cache)
+        .add_cache(view_info_cache)
         .add_cache(table_flownode_set_cache)
         .build()
 }
diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml
index ddda28ba8864..850d728aae4c 100644
--- a/src/catalog/Cargo.toml
+++ b/src/catalog/Cargo.toml
@@ -16,6 +16,7 @@ arrow.workspace = true
 arrow-schema.workspace = true
 async-stream.workspace = true
 async-trait = "0.1"
+bytes.workspace = true
 common-catalog.workspace = true
 common-config.workspace = true
 common-error.workspace = true
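The view info cache registered above is later fetched from the registry by type rather than passed around explicitly (see the `KvBackendCatalogManager` changes below). A minimal sketch of that typed lookup, assuming an in-memory kv-backend and that `CacheRegistry::get` resolves caches by the requested type:

```rust
use std::sync::Arc;

use cache::build_fundamental_cache_registry;
use common_meta::cache::ViewInfoCacheRef;
use common_meta::kv_backend::memory::MemoryKvBackend;

fn typed_cache_lookup() {
    // An in-memory kv-backend stands in for the real meta backend here.
    let kv_backend = Arc::new(MemoryKvBackend::default());
    let registry = build_fundamental_cache_registry(kv_backend);

    // `get` resolves a cache by the requested type and returns `None`
    // when no cache of that type was registered.
    let view_info_cache: Option<ViewInfoCacheRef> = registry.get();
    assert!(view_info_cache.is_some());
}
```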
diff --git a/src/catalog/src/dummy_catalog.rs b/src/catalog/src/dummy_catalog.rs
new file mode 100644
index 000000000000..f59b54c810dc
--- /dev/null
+++ b/src/catalog/src/dummy_catalog.rs
@@ -0,0 +1,151 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Dummy catalog for region server.
+
+use std::any::Any;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion::catalog::schema::SchemaProvider;
+use datafusion::catalog::{CatalogProvider, CatalogProviderList};
+use datafusion::datasource::TableProvider;
+
+/// Resolves to the given table provider(s) unconditionally.
+#[derive(Clone)]
+pub struct DummyCatalogList {
+    catalog: DummyCatalogProvider,
+}
+
+impl DummyCatalogList {
+    /// Creates a new catalog list with the given table provider.
+    pub fn with_table_provider(table_provider: Arc<dyn TableProvider>) -> Self {
+        let schema_provider = Arc::new(DummySchemaProvider {
+            table: table_provider,
+        });
+        let catalog_provider = DummyCatalogProvider {
+            schema: schema_provider,
+        };
+        Self {
+            catalog: catalog_provider,
+        }
+    }
+
+    /// Creates a new catalog list with the given table providers.
+    pub fn with_tables(tables: HashMap<String, Arc<dyn TableProvider>>) -> Self {
+        let schema_provider = Arc::new(DummyTablesSchemaProvider { tables });
+        let catalog_provider = DummyCatalogProvider {
+            schema: schema_provider,
+        };
+        Self {
+            catalog: catalog_provider,
+        }
+    }
+}
+
+impl CatalogProviderList for DummyCatalogList {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn register_catalog(
+        &self,
+        _name: String,
+        _catalog: Arc<dyn CatalogProvider>,
+    ) -> Option<Arc<dyn CatalogProvider>> {
+        None
+    }
+
+    fn catalog_names(&self) -> Vec<String> {
+        vec![]
+    }
+
+    fn catalog(&self, _name: &str) -> Option<Arc<dyn CatalogProvider>> {
+        Some(Arc::new(self.catalog.clone()))
+    }
+}
+
+/// A dummy catalog provider for [DummyCatalogList].
+#[derive(Clone)]
+struct DummyCatalogProvider {
+    schema: Arc<dyn SchemaProvider>,
+}
+
+impl CatalogProvider for DummyCatalogProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema_names(&self) -> Vec<String> {
+        vec![]
+    }
+
+    fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
+        Some(self.schema.clone())
+    }
+}
+
+/// A dummy schema provider for [DummyCatalogList] that serves a single table.
+#[derive(Clone)]
+struct DummySchemaProvider {
+    table: Arc<dyn TableProvider>,
+}
+
+#[async_trait]
+impl SchemaProvider for DummySchemaProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn table_names(&self) -> Vec<String> {
+        vec![]
+    }
+
+    async fn table(
+        &self,
+        _name: &str,
+    ) -> datafusion::error::Result<Option<Arc<dyn TableProvider>>> {
+        Ok(Some(self.table.clone()))
+    }
+
+    fn table_exist(&self, _name: &str) -> bool {
+        true
+    }
+}
+
+/// A dummy schema provider for [DummyCatalogList] that serves a set of named tables.
+#[derive(Clone)]
+struct DummyTablesSchemaProvider {
+    tables: HashMap<String, Arc<dyn TableProvider>>,
+}
+
+#[async_trait]
+impl SchemaProvider for DummyTablesSchemaProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn table_names(&self) -> Vec<String> {
+        self.tables.keys().map(|t| t.to_string()).collect()
+    }
+
+    async fn table(&self, name: &str) -> datafusion::error::Result<Option<Arc<dyn TableProvider>>> {
+        Ok(self.tables.get(name).cloned())
+    }
+
+    fn table_exist(&self, name: &str) -> bool {
+        self.tables.contains_key(name)
+    }
+}
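`DummyCatalogList` answers every catalog and schema lookup with the providers it was constructed from, which is all substrait decoding needs: the decoder only resolves names that already occur in the encoded plan. A hedged usage sketch against a DataFusion `SessionContext` (the `EmptyTable` provider is a stand-in for a real one):

```rust
use std::collections::HashMap;
use std::sync::Arc;

use catalog::dummy_catalog::DummyCatalogList;
use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::empty::EmptyTable;
use datafusion::datasource::TableProvider;
use datafusion::prelude::SessionContext;

fn register_dummy_catalog(ctx: &SessionContext) {
    // Any `TableProvider` works; an empty table keeps the sketch self-contained.
    let provider: Arc<dyn TableProvider> =
        Arc::new(EmptyTable::new(Arc::new(Schema::empty())));

    let mut tables = HashMap::new();
    tables.insert("numbers".to_string(), provider);

    // Every `catalog.schema.table` reference now resolves through this list.
    ctx.register_catalog_list(Arc::new(DummyCatalogList::with_tables(tables)));
}
```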
diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs
index 5834eaed359d..f9b6534ee660 100644
--- a/src/catalog/src/error.rs
+++ b/src/catalog/src/error.rs
@@ -324,6 +324,20 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Failed to get view info from cache"))]
+    GetViewCache {
+        source: common_meta::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Cache not found: {name}"))]
+    CacheNotFound {
+        name: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }

 pub type Result<T> = std::result::Result<T, Error>;
@@ -337,6 +351,7 @@ impl ErrorExt for Error {
             | Error::FindPartitions { .. }
             | Error::FindRegionRoutes { .. }
             | Error::InvalidEntryType { .. }
+            | Error::CacheNotFound { .. }
             | Error::ParallelOpenTable { .. } => StatusCode::Unexpected,

             Error::TableNotFound { .. } => StatusCode::TableNotFound,
@@ -385,7 +400,9 @@ impl ErrorExt for Error {
             Error::QueryAccessDenied { .. } => StatusCode::AccessDenied,
             Error::Datafusion { .. } => StatusCode::EngineExecuteQuery,
             Error::TableMetadataManager { source, .. } => source.status_code(),
-            Error::GetTableCache { .. } => StatusCode::Internal,
+            Error::GetViewCache { source, .. } | Error::GetTableCache { source, .. } => {
+                source.status_code()
+            }
         }
     }
diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs
index e7a4ef4be39c..1961696ed019 100644
--- a/src/catalog/src/kvbackend/manager.rs
+++ b/src/catalog/src/kvbackend/manager.rs
@@ -22,7 +22,7 @@ use common_catalog::consts::{
 };
 use common_config::Mode;
 use common_error::ext::BoxedError;
-use common_meta::cache::TableRouteCacheRef;
+use common_meta::cache::{LayeredCacheRegistryRef, ViewInfoCacheRef};
 use common_meta::key::catalog_name::CatalogNameKey;
 use common_meta::key::schema_name::SchemaNameKey;
 use common_meta::key::table_info::TableInfoValue;
@@ -41,8 +41,8 @@ use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
 use table::TableRef;

 use crate::error::{
-    GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu, ListSchemasSnafu,
-    ListTablesSnafu, Result, TableMetadataManagerSnafu,
+    CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu,
+    ListSchemasSnafu, ListTablesSnafu, Result, TableMetadataManagerSnafu,
 };
 use crate::information_schema::InformationSchemaProvider;
 use crate::kvbackend::TableCacheRef;
@@ -61,7 +61,7 @@ pub struct KvBackendCatalogManager {
     table_metadata_manager: TableMetadataManagerRef,
     /// A sub-CatalogManager that handles system tables
     system_catalog: SystemCatalog,
-    table_cache: TableCacheRef,
+    cache_registry: LayeredCacheRegistryRef,
 }

 const CATALOG_CACHE_MAX_CAPACITY: u64 = 128;
@@ -71,15 +71,14 @@ impl KvBackendCatalogManager {
         mode: Mode,
         meta_client: Option<Arc<MetaClient>>,
         backend: KvBackendRef,
-        table_cache: TableCacheRef,
-        table_route_cache: TableRouteCacheRef,
+        cache_registry: LayeredCacheRegistryRef,
     ) -> Arc<Self> {
         Arc::new_cyclic(|me| Self {
             mode,
             meta_client,
             partition_manager: Arc::new(PartitionRuleManager::new(
                 backend.clone(),
-                table_route_cache,
+                cache_registry.get().expect("table_route_cache must be registered"),
             )),
             table_metadata_manager: Arc::new(TableMetadataManager::new(backend)),
             system_catalog: SystemCatalog {
@@ -90,7 +89,7 @@ impl KvBackendCatalogManager {
                     me.clone(),
                 )),
             },
-            table_cache,
+            cache_registry,
         })
     }
@@ -99,6 +98,10 @@ impl KvBackendCatalogManager {
         &self.mode
     }

+    pub fn view_info_cache(&self) -> ViewInfoCacheRef {
+        self.cache_registry.get().expect("view_info_cache must be registered")
+    }
+
     /// Returns the [`MetaClient`].
     pub fn meta_client(&self) -> Option<Arc<MetaClient>> {
         self.meta_client.clone()
     }
@@ -215,7 +218,11 @@ impl CatalogManager for KvBackendCatalogManager {
             return Ok(Some(table));
         }

-        self.table_cache
+        let table_cache: TableCacheRef = self.cache_registry.get().context(CacheNotFoundSnafu {
+            name: "table_cache",
+        })?;
+
+        table_cache
             .get_by_ref(&TableName {
                 catalog_name: catalog_name.to_string(),
                 schema_name: schema_name.to_string(),
diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs
index 494a94df2699..46caa476a6bb 100644
--- a/src/catalog/src/lib.rs
+++ b/src/catalog/src/lib.rs
@@ -27,6 +27,7 @@ use table::TableRef;

 use crate::error::Result;

+pub mod dummy_catalog;
 pub mod error;
 pub mod information_schema;
 pub mod kvbackend;
diff --git a/src/catalog/src/table_source.rs b/src/catalog/src/table_source.rs
index 58813a460e33..e048a4cf06d5 100644
--- a/src/catalog/src/table_source.rs
+++ b/src/catalog/src/table_source.rs
@@ -15,14 +15,19 @@
 use std::collections::HashMap;
 use std::sync::Arc;

+use bytes::Bytes;
 use common_catalog::format_full_table_name;
+use common_query::logical_plan::SubstraitPlanDecoderRef;
 use datafusion::common::{ResolvedTableReference, TableReference};
-use datafusion::datasource::provider_as_source;
+use datafusion::datasource::view::ViewTable;
+use datafusion::datasource::{provider_as_source, TableProvider};
 use datafusion::logical_expr::TableSource;
 use session::context::QueryContext;
 use snafu::{ensure, OptionExt};
+use table::metadata::TableType;
 use table::table::adapter::DfTableProviderAdapter;

+use crate::dummy_catalog::DummyCatalogList;
 use crate::error::{QueryAccessDeniedSnafu, Result, TableNotExistSnafu};
 use crate::CatalogManagerRef;

@@ -32,6 +37,7 @@ pub struct DfTableSourceProvider {
     disallow_cross_catalog_query: bool,
     default_catalog: String,
     default_schema: String,
+    plan_decoder: SubstraitPlanDecoderRef,
 }

 impl DfTableSourceProvider {
@@ -39,6 +45,7 @@ impl DfTableSourceProvider {
         catalog_manager: CatalogManagerRef,
         disallow_cross_catalog_query: bool,
         query_ctx: &QueryContext,
+        plan_decoder: SubstraitPlanDecoderRef,
     ) -> Self {
         Self {
             catalog_manager,
@@ -46,6 +53,7 @@ impl DfTableSourceProvider {
             resolved_tables: HashMap::new(),
             default_catalog: query_ctx.current_catalog().to_owned(),
             default_schema: query_ctx.current_schema().to_owned(),
+            plan_decoder,
         }
     }

@@ -94,8 +102,62 @@ impl DfTableSourceProvider {
                 table: format_full_table_name(catalog_name, schema_name, table_name),
             })?;

-        let provider = DfTableProviderAdapter::new(table);
-        let source = provider_as_source(Arc::new(provider));
+        let provider: Arc<dyn TableProvider> = if table.table_info().table_type == TableType::View {
+            use crate::kvbackend::KvBackendCatalogManager;
+            let catalog_manager = self
+                .catalog_manager
+                .as_any()
+                .downcast_ref::<KvBackendCatalogManager>()
+                .unwrap();
+
+            let view_info = catalog_manager
+                .view_info_cache()
+                .get(table.table_info().ident.table_id)
+                .await
+                .unwrap()
+                .unwrap();
+
+            let mut tables = HashMap::with_capacity(view_info.table_names.len());
+
+            for table_name in &view_info.table_names {
+                let table = self
+                    .catalog_manager
+                    .table(
+                        &table_name.catalog_name,
+                        &table_name.schema_name,
+                        &table_name.table_name,
+                    )
+                    .await?
+                    .with_context(|| TableNotExistSnafu {
+                        table: format_full_table_name(
+                            &table_name.catalog_name,
+                            &table_name.schema_name,
+                            &table_name.table_name,
+                        ),
+                    })?;
+
+                let name = &table.table_info().name;
+
+                let table_provider: Arc<dyn TableProvider> =
+                    Arc::new(DfTableProviderAdapter::new(table));
+                // FIXME: use datafusion::catalog::MemoryCatalogProviderList instead.
+                tables.insert(name.to_string(), table_provider);
+            }
+
+            let catalog_list = Arc::new(DummyCatalogList::with_tables(tables));
+
+            let logical_plan = self
+                .plan_decoder
+                .decode(Bytes::from(view_info.view_info.clone()), catalog_list)
+                .await
+                .unwrap();
+
+            Arc::new(ViewTable::try_new(logical_plan, None).unwrap())
+        } else {
+            Arc::new(DfTableProviderAdapter::new(table))
+        };
+
+        let source = provider_as_source(provider);
         let _ = self.resolved_tables.insert(resolved_name, source.clone());
         Ok(source)
     }
diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs
index 0e1fec26dfa4..e1510da388a2 100644
--- a/src/cmd/src/error.rs
+++ b/src/cmd/src/error.rs
@@ -303,13 +303,6 @@ pub enum Error {
         source: common_runtime::error::Error,
     },

-    #[snafu(display("Failed to get cache from cache registry: {}", name))]
-    CacheRequired {
-        #[snafu(implicit)]
-        location: Location,
-        name: String,
-    },
-
     #[snafu(display("Failed to build cache registry"))]
     BuildCacheRegistry {
         #[snafu(implicit)]
         location: Location,
@@ -365,11 +358,11 @@ impl ErrorExt for Error {

             Error::SerdeJson { .. } | Error::FileIo { .. } => StatusCode::Unexpected,

-            Error::CacheRequired { .. } | Error::BuildCacheRegistry { .. } => StatusCode::Internal,
-
             Error::Other { source, .. } => source.status_code(),

             Error::BuildRuntime { source, .. } => source.status_code(),
+
+            Error::BuildCacheRegistry { .. } => StatusCode::Internal,
         }
     }
diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs
index 4297553304f8..67fdb687372e 100644
--- a/src/cmd/src/frontend.rs
+++ b/src/cmd/src/frontend.rs
@@ -16,10 +16,7 @@ use std::sync::Arc;
 use std::time::Duration;

 use async_trait::async_trait;
-use cache::{
-    build_fundamental_cache_registry, with_default_composite_cache_registry, TABLE_CACHE_NAME,
-    TABLE_ROUTE_CACHE_NAME,
-};
+use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
 use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
 use clap::Parser;
 use client::client_manager::DatanodeClients;
@@ -284,23 +281,11 @@ impl StartCommand {
                 .build(),
         );

-        let table_cache = layered_cache_registry
-            .get()
-            .context(error::CacheRequiredSnafu {
-                name: TABLE_CACHE_NAME,
-            })?;
-        let table_route_cache =
-            layered_cache_registry
-                .get()
-                .context(error::CacheRequiredSnafu {
-                    name: TABLE_ROUTE_CACHE_NAME,
-                })?;
         let catalog_manager = KvBackendCatalogManager::new(
             opts.mode,
             Some(meta_client.clone()),
             cached_meta_backend.clone(),
-            table_cache,
-            table_route_cache,
+            layered_cache_registry.clone(),
         )
         .await;
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index ff810ead5d41..a8f208d1d1f7 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -16,10 +16,7 @@ use std::sync::Arc;
 use std::{fs, path};

 use async_trait::async_trait;
-use cache::{
-    build_fundamental_cache_registry, with_default_composite_cache_registry, TABLE_CACHE_NAME,
-    TABLE_ROUTE_CACHE_NAME,
-};
+use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
 use catalog::kvbackend::KvBackendCatalogManager;
 use clap::Parser;
 use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
@@ -59,13 +56,13 @@ use servers::export_metrics::ExportMetricsOption;
 use servers::http::HttpOptions;
 use servers::tls::{TlsMode, TlsOption};
 use servers::Mode;
-use snafu::{OptionExt, ResultExt};
+use snafu::ResultExt;

 use crate::error::{
-    BuildCacheRegistrySnafu, CacheRequiredSnafu, CreateDirSnafu, IllegalConfigSnafu,
-    InitDdlManagerSnafu, InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, Result,
-    ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu,
-    StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu,
+    BuildCacheRegistrySnafu, CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu,
+    InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, Result, ShutdownDatanodeSnafu,
+    ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu,
+    StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu,
 };
 use crate::options::{GlobalOptions, Options};
 use crate::App;
@@ -411,18 +408,11 @@ impl StartCommand {
                 .build(),
         );

-        let table_cache = layered_cache_registry.get().context(CacheRequiredSnafu {
-            name: TABLE_CACHE_NAME,
-        })?;
-        let table_route_cache = layered_cache_registry.get().context(CacheRequiredSnafu {
-            name: TABLE_ROUTE_CACHE_NAME,
-        })?;
         let catalog_manager = KvBackendCatalogManager::new(
             dn_opts.mode,
             None,
             kv_backend.clone(),
-            table_cache,
-            table_route_cache,
+            layered_cache_registry.clone(),
         )
         .await;
diff --git a/src/common/meta/src/cache.rs b/src/common/meta/src/cache.rs
index b7d13a6f0ec0..52dae1a094af 100644
--- a/src/common/meta/src/cache.rs
+++ b/src/common/meta/src/cache.rs
@@ -24,7 +24,7 @@ pub use registry::{
     LayeredCacheRegistryBuilder, LayeredCacheRegistryRef,
 };
 pub use table::{
-    new_table_info_cache, new_table_name_cache, new_table_route_cache, TableInfoCache,
-    TableInfoCacheRef, TableNameCache, TableNameCacheRef, TableRoute, TableRouteCache,
-    TableRouteCacheRef,
+    new_table_info_cache, new_table_name_cache, new_table_route_cache, new_view_info_cache,
+    TableInfoCache, TableInfoCacheRef, TableNameCache, TableNameCacheRef, TableRoute,
+    TableRouteCache, TableRouteCacheRef, ViewInfoCache, ViewInfoCacheRef,
 };
diff --git a/src/common/meta/src/cache/table.rs b/src/common/meta/src/cache/table.rs
index fa3bcbd30994..82a3ad98df33 100644
--- a/src/common/meta/src/cache/table.rs
+++ b/src/common/meta/src/cache/table.rs
@@ -15,6 +15,9 @@
 mod table_info;
 mod table_name;
 mod table_route;
+mod view_info;
+
 pub use table_info::{new_table_info_cache, TableInfoCache, TableInfoCacheRef};
 pub use table_name::{new_table_name_cache, TableNameCache, TableNameCacheRef};
 pub use table_route::{new_table_route_cache, TableRoute, TableRouteCache, TableRouteCacheRef};
+pub use view_info::{new_view_info_cache, ViewInfoCache, ViewInfoCacheRef};
diff --git a/src/common/meta/src/cache/table/view_info.rs b/src/common/meta/src/cache/table/view_info.rs
new file mode 100644
index 000000000000..2f3c0e210ccf
--- /dev/null
+++ b/src/common/meta/src/cache/table/view_info.rs
@@ -0,0 +1,114 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use futures::future::BoxFuture;
+use moka::future::Cache;
+use snafu::OptionExt;
+use store_api::storage::TableId;
+
+use crate::cache::{CacheContainer, Initializer};
+use crate::error;
+use crate::error::Result;
+use crate::instruction::CacheIdent;
+use crate::key::view_info::{ViewInfoManager, ViewInfoManagerRef, ViewInfoValue};
+use crate::kv_backend::KvBackendRef;
+
+/// [ViewInfoCache] caches the [TableId] to [ViewInfoValue] mapping.
+pub type ViewInfoCache = CacheContainer<TableId, Arc<ViewInfoValue>, CacheIdent>;
+
+pub type ViewInfoCacheRef = Arc<ViewInfoCache>;
+
+/// Constructs a [ViewInfoCache].
+pub fn new_view_info_cache(
+    name: String,
+    cache: Cache<TableId, Arc<ViewInfoValue>>,
+    kv_backend: KvBackendRef,
+) -> ViewInfoCache {
+    let view_info_manager = Arc::new(ViewInfoManager::new(kv_backend));
+    let init = init_factory(view_info_manager);
+
+    CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
+}
+
+fn init_factory(view_info_manager: ViewInfoManagerRef) -> Initializer<TableId, Arc<ViewInfoValue>> {
+    Arc::new(move |view_id| {
+        let view_info_manager = view_info_manager.clone();
+        Box::pin(async move {
+            let view_info = view_info_manager
+                .get(*view_id)
+                .await?
+                .context(error::ValueNotExistSnafu {})?
+                .into_inner();
+
+            Ok(Some(Arc::new(view_info)))
+        })
+    })
+}
+
+fn invalidator<'a>(
+    cache: &'a Cache<TableId, Arc<ViewInfoValue>>,
+    ident: &'a CacheIdent,
+) -> BoxFuture<'a, Result<()>> {
+    Box::pin(async move {
+        if let CacheIdent::TableId(table_id) = ident {
+            cache.invalidate(table_id).await
+        }
+        Ok(())
+    })
+}
+
+fn filter(ident: &CacheIdent) -> bool {
+    matches!(ident, CacheIdent::TableId(_))
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use moka::future::CacheBuilder;
+
+    use super::*;
+    use crate::ddl::tests::create_view::test_create_view_task;
+    use crate::key::TableMetadataManager;
+    use crate::kv_backend::memory::MemoryKvBackend;
+
+    #[tokio::test]
+    async fn test_view_info_cache() {
+        let mem_kv = Arc::new(MemoryKvBackend::default());
+        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
+        let cache = CacheBuilder::new(128).build();
+        let cache = new_view_info_cache("test".to_string(), cache, mem_kv.clone());
+
+        let result = cache.get(1024).await.unwrap();
+        assert!(result.is_none());
+        let mut task = test_create_view_task("my_view");
+        task.view_info.ident.table_id = 1024;
+        table_metadata_manager
+            .create_view_metadata(
+                task.view_info.clone(),
+                &task.create_view.logical_plan,
+                task.table_names(),
+            )
+            .await
+            .unwrap();
+
+        let view_info = cache.get(1024).await.unwrap().unwrap();
+        assert_eq!((*view_info).view_info, task.create_view.logical_plan);
+
+        assert!(cache.contains_key(&1024));
+        cache
+            .invalidate(&[CacheIdent::TableId(1024)])
+            .await
+            .unwrap();
+        assert!(!cache.contains_key(&1024));
+    }
+}
diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs
index 8d8cb8d5a45d..c00b6df08e6b 100644
--- a/src/common/meta/src/ddl.rs
+++ b/src/common/meta/src/ddl.rs
@@ -48,7 +48,7 @@ pub mod table_meta;
 #[cfg(any(test, feature = "testing"))]
 pub mod test_util;
 #[cfg(test)]
-mod tests;
+pub(crate) mod tests;
 pub mod truncate_table;
 pub mod utils;
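Because the invalidator and filter above only react to `CacheIdent::TableId`, a view's cached plan is evicted through the same ident broadcast that already invalidates table info. A small sketch, mirroring the test:

```rust
use common_meta::cache::ViewInfoCache;
use common_meta::error::Result;
use common_meta::instruction::CacheIdent;
use store_api::storage::TableId;

// After `CREATE OR REPLACE VIEW` commits new metadata, broadcasting the
// view's table id drops the stale entry; the next `get(view_id)` falls
// through to the initializer and re-reads the kv-backend.
async fn evict_view(cache: &ViewInfoCache, view_id: TableId) -> Result<()> {
    cache.invalidate(&[CacheIdent::TableId(view_id)]).await
}
```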
diff --git a/src/common/meta/src/ddl/create_view.rs b/src/common/meta/src/ddl/create_view.rs
index c9163dd5cabc..84b842826542 100644
--- a/src/common/meta/src/ddl/create_view.rs
+++ b/src/common/meta/src/ddl/create_view.rs
@@ -172,15 +172,21 @@ impl CreateViewProcedure {
                 view_name: self.creator.data.table_ref().to_string(),
             })?;
             let new_logical_plan = self.creator.data.task.raw_logical_plan().clone();
+            let table_names = self.creator.data.task.table_names();
+
             manager
-                .update_view_info(view_id, &current_view_info, new_logical_plan)
+                .update_view_info(view_id, &current_view_info, new_logical_plan, table_names)
                 .await?;

             info!("Updated view metadata for view {view_id}");
         } else {
             let raw_view_info = self.view_info().clone();
             manager
-                .create_view_metadata(raw_view_info, self.creator.data.task.raw_logical_plan())
+                .create_view_metadata(
+                    raw_view_info,
+                    self.creator.data.task.raw_logical_plan(),
+                    self.creator.data.task.table_names(),
+                )
                 .await?;

             info!(
diff --git a/src/common/meta/src/ddl/tests.rs b/src/common/meta/src/ddl/tests.rs
index 3c550883ffc2..9a0db96a37e0 100644
--- a/src/common/meta/src/ddl/tests.rs
+++ b/src/common/meta/src/ddl/tests.rs
@@ -17,7 +17,7 @@ mod alter_table;
 mod create_flow;
 mod create_logical_tables;
 mod create_table;
-mod create_view;
+pub(crate) mod create_view;
 mod drop_database;
 mod drop_flow;
 mod drop_table;
diff --git a/src/common/meta/src/ddl/tests/create_view.rs b/src/common/meta/src/ddl/tests/create_view.rs
index 693faddeb3f3..0490393e255e 100644
--- a/src/common/meta/src/ddl/tests/create_view.rs
+++ b/src/common/meta/src/ddl/tests/create_view.rs
@@ -31,7 +31,7 @@ use crate::error::Error;
 use crate::rpc::ddl::CreateViewTask;
 use crate::test_util::{new_ddl_context, MockDatanodeManager};

-fn test_create_view_task(name: &str) -> CreateViewTask {
+pub(crate) fn test_create_view_task(name: &str) -> CreateViewTask {
     let expr = CreateViewExpr {
         catalog_name: "greptime".to_string(),
         schema_name: "public".to_string(),
diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs
index 9090eb075f3c..9cc11367068b 100644
--- a/src/common/meta/src/key.rs
+++ b/src/common/meta/src/key.rs
@@ -491,6 +491,7 @@
         &self,
         view_info: RawTableInfo,
         raw_logical_plan: &Vec<u8>,
+        table_names: HashSet<TableName>,
     ) -> Result<()> {
         let view_id = view_info.ident.table_id;
@@ -512,7 +513,7 @@
             .build_create_txn(view_id, &table_info_value)?;

         // Creates view info
-        let view_info_value = ViewInfoValue::new(raw_logical_plan);
+        let view_info_value = ViewInfoValue::new(raw_logical_plan, table_names);
         let (create_view_info_txn, on_create_view_info_failure) = self
             .view_info_manager()
             .build_create_txn(view_id, &view_info_value)?;
@@ -903,8 +904,9 @@
         view_id: TableId,
         current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
         new_view_info: Vec<u8>,
+        table_names: HashSet<TableName>,
     ) -> Result<()> {
-        let new_view_info_value = current_view_info_value.update(new_view_info);
+        let new_view_info_value = current_view_info_value.update(new_view_info, table_names);

         // Updates view info.
         let (update_view_info_txn, on_update_view_info_failure) = self
diff --git a/src/common/meta/src/key/view_info.rs b/src/common/meta/src/key/view_info.rs
index 98c8a1a73178..5b17d340d488 100644
--- a/src/common/meta/src/key/view_info.rs
+++ b/src/common/meta/src/key/view_info.rs
@@ -12,12 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::fmt::Display;
+use std::sync::Arc;

 use serde::{Deserialize, Serialize};
 use snafu::OptionExt;
 use table::metadata::TableId;
+use table::table_name::TableName;

 use super::VIEW_INFO_KEY_PATTERN;
 use crate::error::{InvalidViewInfoSnafu, Result};
@@ -80,21 +82,30 @@ impl<'a> MetaKey<'a, ViewInfoKey> for ViewInfoKey {

 /// The VIEW info value that keeps the metadata.
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
 pub struct ViewInfoValue {
+    /// The encoded logical plan
     pub view_info: RawViewLogicalPlan,
+    /// The resolved fully-qualified table names in the logical plan
+    pub table_names: HashSet<TableName>,
     version: u64,
 }

 impl ViewInfoValue {
-    pub fn new(view_info: &RawViewLogicalPlan) -> Self {
+    pub fn new(view_info: &RawViewLogicalPlan, table_names: HashSet<TableName>) -> Self {
         Self {
             view_info: view_info.clone(),
+            table_names,
             version: 0,
         }
     }

-    pub(crate) fn update(&self, new_view_info: RawViewLogicalPlan) -> Self {
+    pub(crate) fn update(
+        &self,
+        new_view_info: RawViewLogicalPlan,
+        table_names: HashSet<TableName>,
+    ) -> Self {
         Self {
             view_info: new_view_info,
+            table_names,
             version: self.version + 1,
         }
     }
@@ -105,6 +116,8 @@ pub struct ViewInfoManager {
     kv_backend: KvBackendRef,
 }

+pub type ViewInfoManagerRef = Arc<ViewInfoManager>;
+
 impl ViewInfoManager {
     pub fn new(kv_backend: KvBackendRef) -> Self {
         Self { kv_backend }
diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs
index 5398a62a6752..05e80af0e01a 100644
--- a/src/common/meta/src/lib.rs
+++ b/src/common/meta/src/lib.rs
@@ -40,7 +40,7 @@ pub mod region_keeper;
 pub mod rpc;
 pub mod sequence;
 pub mod state_store;
-pub mod table_name;
+pub use table::table_name;
 #[cfg(any(test, feature = "testing"))]
 pub mod test_util;
 pub mod util;
diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs
index e42639c381a7..a2b5596ff34a 100644
--- a/src/common/meta/src/rpc/ddl.rs
+++ b/src/common/meta/src/rpc/ddl.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::result;

 use api::v1::meta::ddl_task_request::Task;
@@ -332,6 +332,14 @@ impl CreateViewTask {
     pub fn raw_logical_plan(&self) -> &Vec<u8> {
         &self.create_view.logical_plan
     }
+
+    pub fn table_names(&self) -> HashSet<TableName> {
+        self.create_view
+            .table_names
+            .iter()
+            .map(|t| t.clone().into())
+            .collect()
+    }
 }

 impl TryFrom<Task> for CreateViewTask {
diff --git a/src/common/query/Cargo.toml b/src/common/query/Cargo.toml
index 443640016488..106df34c7458 100644
--- a/src/common/query/Cargo.toml
+++ b/src/common/query/Cargo.toml
@@ -10,6 +10,7 @@ workspace = true
 [dependencies]
 api.workspace = true
 async-trait.workspace = true
+bytes.workspace = true
 common-error.workspace = true
 common-macro.workspace = true
 common-recordbatch.workspace = true
diff --git a/src/common/query/src/logical_plan.rs b/src/common/query/src/logical_plan.rs
index 5a100fc6131e..736c1b1108c1 100644
--- a/src/common/query/src/logical_plan.rs
+++ b/src/common/query/src/logical_plan.rs
@@ -19,6 +19,8 @@ mod udf;

 use std::sync::Arc;

+use datafusion::catalog::CatalogProviderList;
+use datafusion::logical_expr::LogicalPlan;
 use datatypes::prelude::ConcreteDataType;
 pub use expr::build_filter_from_timestamp;

@@ -26,6 +28,7 @@ pub use self::accumulator::{Accumulator, AggregateFunctionCreator, AggregateFunc
 pub use self::expr::{DfExpr, Expr};
 pub use self::udaf::AggregateFunction;
 pub use self::udf::ScalarUdf;
+use crate::error::Result;
 use crate::function::{ReturnTypeFunction, ScalarFunctionImplementation};
 use crate::logical_plan::accumulator::*;
 use crate::signature::{Signature, Volatility};
@@ -69,6 +72,18 @@ pub fn create_aggregate_function(
     )
 }

+/// The datafusion [`LogicalPlan`] decoder.
+#[async_trait::async_trait]
+pub trait SubstraitPlanDecoder {
+    async fn decode(
+        &self,
+        message: bytes::Bytes,
+        catalog_list: Arc<dyn CatalogProviderList>,
+    ) -> Result<LogicalPlan>;
+}
+
+pub type SubstraitPlanDecoderRef = Arc<dyn SubstraitPlanDecoder + Send + Sync>;
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
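`SubstraitPlanDecoder` is the seam that lets `catalog` (and the datanode) decode view plans without depending on the `substrait` crate directly; `query` supplies the real implementation (`DefaultPlanDecoder`). A skeletal implementor for illustration only — the struct and its session handling are assumptions, not the actual decoder:

```rust
use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use common_query::error::Result;
use common_query::logical_plan::SubstraitPlanDecoder;
use datafusion::catalog::CatalogProviderList;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::LogicalPlan;

/// Illustrative decoder; the real `DefaultPlanDecoder` also registers
/// UDFs into the session state before decoding.
struct MyPlanDecoder {
    session_state: SessionState,
}

#[async_trait]
impl SubstraitPlanDecoder for MyPlanDecoder {
    async fn decode(
        &self,
        message: Bytes,
        catalog_list: Arc<dyn CatalogProviderList>,
    ) -> Result<LogicalPlan> {
        // Delegate to a substrait convertor (as the datanode path does),
        // resolving table names against `catalog_list`.
        todo!("decode `message` with `self.session_state` and `catalog_list`")
    }
}
```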
diff --git a/src/common/substrait/Cargo.toml b/src/common/substrait/Cargo.toml
index a2fb0e272594..3da8b6310017 100644
--- a/src/common/substrait/Cargo.toml
+++ b/src/common/substrait/Cargo.toml
@@ -10,19 +10,15 @@ workspace = true
 [dependencies]
 async-trait.workspace = true
 bytes.workspace = true
-catalog.workspace = true
 common-error.workspace = true
-common-function.workspace = true
 common-macro.workspace = true
 common-telemetry.workspace = true
 datafusion.workspace = true
 datafusion-common.workspace = true
 datafusion-expr.workspace = true
 datafusion-substrait.workspace = true
-datatypes.workspace = true
 promql.workspace = true
 prost.workspace = true
-session.workspace = true
 snafu.workspace = true

 [dependencies.substrait_proto]
diff --git a/src/common/substrait/src/df_substrait.rs b/src/common/substrait/src/df_substrait.rs
index 0730f0773b32..9217b60cc5b6 100644
--- a/src/common/substrait/src/df_substrait.rs
+++ b/src/common/substrait/src/df_substrait.rs
@@ -16,26 +16,19 @@ use std::sync::Arc;

 use async_trait::async_trait;
 use bytes::{Buf, Bytes, BytesMut};
-use common_function::function_registry::FUNCTION_REGISTRY;
-use common_function::scalars::udf::create_udf;
 use datafusion::catalog::CatalogProviderList;
 use datafusion::execution::context::SessionState;
 use datafusion::execution::runtime_env::RuntimeEnv;
-use datafusion::execution::FunctionRegistry;
 use datafusion::prelude::{SessionConfig, SessionContext};
 use datafusion_expr::LogicalPlan;
 use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
 use datafusion_substrait::logical_plan::producer::to_substrait_plan;
 use datafusion_substrait::substrait::proto::Plan;
 use prost::Message;
-use session::context::QueryContextRef;
 use snafu::ResultExt;

-use crate::error::{
-    DFInternalSnafu, DecodeDfPlanSnafu, DecodeRelSnafu, EncodeDfPlanSnafu, EncodeRelSnafu, Error,
-};
-use crate::extension_serializer::ExtensionSerializer;
-use crate::SubstraitPlan;
+use crate::error::{DecodeDfPlanSnafu, DecodeRelSnafu, EncodeDfPlanSnafu, EncodeRelSnafu, Error};
+use crate::{SerializerRegistry, SubstraitPlan};

 pub struct DFLogicalSubstraitConvertor;

@@ -49,15 +42,8 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor {
         &self,
         message: B,
         catalog_list: Arc<dyn CatalogProviderList>,
-        mut state: SessionState,
-        query_ctx: QueryContextRef,
+        state: SessionState,
     ) -> Result<Self::Plan, Self::Error> {
-        // substrait decoder will look up the UDFs in SessionState, so we need to register them
-        for func in FUNCTION_REGISTRY.functions() {
-            let udf = Arc::new(create_udf(func, query_ctx.clone(), Default::default()).into());
-            state.register_udf(udf).context(DFInternalSnafu)?;
-        }
-
         let mut context = SessionContext::new_with_state(state);
         context.register_catalog_list(catalog_list);
         let plan = Plan::decode(message).context(DecodeRelSnafu)?;
@@ -67,10 +53,13 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor {
         Ok(df_plan)
     }

-    fn encode(&self, plan: &Self::Plan) -> Result<Bytes, Self::Error> {
+    fn encode(
+        &self,
+        plan: &Self::Plan,
+        serializer: impl SerializerRegistry + 'static,
+    ) -> Result<Bytes, Self::Error> {
         let mut buf = BytesMut::new();
-
-        let substrait_plan = self.to_sub_plan(plan)?;
+        let substrait_plan = self.to_sub_plan(plan, serializer)?;
         substrait_plan.encode(&mut buf).context(EncodeRelSnafu)?;

         Ok(buf.freeze())
@@ -78,10 +67,14 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor {
 }

 impl DFLogicalSubstraitConvertor {
-    pub fn to_sub_plan(&self, plan: &LogicalPlan) -> Result<Box<Plan>, Error> {
+    pub fn to_sub_plan(
+        &self,
+        plan: &LogicalPlan,
+        serializer: impl SerializerRegistry + 'static,
+    ) -> Result<Box<Plan>, Error> {
         let session_state =
             SessionState::new_with_config_rt(SessionConfig::new(), Arc::new(RuntimeEnv::default()))
-                .with_serializer_registry(Arc::new(ExtensionSerializer));
+                .with_serializer_registry(Arc::new(serializer));
         let context = SessionContext::new_with_state(session_state);

         to_substrait_plan(plan, &context).context(EncodeDfPlanSnafu)
diff --git a/src/common/substrait/src/error.rs b/src/common/substrait/src/error.rs
index 07cc310d3934..5a39a1a6d1d0 100644
--- a/src/common/substrait/src/error.rs
+++ b/src/common/substrait/src/error.rs
@@ -18,7 +18,6 @@ use common_error::ext::{BoxedError, ErrorExt};
 use common_error::status_code::StatusCode;
 use common_macro::stack_trace_debug;
 use datafusion::error::DataFusionError;
-use datatypes::prelude::ConcreteDataType;
 use prost::{DecodeError, EncodeError};
 use snafu::{Location, Snafu};

@@ -26,34 +25,6 @@ use snafu::{Location, Snafu};
 #[snafu(visibility(pub))]
 #[stack_trace_debug]
 pub enum Error {
-    #[snafu(display("Unsupported physical plan: {}", name))]
-    UnsupportedPlan {
-        name: String,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("Unsupported expr: {}", name))]
-    UnsupportedExpr {
-        name: String,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("Unsupported concrete type: {:?}", ty))]
-    UnsupportedConcreteType {
-        ty: ConcreteDataType,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("Unsupported substrait type: {}", ty))]
-    UnsupportedSubstraitType {
-        ty: String,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("Failed to decode substrait relation"))]
     DecodeRel {
         #[snafu(source)]
@@ -70,33 +41,6 @@ pub enum Error {
         location: Location,
     },

-    #[snafu(display("Input plan is empty"))]
-    EmptyPlan {
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("Input expression is empty"))]
-    EmptyExpr {
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("Missing required field in protobuf, field: {}, plan: {}", field, plan))]
-    MissingField {
-        field: String,
-        plan: String,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("Invalid parameters: {}", reason))]
-    InvalidParameters {
-        reason: String,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("Internal error from DataFusion"))]
     DFInternal {
         #[snafu(source)]
@@ -118,35 +62,6 @@ pub enum Error {
         location: Location,
     },

-    #[snafu(display(
-        "Schema from Substrait proto doesn't match with the schema in storage.
-        Substrait schema: {:?}
-        Storage schema: {:?}",
-        substrait_schema,
-        storage_schema
-    ))]
-    SchemaNotMatch {
-        substrait_schema: datafusion::arrow::datatypes::SchemaRef,
-        storage_schema: datafusion::arrow::datatypes::SchemaRef,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("Failed to convert DataFusion schema"))]
-    ConvertDfSchema {
-        #[snafu(implicit)]
-        location: Location,
-        source: datatypes::error::Error,
-    },
-
-    #[snafu(display("Unable to resolve table: {table_name}, error: "))]
-    ResolveTable {
-        table_name: String,
-        #[snafu(implicit)]
-        location: Location,
-        source: catalog::error::Error,
-    },
-
     #[snafu(display("Failed to encode DataFusion plan"))]
     EncodeDfPlan {
         #[snafu(source)]
@@ -169,24 +84,13 @@ pub type Result<T> = std::result::Result<T, Error>;

 impl ErrorExt for Error {
     fn status_code(&self) -> StatusCode {
         match self {
-            Error::UnsupportedConcreteType { .. }
-            | Error::UnsupportedPlan { .. }
-            | Error::UnsupportedExpr { .. }
-            | Error::UnsupportedSubstraitType { .. } => StatusCode::Unsupported,
-            Error::UnknownPlan { .. }
-            | Error::EncodeRel { .. }
-            | Error::DecodeRel { .. }
-            | Error::EmptyPlan { .. }
-            | Error::EmptyExpr { .. }
-            | Error::MissingField { .. }
-            | Error::InvalidParameters { .. }
-            | Error::SchemaNotMatch { .. } => StatusCode::InvalidArguments,
+            Error::UnknownPlan { .. } | Error::EncodeRel { .. } | Error::DecodeRel { .. } => {
+                StatusCode::InvalidArguments
+            }
             Error::DFInternal { .. }
             | Error::Internal { .. }
             | Error::EncodeDfPlan { .. }
             | Error::DecodeDfPlan { .. } => StatusCode::Internal,
-            Error::ConvertDfSchema { source, .. } => source.status_code(),
-            Error::ResolveTable { source, .. } => source.status_code(),
         }
     }
diff --git a/src/common/substrait/src/extension_serializer.rs b/src/common/substrait/src/extension_serializer.rs
index 89944db508f9..a8179437687e 100644
--- a/src/common/substrait/src/extension_serializer.rs
+++ b/src/common/substrait/src/extension_serializer.rs
@@ -67,7 +67,6 @@ impl SerializerRegistry for ExtensionSerializer {
             name if name == EmptyMetric::name() => Err(DataFusionError::Substrait(
                 "EmptyMetric should not be serialized".to_string(),
             )),
-            "MergeScan" => Ok(vec![]),
             other => Err(DataFusionError::NotImplemented(format!(
                 "Serialize logical plan for {}",
                 other
diff --git a/src/common/substrait/src/lib.rs b/src/common/substrait/src/lib.rs
index 8a03dd7308ed..756e701c489a 100644
--- a/src/common/substrait/src/lib.rs
+++ b/src/common/substrait/src/lib.rs
@@ -23,11 +23,11 @@ use async_trait::async_trait;
 use bytes::{Buf, Bytes};
 use datafusion::catalog::CatalogProviderList;
 use datafusion::execution::context::SessionState;
+pub use datafusion::execution::registry::SerializerRegistry;
 /// Re-export the Substrait module of datafusion,
 /// note this is a different version of the `substrait_proto` crate
 pub use datafusion_substrait::substrait as substrait_proto_df;
 pub use datafusion_substrait::{logical_plan as df_logical_plan, variation_const};
-use session::context::QueryContextRef;
 pub use substrait_proto;

 pub use crate::df_substrait::DFLogicalSubstraitConvertor;
@@ -42,8 +42,11 @@ pub trait SubstraitPlan {
         message: B,
         catalog_list: Arc<dyn CatalogProviderList>,
         state: SessionState,
-        query_ctx: QueryContextRef,
     ) -> Result<Self::Plan, Self::Error>;

-    fn encode(&self, plan: &Self::Plan) -> Result<Bytes, Self::Error>;
+    fn encode(
+        &self,
+        plan: &Self::Plan,
+        serializer: impl SerializerRegistry + 'static,
+    ) -> Result<Bytes, Self::Error>;
 }
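With `query_ctx` dropped from `decode` and the serializer made a parameter of `encode`, the convertor is stateless and callers decide how extension nodes are (de)serialized. A sketch of the round trip under the new API, assuming the caller already holds a catalog list and session state (see `src/flow/src/transform.rs` below for a real call site):

```rust
use std::sync::Arc;

use bytes::Bytes;
use datafusion::catalog::CatalogProviderList;
use datafusion::execution::context::SessionState;
use datafusion_expr::LogicalPlan;
use query::query_engine::DefaultSerializer;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};

async fn round_trip(
    plan: &LogicalPlan,
    catalog_list: Arc<dyn CatalogProviderList>,
    state: SessionState,
) -> LogicalPlan {
    // Encode: the serializer decides how extension nodes are written out.
    let bytes: Bytes = DFLogicalSubstraitConvertor
        .encode(plan, DefaultSerializer)
        .expect("plan should be encodable");

    // Decode: table names resolve against the supplied catalog list; UDFs
    // now come from the session state rather than a query context.
    DFLogicalSubstraitConvertor
        .decode(bytes, catalog_list, state)
        .await
        .expect("round trip should succeed")
}
```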
diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml
index 26a7ccb67563..a5408b0c3246 100644
--- a/src/datanode/Cargo.toml
+++ b/src/datanode/Cargo.toml
@@ -57,7 +57,6 @@ servers.workspace = true
 session.workspace = true
 snafu.workspace = true
 store-api.workspace = true
-substrait.workspace = true
 table.workspace = true
 tokio.workspace = true
 toml.workspace = true
diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs
index ec3bccceeb41..c8067de189da 100644
--- a/src/datanode/src/error.rs
+++ b/src/datanode/src/error.rs
@@ -68,7 +68,7 @@ pub enum Error {
     DecodeLogicalPlan {
         #[snafu(implicit)]
         location: Location,
-        source: substrait::error::Error,
+        source: common_query::error::Error,
     },

     #[snafu(display("Incorrect internal state: {}", state))]
diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs
index 440f0a3d036d..912024b570fb 100644
--- a/src/datanode/src/region_server.rs
+++ b/src/datanode/src/region_server.rs
@@ -48,7 +48,6 @@ use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADAT
 use store_api::region_engine::{RegionEngineRef, RegionRole, SetReadonlyResponse};
 use store_api::region_request::{AffectedRows, RegionCloseRequest, RegionRequest};
 use store_api::storage::RegionId;
-use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
 use tonic::{Request, Response, Result as TonicResult};

 use crate::error::{
@@ -634,14 +633,11 @@ impl RegionServerInner {
         let catalog_list = Arc::new(DummyCatalogList::with_table_provider(table_provider));

         let query_engine_ctx = self.query_engine.engine_context(ctx.clone());
+        let plan_decoder = query_engine_ctx.new_plan_decoder();
+
         // decode substrait plan to logical plan and execute it
-        let logical_plan = DFLogicalSubstraitConvertor
-            .decode(
-                Bytes::from(plan),
-                catalog_list,
-                query_engine_ctx.state().clone(),
-                ctx.clone(),
-            )
+        let logical_plan = plan_decoder
+            .decode(Bytes::from(plan), catalog_list)
             .await
             .context(DecodeLogicalPlanSnafu)?;
diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs
index 8eca0788e8a1..86b2988df193 100644
--- a/src/flow/src/transform.rs
+++ b/src/flow/src/transform.rs
@@ -23,6 +23,7 @@ use literal::{from_substrait_literal, from_substrait_type};
 use prost::Message;
 use query::parser::QueryLanguageParser;
 use query::plan::LogicalPlan;
+use query::query_engine::DefaultSerializer;
 use query::QueryEngine;
 use session::context::QueryContext;
 use snafu::{OptionExt, ResultExt};
@@ -121,7 +122,7 @@ pub async fn sql_to_flow_plan(
         .context(ExternalSnafu)?;
     let LogicalPlan::DfPlan(plan) = plan;
     let sub_plan = DFLogicalSubstraitConvertor {}
-        .to_sub_plan(&plan)
+        .to_sub_plan(&plan, DefaultSerializer)
        .map_err(BoxedError::new)
         .context(ExternalSnafu)?;

@@ -196,7 +197,9 @@ mod test {
         let LogicalPlan::DfPlan(plan) = plan;

         // encode then decode so to rely on the impl of conversion from logical plan to substrait plan
-        let bytes = DFLogicalSubstraitConvertor {}.encode(&plan).unwrap();
+        let bytes = DFLogicalSubstraitConvertor {}
+            .encode(&plan, DefaultSerializer)
+            .unwrap();

         proto::Plan::decode(bytes).unwrap()
     }
diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs
index b34bedc7c90f..4e33519cdf7b 100644
--- a/src/operator/src/expr_factory.rs
+++ b/src/operator/src/expr_factory.rs
@@ -519,6 +519,7 @@ pub(crate) fn to_alter_expr(
 pub fn to_create_view_expr(
     stmt: CreateView,
     logical_plan: Vec<u8>,
+    table_names: Vec<TableName>,
     query_ctx: QueryContextRef,
 ) -> Result<CreateViewExpr> {
     let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
@@ -532,6 +533,7 @@ pub fn to_create_view_expr(
         logical_plan,
         create_if_not_exists: stmt.if_not_exists,
         or_replace: stmt.or_replace,
+        table_names,
     };

     Ok(expr)
diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs
index 521e3563fa9a..af9fc280c99e 100644
--- a/src/operator/src/statement/ddl.rs
+++ b/src/operator/src/statement/ddl.rs
@@ -43,6 +43,8 @@ use lazy_static::lazy_static;
 use partition::expr::{Operand, PartitionExpr, RestrictedOp};
 use partition::partition::{PartitionBound, PartitionDef};
 use query::parser::QueryStatement;
+use query::plan::extract_full_table_names;
+use query::query_engine::DefaultSerializer;
 use query::sql::create_table_stmt;
 use regex::Regex;
 use session::context::QueryContextRef;
@@ -398,15 +400,27 @@ impl StatementExecutor {
                 return InvalidViewStmtSnafu {}.fail();
             }
         };
-        let optimized_plan = self.optimize_logical_plan(logical_plan)?;
+
+        // Extract the table names before optimizing the plan
+        let table_names = extract_full_table_names(logical_plan.df_plan())
+            .unwrap()
+            .into_iter()
+            .map(|t| t.into())
+            .collect();
+
+        let optimized_plan = self.optimize_logical_plan(logical_plan)?.unwrap_df_plan();

         // encode logical plan
         let encoded_plan = DFLogicalSubstraitConvertor
-            .encode(&optimized_plan.unwrap_df_plan())
+            .encode(&optimized_plan, DefaultSerializer)
             .context(SubstraitCodecSnafu)?;

-        let expr =
-            expr_factory::to_create_view_expr(create_view, encoded_plan.to_vec(), ctx.clone())?;
+        let expr = expr_factory::to_create_view_expr(
+            create_view,
+            encoded_plan.to_vec(),
+            table_names,
+            ctx.clone(),
+        )?;

         self.create_view_by_expr(expr, ctx).await
     }
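`extract_full_table_names` records, before optimization, which tables the view's plan reads so the dependency set can be stored next to the encoded plan. Its body is not part of this diff; a hedged reconstruction of such a traversal with DataFusion's `TreeNode` visitor could look like this (`collect_table_names` is a hypothetical name):

```rust
use std::collections::HashSet;

use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_expr::LogicalPlan;

/// Collects the name of every table referenced by a `TableScan` in the plan.
fn collect_table_names(plan: &LogicalPlan) -> HashSet<String> {
    let mut names = HashSet::new();
    // `apply` walks the plan tree top-down without mutating it.
    plan.apply(&mut |node| {
        if let LogicalPlan::TableScan(scan) = node {
            names.insert(scan.table_name.to_string());
        }
        Ok(TreeNodeRecursion::Continue)
    })
    .expect("visitor never fails");
    names
}
```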
diff --git a/src/promql/Cargo.toml b/src/promql/Cargo.toml
index 893b8b24c372..8a7f6fb8bb31 100644
--- a/src/promql/Cargo.toml
+++ b/src/promql/Cargo.toml
@@ -9,30 +9,22 @@ workspace = true

 [dependencies]
 ahash.workspace = true
-async-recursion = "1.0"
 async-trait.workspace = true
 bytemuck.workspace = true
-catalog.workspace = true
-common-catalog.workspace = true
 common-error.workspace = true
 common-macro.workspace = true
-common-query.workspace = true
 common-recordbatch.workspace = true
 common-telemetry.workspace = true
 datafusion.workspace = true
 datafusion-expr.workspace = true
-datafusion-functions.workspace = true
 datatypes.workspace = true
 futures = "0.3"
 greptime-proto.workspace = true
-itertools.workspace = true
 lazy_static.workspace = true
 prometheus.workspace = true
 promql-parser = "0.1.1"
 prost.workspace = true
-session.workspace = true
 snafu.workspace = true
-table.workspace = true

 [dev-dependencies]
 query.workspace = true
diff --git a/src/promql/src/error.rs b/src/promql/src/error.rs
index 4cc4c47a2e75..be769ace3d43 100644
--- a/src/promql/src/error.rs
+++ b/src/promql/src/error.rs
@@ -18,34 +18,12 @@ use common_error::ext::ErrorExt;
 use common_error::status_code::StatusCode;
 use common_macro::stack_trace_debug;
 use datafusion::error::DataFusionError;
-use promql_parser::parser::{Expr as PromExpr, TokenType, VectorMatchCardinality};
 use snafu::{Location, Snafu};

 #[derive(Snafu)]
 #[snafu(visibility(pub))]
 #[stack_trace_debug]
 pub enum Error {
-    #[snafu(display("Unsupported expr type: {}", name))]
-    UnsupportedExpr {
-        name: String,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("Unsupported vector matches: {:?}", name))]
-    UnsupportedVectorMatch {
-        name: VectorMatchCardinality,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("Unexpected token: {:?}", token))]
-    UnexpectedToken {
-        token: TokenType,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("Internal error during building DataFusion plan"))]
     DataFusionPlanning {
         #[snafu(source)]
         error: DataFusionError,
         #[snafu(implicit)]
@@ -54,49 +32,6 @@ pub enum Error {
         location: Location,
     },

-    #[snafu(display("Unexpected plan or expression: {}", desc))]
-    UnexpectedPlanExpr {
-        desc: String,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("Unknown table type, downcast failed"))]
-    UnknownTable {
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("Cannot find time index column in table {}", table))]
-    TimeIndexNotFound {
-        table: String,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("Cannot find value columns in table {}", table))]
-    ValueNotFound {
-        table: String,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display(
-        "Cannot accept multiple vector as function input, PromQL expr: {:?}",
-        expr,
-    ))]
-    MultipleVector {
-        expr: PromExpr,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("Expect a PromQL expr but not found, input expr: {:?}", expr))]
-    ExpectExpr {
-        expr: PromExpr,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display(
         "Illegal range: offset {}, length {}, array len {}",
         offset,
@@ -125,117 +60,24 @@ pub enum Error {
         location: Location,
     },

-    #[snafu(display(
-        "Table (metric) name not found, this indicates a procedure error in PromQL planner"
-    ))]
-    TableNameNotFound {
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("General catalog error: "))]
-    Catalog {
-        #[snafu(implicit)]
-        location: Location,
-        source: catalog::error::Error,
-    },
-
-    #[snafu(display("Expect a range selector, but not found"))]
-    ExpectRangeSelector {
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("Zero range in range selector"))]
-    ZeroRangeSelector {
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("Cannot find column {col}"))]
     ColumnNotFound {
         col: String,
         #[snafu(implicit)]
         location: Location,
     },
-
-    #[snafu(display("Found multiple metric matchers in selector"))]
-    MultipleMetricMatchers {
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("Expect a metric matcher, but not found"))]
-    NoMetricMatcher {
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("Invalid function argument for {}", fn_name))]
-    FunctionInvalidArgument {
-        fn_name: String,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display(
-        "Attempt to combine two tables with different column sets, left: {:?}, right: {:?}",
-        left,
-        right
-    ))]
-    CombineTableColumnMismatch {
-        left: Vec<String>,
-        right: Vec<String>,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("Multi fields calculation is not supported in {}", operator))]
-    MultiFieldsNotSupported {
-        operator: String,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("Matcher operator {matcher_op} is not supported for {matcher}"))]
-    UnsupportedMatcherOp {
-        matcher_op: String,
-        matcher: String,
-        #[snafu(implicit)]
-        location: Location,
-    },
 }

 impl ErrorExt for Error {
     fn status_code(&self) -> StatusCode {
         use Error::*;
         match self {
-            TimeIndexNotFound { .. }
-            | ValueNotFound { .. }
-            | UnsupportedExpr { .. }
-            | UnexpectedToken { .. }
-            | MultipleVector { .. }
-            | ExpectExpr { .. }
-            | ExpectRangeSelector { .. }
-            | ZeroRangeSelector { .. }
-            | ColumnNotFound { .. }
-            | Deserialize { .. }
-            | FunctionInvalidArgument { .. }
-            | UnsupportedVectorMatch { .. }
-            | CombineTableColumnMismatch { .. }
-            | DataFusionPlanning { .. }
-            | MultiFieldsNotSupported { .. }
-            | UnexpectedPlanExpr { .. }
-            | UnsupportedMatcherOp { .. }
-            | IllegalRange { .. } => StatusCode::InvalidArguments,
-
-            UnknownTable { .. } | EmptyRange { .. } => StatusCode::Internal,
-
-            TableNameNotFound { .. } => StatusCode::TableNotFound,
-
-            MultipleMetricMatchers { .. } | NoMetricMatcher { .. } => StatusCode::InvalidSyntax,
+            Deserialize { .. } => StatusCode::Unexpected,

-            Catalog { source, .. } => source.status_code(),
+            IllegalRange { .. } | ColumnNotFound { .. } | EmptyRange { .. } => {
+                StatusCode::InvalidArguments
+            }
+
+            DataFusionPlanning { .. } => StatusCode::PlanQuery,
         }
     }
diff --git a/src/promql/src/extension_plan.rs b/src/promql/src/extension_plan.rs
index f8e32fc4dcdf..eba327c1bf64 100644
--- a/src/promql/src/extension_plan.rs
+++ b/src/promql/src/extension_plan.rs
@@ -35,4 +35,4 @@ pub use scalar_calculate::ScalarCalculate;
 pub use series_divide::{SeriesDivide, SeriesDivideExec, SeriesDivideStream};
 pub use union_distinct_on::{UnionDistinctOn, UnionDistinctOnExec, UnionDistinctOnStream};

-pub(crate) type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;
+pub type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;
diff --git a/src/promql/src/lib.rs b/src/promql/src/lib.rs
index 127bf45d5f1a..a29fc032e957 100644
--- a/src/promql/src/lib.rs
+++ b/src/promql/src/lib.rs
@@ -20,5 +20,4 @@
 pub mod error;
 pub mod extension_plan;
 pub mod functions;
 mod metrics;
-pub mod planner;
 pub mod range_array;
diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml
index 78e435861470..74307ab7180b 100644
--- a/src/query/Cargo.toml
+++ b/src/query/Cargo.toml
@@ -16,6 +16,7 @@ arrow-schema.workspace = true
 async-recursion = "1.0"
 async-stream.workspace = true
 async-trait = "0.1"
+bytes.workspace = true
 catalog.workspace = true
 chrono.workspace = true
 common-base.workspace = true
@@ -28,11 +29,13 @@ common-meta.workspace = true
 common-plugins.workspace = true
 common-query.workspace = true
 common-recordbatch.workspace = true
+common-runtime.workspace = true
 common-telemetry.workspace = true
 common-time.workspace = true
 datafusion.workspace = true
 datafusion-common.workspace = true
 datafusion-expr.workspace = true
+datafusion-functions.workspace = true
 datafusion-optimizer.workspace = true
 datafusion-physical-expr.workspace = true
 datafusion-sql.workspace = true
@@ -41,6 +44,7 @@ futures = "0.3"
 futures-util.workspace = true
 greptime-proto.workspace = true
 humantime.workspace = true
+itertools.workspace = true
 lazy_static.workspace = true
 meter-core.workspace = true
 meter-macros.workspace = true
@@ -49,6 +53,7 @@ once_cell.workspace = true
 prometheus.workspace = true
 promql.workspace = true
 promql-parser = "0.1.1"
+prost.workspace = true
 regex.workspace = true
 session.workspace = true
 snafu.workspace = true
diff --git a/src/query/src/datafusion/planner.rs b/src/query/src/datafusion/planner.rs
index 1ff9770a56c9..2e3e1fde2ce5 100644
--- a/src/query/src/datafusion/planner.rs
+++ b/src/query/src/datafusion/planner.rs
@@ -35,7 +35,7 @@ use session::context::QueryContextRef;
 use snafu::ResultExt;

 use crate::error::{CatalogSnafu, DataFusionSnafu, Result};
-use crate::query_engine::QueryEngineState;
+use crate::query_engine::{DefaultPlanDecoder, QueryEngineState};

 pub struct DfContextProviderAdapter {
     engine_state: Arc<QueryEngineState>,
@@ -64,6 +64,7 @@ impl DfContextProviderAdapter {
             engine_state.catalog_manager().clone(),
             engine_state.disallow_cross_catalog_query(),
             query_ctx.as_ref(),
+            Arc::new(DefaultPlanDecoder::new(session_state.clone())),
         );

         let tables = resolve_tables(table_names, &mut table_provider).await?;
diff --git a/src/query/src/dist_plan.rs b/src/query/src/dist_plan.rs index 6ab93d4e1dba..1f543e5b71d4 100644 --- a/src/query/src/dist_plan.rs +++ b/src/query/src/dist_plan.rs @@ -18,5 +18,5 @@ mod merge_scan; mod planner; pub use analyzer::DistPlannerAnalyzer; -pub use merge_scan::{MergeScanExec, MergeScanLogicalPlan}; +pub use merge_scan::{EncodedMergeScan, MergeScanExec, MergeScanLogicalPlan}; pub use planner::DistExtensionPlanner;
diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs index 870b92633981..bbb3e5ddd9cc 100644 --- a/src/query/src/dist_plan/analyzer.rs +++ b/src/query/src/dist_plan/analyzer.rs @@ -31,6 +31,7 @@ use crate::dist_plan::commutativity::{ partial_commutative_transformer, Categorizer, Commutativity, }; use crate::dist_plan::merge_scan::MergeScanLogicalPlan; +use crate::query_engine::DefaultSerializer; pub struct DistPlannerAnalyzer; @@ -150,7 +151,10 @@ impl PlanRewriter { /// Return true if should stop and expand. The input plan is the parent node of current node fn should_expand(&mut self, plan: &LogicalPlan) -> bool { - if DFLogicalSubstraitConvertor.encode(plan).is_err() { + if DFLogicalSubstraitConvertor + .encode(plan, DefaultSerializer) + .is_err() + { return true; }
diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 36f106b87319..b52a5ec98f05 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -119,6 +119,60 @@ impl MergeScanLogicalPlan { } } +/// The encoded `MergeScanLogicalPlan`, +/// used as a temporary container for decoding +#[derive(Debug, Hash, PartialEq, Eq, Clone)] +pub struct EncodedMergeScan { + pub input: Vec<u8>, + pub is_placeholder: bool, +} + +impl UserDefinedLogicalNodeCore for EncodedMergeScan { + fn name(&self) -> &str { + Self::name() + } + + // Prevent further optimization. + // The input can be retrieved by `self.input()` + fn inputs(&self) -> Vec<&LogicalPlan> { + unreachable!(); + } + + fn schema(&self) -> &datafusion_common::DFSchemaRef { + unreachable!(); + } + + // Prevent further optimization + fn expressions(&self) -> Vec<datafusion_expr::Expr> { + unreachable!(); + } + + fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + f, + "EncodedMergeScan [is_placeholder={}]", + self.is_placeholder + ) + } + + fn from_template(&self, _exprs: &[datafusion_expr::Expr], _inputs: &[LogicalPlan]) -> Self { + self.clone() + } +} + +impl EncodedMergeScan { + pub fn new(input: Vec<u8>, is_placeholder: bool) -> Self { + Self { + input, + is_placeholder, + } + } + + pub fn name() -> &'static str { + "EncodedMergeScan" + } +} + pub struct MergeScanExec { table: TableName, regions: Vec<RegionId>,
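EncodedMergeScan only exists between deserialization and a later rewrite pass. A sketch of the round trip it enables, using the DefaultSerializer defined later in this diff (merge_scan is an assumed MergeScanLogicalPlan value):

// Serialize: MergeScanLogicalPlan -> protobuf (PbMergeScan) bytes.
let bytes = DefaultSerializer.serialize_logical_plan(&merge_scan)?;
// Deserialize: bytes -> EncodedMergeScan, because no SessionState or
// catalog list is available at this point to decode the inner plan.
let node = DefaultSerializer.deserialize_logical_plan(MergeScanLogicalPlan::name(), &bytes)?;
// DefaultPlanDecoder later rewrites the placeholder into a real
// MergeScanLogicalPlan (see rewrite_merge_scan further down).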
diff --git a/src/query/src/dist_plan/planner.rs b/src/query/src/dist_plan/planner.rs index 1d29fe7aba29..da48cb026844 100644 --- a/src/query/src/dist_plan/planner.rs +++ b/src/query/src/dist_plan/planner.rs @@ -39,6 +39,7 @@ use table::table::adapter::DfTableProviderAdapter; use crate::dist_plan::merge_scan::{MergeScanExec, MergeScanLogicalPlan}; use crate::error; use crate::error::{CatalogSnafu, TableNotFoundSnafu}; +use crate::query_engine::DefaultSerializer; use crate::region_query::RegionQueryHandlerRef; pub struct DistExtensionPlanner { @@ -101,7 +102,7 @@ impl ExtensionPlanner for DistExtensionPlanner { // Pass down the original plan, allow execution nodes to do their optimization let amended_plan = Self::plan_with_full_table_name(input_plan.clone(), &table_name)?; let substrait_plan = DFLogicalSubstraitConvertor - .encode(&amended_plan) + .encode(&amended_plan, DefaultSerializer) .context(error::EncodeSubstraitLogicalPlanSnafu)? .into();
diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs index 71ec0d4ac7e0..6fd262f9f271 100644 --- a/src/query/src/dummy_catalog.rs +++ b/src/query/src/dummy_catalog.rs @@ -18,9 +18,8 @@ use std::any::Any; use std::sync::{Arc, Mutex}; use async_trait::async_trait; +pub use catalog::dummy_catalog::DummyCatalogList; use common_recordbatch::OrderOption; -use datafusion::catalog::schema::SchemaProvider; -use datafusion::catalog::{CatalogProvider, CatalogProviderList}; use datafusion::datasource::TableProvider; use datafusion::execution::context::SessionState; use datafusion::physical_plan::ExecutionPlan; @@ -35,97 +34,6 @@ use table::table::scan::StreamScanAdapter; use crate::error::{GetRegionMetadataSnafu, Result}; -/// Resolve to the given region (specified by [RegionId]) unconditionally. -#[derive(Clone)] -pub struct DummyCatalogList { - catalog: DummyCatalogProvider, -} - -impl DummyCatalogList { - /// Creates a new catalog list with the given table provider. - pub fn with_table_provider(table_provider: Arc<dyn TableProvider>) -> Self { - let schema_provider = DummySchemaProvider { - table: table_provider, - }; - let catalog_provider = DummyCatalogProvider { - schema: schema_provider, - }; - Self { - catalog: catalog_provider, - } - } -} - -impl CatalogProviderList for DummyCatalogList { - fn as_any(&self) -> &dyn Any { - self - } - - fn register_catalog( - &self, - _name: String, - _catalog: Arc<dyn CatalogProvider>, - ) -> Option<Arc<dyn CatalogProvider>> { - None - } - - fn catalog_names(&self) -> Vec<String> { - vec![] - } - - fn catalog(&self, _name: &str) -> Option<Arc<dyn CatalogProvider>> { - Some(Arc::new(self.catalog.clone())) - } -} - -/// A dummy catalog provider for [DummyCatalogList]. -#[derive(Clone)] -struct DummyCatalogProvider { - schema: DummySchemaProvider, -} - -impl CatalogProvider for DummyCatalogProvider { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema_names(&self) -> Vec<String> { - vec![] - } - - fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> { - Some(Arc::new(self.schema.clone())) - } -} - -/// A dummy schema provider for [DummyCatalogList].
-#[derive(Clone)] -struct DummySchemaProvider { - table: Arc<dyn TableProvider>, -} - -#[async_trait] -impl SchemaProvider for DummySchemaProvider { - fn as_any(&self) -> &dyn Any { - self - } - - fn table_names(&self) -> Vec<String> { - vec![] - } - - async fn table( - &self, - _name: &str, - ) -> datafusion::error::Result<Option<Arc<dyn TableProvider>>> { - Ok(Some(self.table.clone())) - } - - fn table_exist(&self, _name: &str) -> bool { - true - } -} - /// For [TableProvider] and [DummyCatalogList] #[derive(Clone)] pub struct DummyTableProvider { @@ -207,8 +115,6 @@ impl DummyTableProvider { } } -pub struct DummyTableProviderFactory; - #[async_trait] impl TableProviderFactory for DummyTableProviderFactory { async fn create( @@ -233,6 +139,8 @@ impl TableProviderFactory for DummyTableProviderFactory { } } +pub struct DummyTableProviderFactory; + #[async_trait] pub trait TableProviderFactory: Send + Sync { async fn create(
diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index 9b6413e4ed92..4ac5a7c10aa9 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -14,6 +14,7 @@ #![feature(let_chains)] #![feature(int_roundings)] +#![feature(option_get_or_insert_default)] mod analyze; pub mod dataframe; @@ -31,6 +32,7 @@ pub mod physical_planner; pub mod physical_wrapper; pub mod plan; pub mod planner; +pub mod promql; pub mod query_engine; mod range_select; pub mod region_query;
diff --git a/src/query/src/plan.rs b/src/query/src/plan.rs index 34495dee989a..47af0a4d670a 100644 --- a/src/query/src/plan.rs +++ b/src/query/src/plan.rs @@ -12,15 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt::{Debug, Display}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_meta::table_name::TableName; use common_query::prelude::ScalarValue; -use datafusion_common::ParamValues; +use datafusion::datasource::DefaultTableSource; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; +use datafusion_common::{ParamValues, TableReference}; use datafusion_expr::LogicalPlan as DfLogicalPlan; use datatypes::data_type::ConcreteDataType; use datatypes::schema::Schema; use snafu::ResultExt; +pub use table::metadata::TableType; +use table::table::adapter::DfTableProviderAdapter; use crate::error::{ConvertDatafusionSchemaSnafu, DataFusionSnafu, Result}; @@ -94,6 +100,13 @@ impl LogicalPlan { LogicalPlan::DfPlan(plan) => plan, } } + + /// Returns the DataFusion logical plan reference + pub fn df_plan(&self) -> &DfLogicalPlan { + match self { + LogicalPlan::DfPlan(plan) => &plan, + } + } } impl From<DfLogicalPlan> for LogicalPlan { @@ -101,3 +114,74 @@ impl From<DfLogicalPlan> for LogicalPlan { Self::DfPlan(plan) } } + +/// Visitor to extract table names from logical plan (TableScan node) +#[derive(Default)] +pub struct TableNamesExtractor { + pub table_names: HashSet<TableName>, +} + +impl TreeNodeVisitor for TableNamesExtractor { + type Node = DfLogicalPlan; + + fn f_down(&mut self, node: &Self::Node) -> datafusion::error::Result<TreeNodeRecursion> { + match node { + DfLogicalPlan::TableScan(scan) => { + if let Some(source) = scan.source.as_any().downcast_ref::<DefaultTableSource>() { + if let Some(provider) = source + .table_provider + .as_any() + .downcast_ref::<DfTableProviderAdapter>() + { + if provider.table().table_type() == TableType::Base { + let info = provider.table().table_info(); + self.table_names.insert(TableName::new( + info.catalog_name.clone(), + info.schema_name.clone(), + info.name.clone(), + )); + } + } + } + match &scan.table_name { + TableReference::Full { + catalog, + schema, + table, + } => { + self.table_names.insert(TableName::new( + catalog.to_string(), + schema.to_string(), + table.to_string(), + )); + } + // TODO(ruihang): Maybe the following two cases should not be valid + TableReference::Partial { schema, table } => { + self.table_names.insert(TableName::new( + DEFAULT_CATALOG_NAME.to_string(), + schema.to_string(), + table.to_string(), + )); + } + TableReference::Bare { table } => { + self.table_names.insert(TableName::new( + DEFAULT_CATALOG_NAME.to_string(), + DEFAULT_SCHEMA_NAME.to_string(), + table.to_string(), + )); + } + } + + Ok(TreeNodeRecursion::Continue) + } + _ => Ok(TreeNodeRecursion::Continue), + } + } +} + +/// Extract fully resolved table names from logical plan +pub fn extract_full_table_names(plan: &DfLogicalPlan) -> Result<HashSet<TableName>> { + let mut extractor = TableNamesExtractor::default(); + let _ = plan.visit(&mut extractor).context(DataFusionSnafu)?; + Ok(extractor.table_names) +}
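With the visitor in place, gathering every table a plan touches is a one-liner. A minimal usage sketch (the plan value is assumed; the printed format follows TableName's Display impl):

// Collect every fully-resolved table the plan scans, e.g. to warm caches
// before decoding a distributed plan.
let names: HashSet<TableName> = extract_full_table_names(plan.df_plan())?;
for name in &names {
    println!("{name}"); // e.g. "greptime.public.metrics"
}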
diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs index 5f350a638d7c..0d399c45c31e 100644 --- a/src/query/src/planner.rs +++ b/src/query/src/planner.rs @@ -24,7 +24,6 @@ use datafusion::execution::context::SessionState; use datafusion::sql::planner::PlannerContext; use datafusion_expr::Expr as DfExpr; use datafusion_sql::planner::{ParserOptions, SqlToRel}; -use promql::planner::PromPlanner; use promql_parser::parser::EvalStmt; use session::context::QueryContextRef; use snafu::ResultExt; @@ -34,7 +33,8 @@ use sql::statements::statement::Statement; use crate::error::{DataFusionSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu}; use crate::parser::QueryStatement; use crate::plan::LogicalPlan; -use crate::query_engine::QueryEngineState; +use crate::promql::planner::PromPlanner; +use crate::query_engine::{DefaultPlanDecoder, QueryEngineState}; use crate::range_select::plan_rewrite::RangePlanRewriter; use crate::{DfContextProviderAdapter, QueryEngineContext}; @@ -69,6 +69,7 @@ impl DfLogicalPlanner { self.engine_state.catalog_manager().clone(), self.engine_state.disallow_cross_catalog_query(), query_ctx.as_ref(), + Arc::new(DefaultPlanDecoder::new(self.session_state.clone())), ); let context_provider = DfContextProviderAdapter::try_new( @@ -140,6 +141,7 @@ impl DfLogicalPlanner { self.engine_state.catalog_manager().clone(), self.engine_state.disallow_cross_catalog_query(), query_ctx.as_ref(), + Arc::new(DefaultPlanDecoder::new(self.session_state.clone())), ); PromPlanner::stmt_to_plan(table_provider, stmt) .await
diff --git a/src/query/src/promql.rs b/src/query/src/promql.rs new file mode 100644 index 000000000000..06d2bbd21ae0 --- /dev/null +++ b/src/query/src/promql.rs @@ -0,0 +1,16 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
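+//! PromQL planning for the query engine: the PromQL planner (moved here +//! from the `promql` crate) and its dedicated error type.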
+ +pub(crate) mod error; +pub mod planner; diff --git a/src/query/src/promql/error.rs b/src/query/src/promql/error.rs new file mode 100644 index 000000000000..1705515ea51c --- /dev/null +++ b/src/query/src/promql/error.rs @@ -0,0 +1,228 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_macro::stack_trace_debug; +use datafusion::error::DataFusionError; +use promql::error::Error as PromqlError; +use promql_parser::parser::{Expr as PromExpr, TokenType, VectorMatchCardinality}; +use snafu::{Location, Snafu}; + +#[derive(Snafu)] +#[snafu(visibility(pub))] +#[stack_trace_debug] +pub enum Error { + #[snafu(display("Unsupported expr type: {}", name))] + UnsupportedExpr { + name: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Unsupported vector matches: {:?}", name))] + UnsupportedVectorMatch { + name: VectorMatchCardinality, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Unexpected token: {:?}", token))] + UnexpectedToken { + token: TokenType, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Internal error during building DataFusion plan"))] + DataFusionPlanning { + #[snafu(source)] + error: datafusion::error::DataFusionError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Unexpected plan or expression: {}", desc))] + UnexpectedPlanExpr { + desc: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Unknown table type, downcast failed"))] + UnknownTable { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Cannot find time index column in table {}", table))] + TimeIndexNotFound { + table: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Cannot find value columns in table {}", table))] + ValueNotFound { + table: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to create PromQL plan node"))] + PromqlPlanNode { + #[snafu(source)] + source: PromqlError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Cannot accept multiple vector as function input, PromQL expr: {:?}", + expr, + ))] + MultipleVector { + expr: PromExpr, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Table (metric) name not found, this indicates a procedure error in PromQL planner" + ))] + TableNameNotFound { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("General catalog error: "))] + Catalog { + #[snafu(implicit)] + location: Location, + source: catalog::error::Error, + }, + + #[snafu(display("Expect a range selector, but not found"))] + ExpectRangeSelector { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Zero range in range selector"))] + ZeroRangeSelector { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Cannot find column {col}"))] + ColumnNotFound { + col: String, + 
#[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Found multiple metric matchers in selector"))] + MultipleMetricMatchers { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Expect a metric matcher, but not found"))] + NoMetricMatcher { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Invalid function argument for {}", fn_name))] + FunctionInvalidArgument { + fn_name: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Attempt to combine two tables with different column sets, left: {:?}, right: {:?}", + left, + right + ))] + CombineTableColumnMismatch { + left: Vec<String>, + right: Vec<String>, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Multi fields calculation is not supported in {}", operator))] + MultiFieldsNotSupported { + operator: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Matcher operator {matcher_op} is not supported for {matcher}"))] + UnsupportedMatcherOp { + matcher_op: String, + matcher: String, + #[snafu(implicit)] + location: Location, + }, +} + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + use Error::*; + match self { + TimeIndexNotFound { .. } + | ValueNotFound { .. } + | UnsupportedExpr { .. } + | UnexpectedToken { .. } + | MultipleVector { .. } + | ExpectRangeSelector { .. } + | ZeroRangeSelector { .. } + | ColumnNotFound { .. } + | FunctionInvalidArgument { .. } + | UnsupportedVectorMatch { .. } + | CombineTableColumnMismatch { .. } + | UnexpectedPlanExpr { .. } + | UnsupportedMatcherOp { .. } => StatusCode::InvalidArguments, + + UnknownTable { .. } => StatusCode::Internal, + + PromqlPlanNode { source, .. } => source.status_code(), + + DataFusionPlanning { .. } => StatusCode::PlanQuery, + + TableNameNotFound { .. } => StatusCode::TableNotFound, + + MultipleMetricMatchers { .. } | NoMetricMatcher { .. } => StatusCode::InvalidSyntax, + + MultiFieldsNotSupported { .. } => StatusCode::Unsupported, + Catalog { source, ..
} => source.status_code(), + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +pub type Result<T> = std::result::Result<T, Error>; + +impl From<Error> for DataFusionError { + fn from(err: Error) -> Self { + DataFusionError::External(Box::new(err)) + } +}
diff --git a/src/promql/src/planner.rs b/src/query/src/promql/planner.rs similarity index 99% rename from src/promql/src/planner.rs rename to src/query/src/promql/planner.rs index 0af53088388a..c95a81092e5a 100644 --- a/src/promql/src/planner.rs +++ b/src/query/src/promql/planner.rs @@ -37,6 +37,15 @@ use datafusion::sql::TableReference; use datafusion_expr::utils::conjunction; use datatypes::arrow::datatypes::DataType as ArrowDataType; use itertools::Itertools; +use promql::extension_plan::{ + build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, + RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn, +}; +use promql::functions::{ + AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta, + Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime, + QuantileOverTime, Rate, Resets, StddevOverTime, StdvarOverTime, SumOverTime, +}; use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME}; use promql_parser::parser::{ token, AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt, @@ -47,23 +56,14 @@ use promql_parser::parser::{ use snafu::{ensure, OptionExt, ResultExt}; use table::table::adapter::DfTableProviderAdapter; -use crate::error::{ +use crate::promql::error::{ CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu, ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, MultiFieldsNotSupportedSnafu, - MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, Result, - TableNameNotFoundSnafu, TimeIndexNotFoundSnafu, UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, - UnknownTableSnafu, UnsupportedExprSnafu, UnsupportedMatcherOpSnafu, + MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, PromqlPlanNodeSnafu, + Result, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu, UnexpectedPlanExprSnafu, + UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu, UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu, ZeroRangeSelectorSnafu, }; -use crate::extension_plan::{ - build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, - RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn, -}; -use crate::functions::{ - AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta, - Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime, - QuantileOverTime, Rate, Resets, StddevOverTime, StdvarOverTime, SumOverTime, -}; /// `time()` function in PromQL.
const SPECIAL_TIME_FUNCTION: &str = "time"; @@ -1469,16 +1469,19 @@ impl PromPlanner { }, ); let scalar_plan = LogicalPlan::Extension(Extension { - node: Arc::new(ScalarCalculate::new( - self.ctx.start, - self.ctx.end, - self.ctx.interval, - input, - self.ctx.time_index_column.as_ref().unwrap(), - &self.ctx.tag_columns, - &self.ctx.field_columns[0], - self.ctx.table_name.as_deref(), - )?), + node: Arc::new( + ScalarCalculate::new( + self.ctx.start, + self.ctx.end, + self.ctx.interval, + input, + self.ctx.time_index_column.as_ref().unwrap(), + &self.ctx.tag_columns, + &self.ctx.field_columns[0], + self.ctx.table_name.as_deref(), + ) + .context(PromqlPlanNodeSnafu)?, + ), }); // scalar plan have no tag columns self.ctx.tag_columns.clear(); diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index 18923f3b96ad..1beea2a1c2d2 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -13,9 +13,9 @@ // limitations under the License. mod context; +mod default_serializer; pub mod options; mod state; - use std::any::Any; use std::sync::Arc; @@ -29,6 +29,7 @@ use common_function::scalars::aggregate::AggregateFunctionMetaRef; use common_query::prelude::ScalarUdf; use common_query::Output; use datatypes::schema::Schema; +pub use default_serializer::{DefaultPlanDecoder, DefaultSerializer}; use session::context::QueryContextRef; use table::TableRef; diff --git a/src/query/src/query_engine/context.rs b/src/query/src/query_engine/context.rs index f76332cde2b4..2073dc933746 100644 --- a/src/query/src/query_engine/context.rs +++ b/src/query/src/query_engine/context.rs @@ -14,10 +14,13 @@ use std::sync::Arc; +use common_query::logical_plan::SubstraitPlanDecoderRef; use common_telemetry::tracing_context::TracingContext; use datafusion::execution::context::{SessionState, TaskContext}; use session::context::QueryContextRef; +use crate::query_engine::default_serializer::DefaultPlanDecoder; + #[derive(Debug)] pub struct QueryEngineContext { state: SessionState, @@ -58,6 +61,11 @@ impl QueryEngineContext { )) } + /// Creates a `[LogicalPlan]` decoder + pub fn new_plan_decoder(&self) -> SubstraitPlanDecoderRef { + Arc::new(DefaultPlanDecoder::new(self.state.clone())) + } + /// Mock an engine context for unit tests. #[cfg(any(test, feature = "test"))] pub fn mock() -> Self { diff --git a/src/query/src/query_engine/default_serializer.rs b/src/query/src/query_engine/default_serializer.rs new file mode 100644 index 000000000000..13a626372923 --- /dev/null +++ b/src/query/src/query_engine/default_serializer.rs @@ -0,0 +1,172 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
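+//! `DefaultSerializer` extends the Substrait extension serializer so that +//! `MergeScanLogicalPlan` survives a round trip as protobuf bytes, and +//! `DefaultPlanDecoder` restores decoded plans by rewriting the temporary +//! `EncodedMergeScan` nodes back into real merge scans.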
+ +use std::sync::Arc; + +use common_query::logical_plan::SubstraitPlanDecoder; +use datafusion::catalog::CatalogProviderList; +use datafusion::common::DataFusionError; +use datafusion::error::Result; +use datafusion::execution::context::SessionState; +use datafusion::execution::registry::SerializerRegistry; +use datafusion::logical_expr::{Extension, LogicalPlan}; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; +use datafusion_expr::UserDefinedLogicalNode; +use greptime_proto::substrait_extension::MergeScan as PbMergeScan; +use prost::Message; +use snafu::ResultExt; +use substrait::extension_serializer::ExtensionSerializer; +use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; + +use crate::dist_plan::{EncodedMergeScan, MergeScanLogicalPlan}; +use crate::error::DataFusionSnafu; + +/// Extends `[substrait::extension_serializer::ExtensionSerializer]` with support for `[MergeScanLogicalPlan]` serialization. +pub struct DefaultSerializer; + +impl SerializerRegistry for DefaultSerializer { + fn serialize_logical_plan(&self, node: &dyn UserDefinedLogicalNode) -> Result<Vec<u8>> { + if node.name() == MergeScanLogicalPlan::name() { + let merge_scan = node + .as_any() + .downcast_ref::<MergeScanLogicalPlan>() + .expect("Failed to downcast to MergeScanLogicalPlan"); + + let input = merge_scan.input(); + let is_placeholder = merge_scan.is_placeholder(); + let input = DFLogicalSubstraitConvertor + .encode(input, DefaultSerializer) + .map_err(|e| DataFusionError::External(Box::new(e)))? + .to_vec(); + + Ok(PbMergeScan { + is_placeholder, + input, + } + .encode_to_vec()) + } else { + ExtensionSerializer.serialize_logical_plan(node) + } + } + + fn deserialize_logical_plan( + &self, + name: &str, + bytes: &[u8], + ) -> Result<Arc<dyn UserDefinedLogicalNode>> { + if name == MergeScanLogicalPlan::name() { + let pb_merge_scan = + PbMergeScan::decode(bytes).map_err(|e| DataFusionError::External(Box::new(e)))?; + + let input = pb_merge_scan.input; + let is_placeholder = pb_merge_scan.is_placeholder; + + // Use `EncodedMergeScan` as a temporary container, + // it will be rewritten into `MergeScanLogicalPlan` by `SubstraitPlanDecoder`. + // We can't decode the logical plan here because we don't have + // the `SessionState` and `CatalogProviderList`. + Ok(Arc::new(EncodedMergeScan::new(input, is_placeholder))) + } else { + ExtensionSerializer.deserialize_logical_plan(name, bytes) + } + } +} + +/// The DataFusion `[LogicalPlan]` decoder. +pub struct DefaultPlanDecoder { + session_state: SessionState, +} + +impl DefaultPlanDecoder { + pub fn new(session_state: SessionState) -> Self { + Self { session_state } + } + + /// Rewrites `[EncodedMergeScan]` to `[MergeScanLogicalPlan]`. + fn rewrite_merge_scan( + &self, + plan: LogicalPlan, + catalog_list: Arc<dyn CatalogProviderList>, + ) -> crate::error::Result<LogicalPlan> { + let mut rewriter = MergeScanRewriter { + session_state: self.session_state.clone(), + catalog_list, + }; + Ok(plan.rewrite(&mut rewriter).context(DataFusionSnafu)?.data) + } +} + +#[async_trait::async_trait] +impl SubstraitPlanDecoder for DefaultPlanDecoder { + async fn decode( + &self, + message: bytes::Bytes, + catalog_list: Arc<dyn CatalogProviderList>, + ) -> common_query::error::Result<LogicalPlan> { + // The session_state already has the `DefaultSerializer` as `SerializerRegistry`.
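+ // Decoding happens in two phases: first the Substrait bytes become a + // LogicalPlan whose merge scans are still `EncodedMergeScan` placeholders, + // then `rewrite_merge_scan` swaps each placeholder for a real + // `MergeScanLogicalPlan` using this session state and catalog list.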
+ let logical_plan = DFLogicalSubstraitConvertor + .decode(message, catalog_list.clone(), self.session_state.clone()) + .await + .unwrap(); + + Ok(self.rewrite_merge_scan(logical_plan, catalog_list).unwrap()) + } +} + +struct MergeScanRewriter { + catalog_list: Arc<dyn CatalogProviderList>, + session_state: SessionState, +} + +impl TreeNodeRewriter for MergeScanRewriter { + type Node = LogicalPlan; + + /// descend + fn f_down<'a>(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> { + match node { + LogicalPlan::Extension(Extension { node }) => { + if node.name() == EncodedMergeScan::name() { + let encoded_merge_scan = node + .as_any() + .downcast_ref::<EncodedMergeScan>() + .expect("Failed to downcast to EncodedMergeScan"); + let catalog_list = self.catalog_list.clone(); + let session_state = self.session_state.clone(); + let input = encoded_merge_scan.input.clone(); + + let input = std::thread::spawn(move || { + common_runtime::block_on_bg(async move { + DFLogicalSubstraitConvertor + .decode(&input[..], catalog_list, session_state) + .await + .map_err(|e| DataFusionError::External(Box::new(e))) + }) + }) + .join() + .unwrap()?; + + Ok(Transformed::yes(LogicalPlan::Extension(Extension { + node: Arc::new(MergeScanLogicalPlan::new( + input, + encoded_merge_scan.is_placeholder, + )), + }))) + } else { + Ok(Transformed::no(LogicalPlan::Extension(Extension { node }))) + } + } + node => Ok(Transformed::no(node)), + } + } +}
diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 9fdee8fc0e36..51b3f82ef228 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -37,7 +37,6 @@ use datafusion_optimizer::analyzer::count_wildcard_rule::CountWildcardRule; use datafusion_optimizer::analyzer::{Analyzer, AnalyzerRule}; use datafusion_optimizer::optimizer::Optimizer; use promql::extension_plan::PromExtensionPlanner; -use substrait::extension_serializer::ExtensionSerializer; use table::table::adapter::DfTableProviderAdapter; use table::TableRef; @@ -49,6 +48,7 @@ use crate::optimizer::string_normalization::StringNormalizationRule; use crate::optimizer::type_conversion::TypeConversionRule; use crate::optimizer::ExtensionAnalyzerRule; use crate::query_engine::options::QueryOptions; +use crate::query_engine::DefaultSerializer; use crate::range_select::planner::RangeSelectPlanner; use crate::region_query::RegionQueryHandlerRef; use crate::QueryEngineContext; @@ -115,8 +115,8 @@ impl QueryEngineState { physical_optimizer.rules.push(Arc::new(RemoveDuplicate)); let session_state = SessionState::new_with_config_rt(session_config, runtime_env) - .with_serializer_registry(Arc::new(ExtensionSerializer)) .with_analyzer_rules(analyzer.rules) + .with_serializer_registry(Arc::new(DefaultSerializer)) .with_query_planner(Arc::new(DfQueryPlanner::new( catalog_list.clone(), region_query_handler,
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index b285be08b907..48e5f2bf9cd3 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -11,6 +11,7 @@ testing = [] workspace = true [dependencies] +api.workspace = true async-trait = "0.1" chrono.workspace = true common-base.workspace = true
diff --git a/src/table/src/lib.rs b/src/table/src/lib.rs index 857d529e8add..f4eb68cc85c7 100644 --- a/src/table/src/lib.rs +++ b/src/table/src/lib.rs @@ -21,6 +21,7 @@ pub mod predicate; pub mod requests; pub mod stats; pub mod table; +pub mod table_name; pub mod table_reference; pub mod test_util;
diff --git a/src/common/meta/src/table_name.rs b/src/table/src/table_name.rs similarity index 98% rename from
src/common/meta/src/table_name.rs rename to src/table/src/table_name.rs index 645e6386df02..f999e013f243 100644 --- a/src/common/meta/src/table_name.rs +++ b/src/table/src/table_name.rs @@ -16,7 +16,8 @@ use std::fmt::{Display, Formatter}; use api::v1::TableName as PbTableName; use serde::{Deserialize, Serialize}; -use table::table_reference::TableReference; + +use crate::table_reference::TableReference; #[derive(Debug, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)] pub struct TableName { diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 4131d36b30d0..892058794328 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -363,14 +363,11 @@ impl GreptimeDbClusterBuilder { .build(), ); - let table_cache = cache_registry.get().unwrap(); - let table_route_cache = cache_registry.get().unwrap(); let catalog_manager = KvBackendCatalogManager::new( Mode::Distributed, Some(meta_client.clone()), cached_meta_backend.clone(), - table_cache, - table_route_cache, + cache_registry.clone(), ) .await; diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 4d99c9744fab..266dd69c1590 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -144,8 +144,7 @@ impl GreptimeDbStandaloneBuilder { Mode::Standalone, None, kv_backend.clone(), - cache_registry.get().unwrap(), - cache_registry.get().unwrap(), + cache_registry.clone(), ) .await;
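TableName now lives beside TableReference in the table crate. A minimal usage sketch (construction mirrors the TableNamesExtractor call sites above; the exact printed format is an assumption based on the Display impl kept by the rename):

use table::table_name::TableName;

let name = TableName::new(
    "greptime".to_string(),
    "public".to_string(),
    "metrics".to_string(),
);
// Display renders the fully qualified name.
println!("{name}"); // e.g. "greptime.public.metrics"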