Skip to content

Commit

Permalink
feat: querying from view works
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 committed May 15, 2024
1 parent 63a8d29 commit f09e726
Show file tree
Hide file tree
Showing 63 changed files with 1,150 additions and 539 deletions.
22 changes: 10 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a11db14b8502f55ca5348917fd18e6fcf140f55e" }
greptime-proto = { git = "https://github.com/killme2008/greptime-proto.git", rev = "9aa33273cf5ab449e687e9aa2bb10af3067984f3" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
1 change: 1 addition & 0 deletions src/cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ edition.workspace = true
license.workspace = true

[dependencies]
substrait.workspace = true
catalog.workspace = true
common-error.workspace = true
common-macro.workspace = true
Expand Down
15 changes: 14 additions & 1 deletion src/cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use std::time::Duration;
use catalog::kvbackend::new_table_cache;
use common_meta::cache::{
new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache,
new_table_route_cache, CacheRegistry, CacheRegistryBuilder, LayeredCacheRegistryBuilder,
new_table_route_cache, new_view_info_cache, CacheRegistry, CacheRegistryBuilder,
LayeredCacheRegistryBuilder,
};
use common_meta::kv_backend::KvBackendRef;
use moka::future::CacheBuilder;
Expand All @@ -33,6 +34,7 @@ const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
const DEFAULT_CACHE_TTI: Duration = Duration::from_secs(5 * 60);

pub const TABLE_INFO_CACHE_NAME: &str = "table_info_cache";
pub const VIEW_INFO_CACHE_NAME: &str = "view_info_cache";
pub const TABLE_NAME_CACHE_NAME: &str = "table_name_cache";
pub const TABLE_CACHE_NAME: &str = "table_cache";
pub const TABLE_FLOWNODE_SET_CACHE_NAME: &str = "table_flownode_set_cache";
Expand Down Expand Up @@ -82,11 +84,22 @@ pub fn build_fundamental_cache_registry(kv_backend: KvBackendRef) -> CacheRegist
cache,
kv_backend.clone(),
));
// Builds the view info cache
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();
let view_info_cache = Arc::new(new_view_info_cache(
VIEW_INFO_CACHE_NAME.to_string(),
cache,
kv_backend.clone(),
));

CacheRegistryBuilder::default()
.add_cache(table_info_cache)
.add_cache(table_name_cache)
.add_cache(table_route_cache)
.add_cache(view_info_cache)
.add_cache(table_flownode_set_cache)
.build()
}
Expand Down
1 change: 1 addition & 0 deletions src/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ arrow.workspace = true
arrow-schema.workspace = true
async-stream.workspace = true
async-trait = "0.1"
bytes.workspace = true
common-catalog.workspace = true
common-config.workspace = true
common-error.workspace = true
Expand Down
151 changes: 151 additions & 0 deletions src/catalog/src/dummy_catalog.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Dummy catalog for region server.

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
use datafusion::datasource::TableProvider;

/// Resolve to the given region (specified by [RegionId]) unconditionally.
#[derive(Clone)]
pub struct DummyCatalogList {
catalog: DummyCatalogProvider,
}

impl DummyCatalogList {
/// Creates a new catalog list with the given table provider.
pub fn with_table_provider(table_provider: Arc<dyn TableProvider>) -> Self {
let schema_provider = Arc::new(DummySchemaProvider {
table: table_provider,
});
let catalog_provider = DummyCatalogProvider {
schema: schema_provider,
};
Self {
catalog: catalog_provider,
}
}

/// Creates a new catalog list with the given table providers.
pub fn with_tables(tables: HashMap<String, Arc<dyn TableProvider>>) -> Self {
let schema_provider = Arc::new(DummyTablesSchemaProvider { tables });
let catalog_provider = DummyCatalogProvider {
schema: schema_provider,
};
Self {
catalog: catalog_provider,
}
}
}

impl CatalogProviderList for DummyCatalogList {
fn as_any(&self) -> &dyn Any {
self
}

fn register_catalog(
&self,
_name: String,
_catalog: Arc<dyn CatalogProvider>,
) -> Option<Arc<dyn CatalogProvider>> {
None
}

fn catalog_names(&self) -> Vec<String> {
vec![]
}

fn catalog(&self, _name: &str) -> Option<Arc<dyn CatalogProvider>> {
Some(Arc::new(self.catalog.clone()))
}
}

/// A dummy catalog provider for [DummyCatalogList].
#[derive(Clone)]
struct DummyCatalogProvider {
schema: Arc<dyn SchemaProvider>,
}

impl CatalogProvider for DummyCatalogProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn schema_names(&self) -> Vec<String> {
vec![]
}

fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
Some(self.schema.clone())
}
}

/// A dummy schema provider for [DummyCatalogList].
#[derive(Clone)]
struct DummySchemaProvider {
table: Arc<dyn TableProvider>,
}

#[async_trait]
impl SchemaProvider for DummySchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn table_names(&self) -> Vec<String> {
vec![]
}

async fn table(
&self,
_name: &str,
) -> datafusion::error::Result<Option<Arc<dyn TableProvider>>> {
Ok(Some(self.table.clone()))
}

fn table_exist(&self, _name: &str) -> bool {
true
}
}

/// A dummy schema provider for [DummyCatalogList].
#[derive(Clone)]
struct DummyTablesSchemaProvider {
tables: HashMap<String, Arc<dyn TableProvider>>,
}

#[async_trait]
impl SchemaProvider for DummyTablesSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn table_names(&self) -> Vec<String> {
self.tables.keys().map(|t| t.to_string()).collect()
}

async fn table(&self, name: &str) -> datafusion::error::Result<Option<Arc<dyn TableProvider>>> {
Ok(self.tables.get(name).cloned())
}

fn table_exist(&self, name: &str) -> bool {
self.tables.contains_key(name)
}
}
19 changes: 18 additions & 1 deletion src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,20 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to get view info from cache"))]
GetViewCache {
source: common_meta::error::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Cache not found: {name}"))]
CacheNotFound {
name: String,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -337,6 +351,7 @@ impl ErrorExt for Error {
| Error::FindPartitions { .. }
| Error::FindRegionRoutes { .. }
| Error::InvalidEntryType { .. }
| Error::CacheNotFound { .. }
| Error::ParallelOpenTable { .. } => StatusCode::Unexpected,

Error::TableNotFound { .. } => StatusCode::TableNotFound,
Expand Down Expand Up @@ -385,7 +400,9 @@ impl ErrorExt for Error {
Error::QueryAccessDenied { .. } => StatusCode::AccessDenied,
Error::Datafusion { .. } => StatusCode::EngineExecuteQuery,
Error::TableMetadataManager { source, .. } => source.status_code(),
Error::GetTableCache { .. } => StatusCode::Internal,
Error::GetViewCache { source, .. } | Error::GetTableCache { source, .. } => {
source.status_code()
}
}
}

Expand Down
Loading

0 comments on commit f09e726

Please sign in to comment.