diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 8a6b437de602..9dc157f3f943 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -33,6 +33,7 @@ use futures_util::StreamExt; use log_store::raft_engine::log_store::RaftEngineLogStore; use meta_client::client::MetaClient; use mito2::engine::MitoEngine; +use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef}; use object_store::util::normalize_dir; use query::QueryEngineFactory; use servers::Mode; @@ -354,7 +355,12 @@ impl DatanodeBuilder { let mut region_server = RegionServer::new(query_engine.clone(), runtime.clone(), event_listener); let object_store = store::new_object_store(opts).await?; - let engines = Self::build_store_engines(opts, log_store, object_store).await?; + let object_store_manager = ObjectStoreManager::new( + "default", // TODO: use a name which is set in the configuration when #919 is done. + object_store, + ); + let engines = + Self::build_store_engines(opts, log_store, Arc::new(object_store_manager)).await?; for engine in engines { region_server.register_engine(engine); } @@ -392,7 +398,7 @@ impl DatanodeBuilder { async fn build_store_engines( opts: &DatanodeOptions, log_store: Arc, - object_store: object_store::ObjectStore, + object_store_manager: ObjectStoreManagerRef, ) -> Result> where S: LogStore, @@ -401,12 +407,18 @@ impl DatanodeBuilder { for engine in &opts.region_engine { match engine { RegionEngineConfig::Mito(config) => { - let engine: MitoEngine = - MitoEngine::new(config.clone(), log_store.clone(), object_store.clone()); + let engine: MitoEngine = MitoEngine::new( + config.clone(), + log_store.clone(), + object_store_manager.clone(), + ); engines.push(Arc::new(engine) as _); } RegionEngineConfig::File(config) => { - let engine = FileRegionEngine::new(config.clone(), object_store.clone()); + let engine = FileRegionEngine::new( + config.clone(), + object_store_manager.default_object_store().clone(), // TODO: implement custom storage for file engine + ); engines.push(Arc::new(engine) as _); } } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index ab315954aadf..1357570c3ac9 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -463,7 +463,6 @@ impl ErrorExt for Error { ColumnNotFound { .. } => StatusCode::TableColumnNotFound, ParseSql { source, .. } => source.status_code(), - DeleteExprToRequest { source, .. } | InsertData { source, .. } => source.status_code(), ColumnValuesNumberMismatch { .. } diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 222e9919c61f..26cc83e55405 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -46,7 +46,7 @@ use common_error::ext::BoxedError; use common_query::Output; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::timer; -use object_store::ObjectStore; +use object_store::manager::ObjectStoreManagerRef; use snafu::{OptionExt, ResultExt}; use store_api::logstore::LogStore; use store_api::metadata::RegionMetadataRef; @@ -72,12 +72,12 @@ impl MitoEngine { pub fn new( mut config: MitoConfig, log_store: Arc, - object_store: ObjectStore, + object_store_manager: ObjectStoreManagerRef, ) -> MitoEngine { config.sanitize(); MitoEngine { - inner: Arc::new(EngineInner::new(config, log_store, object_store)), + inner: Arc::new(EngineInner::new(config, log_store, object_store_manager)), } } @@ -108,10 +108,10 @@ impl EngineInner { fn new( config: MitoConfig, log_store: Arc, - object_store: ObjectStore, + object_store_manager: ObjectStoreManagerRef, ) -> EngineInner { EngineInner { - workers: WorkerGroup::start(config, log_store, object_store), + workers: WorkerGroup::start(config, log_store, object_store_manager), } } @@ -235,7 +235,7 @@ impl MitoEngine { pub fn new_for_test( mut config: MitoConfig, log_store: Arc, - object_store: ObjectStore, + object_store_manager: ObjectStoreManagerRef, write_buffer_manager: Option, listener: Option, ) -> MitoEngine { @@ -246,7 +246,7 @@ impl MitoEngine { workers: WorkerGroup::start_for_test( config, log_store, - object_store, + object_store_manager, write_buffer_manager, listener, ), diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index d7cb13e5121b..b87fd12039c6 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -35,6 +35,7 @@ use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use log_store::raft_engine::log_store::RaftEngineLogStore; use log_store::test_util::log_store_util; +use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef}; use object_store::services::Fs; use object_store::ObjectStore; use store_api::metadata::{ColumnMetadata, RegionMetadataRef}; @@ -71,7 +72,7 @@ pub struct TestEnv { /// Path to store data. data_home: TempDir, logstore: Option>, - object_store: Option, + object_store_manager: Option, } impl Default for TestEnv { @@ -86,7 +87,7 @@ impl TestEnv { TestEnv { data_home: create_temp_dir(""), logstore: None, - object_store: None, + object_store_manager: None, } } @@ -95,7 +96,7 @@ impl TestEnv { TestEnv { data_home: create_temp_dir(prefix), logstore: None, - object_store: None, + object_store_manager: None, } } @@ -104,7 +105,7 @@ impl TestEnv { TestEnv { data_home, logstore: None, - object_store: None, + object_store_manager: None, } } @@ -113,17 +114,20 @@ impl TestEnv { } pub fn get_object_store(&self) -> Option { - self.object_store.clone() + self.object_store_manager + .as_ref() + .map(|manager| manager.default_object_store().clone()) } /// Creates a new engine with specific config under this env. pub async fn create_engine(&mut self, config: MitoConfig) -> MitoEngine { - let (log_store, object_store) = self.create_log_and_object_store().await; + let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await; let logstore = Arc::new(log_store); + let object_store_manager = Arc::new(object_store_manager); self.logstore = Some(logstore.clone()); - self.object_store = Some(object_store.clone()); - MitoEngine::new(config, logstore, object_store) + self.object_store_manager = Some(object_store_manager.clone()); + MitoEngine::new(config, logstore, object_store_manager) } /// Creates a new engine with specific config and manager/listener under this env. @@ -133,12 +137,13 @@ impl TestEnv { manager: Option, listener: Option, ) -> MitoEngine { - let (log_store, object_store) = self.create_log_and_object_store().await; + let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await; let logstore = Arc::new(log_store); + let object_store_manager = Arc::new(object_store_manager); self.logstore = Some(logstore.clone()); - self.object_store = Some(object_store.clone()); - MitoEngine::new_for_test(config, logstore, object_store, manager, listener) + self.object_store_manager = Some(object_store_manager.clone()); + MitoEngine::new_for_test(config, logstore, object_store_manager, manager, listener) } /// Reopen the engine. @@ -148,18 +153,20 @@ impl TestEnv { MitoEngine::new( config, self.logstore.clone().unwrap(), - self.object_store.clone().unwrap(), + self.object_store_manager.clone().unwrap(), ) } /// Creates a new [WorkerGroup] with specific config under this env. pub(crate) async fn create_worker_group(&self, config: MitoConfig) -> WorkerGroup { - let (log_store, object_store) = self.create_log_and_object_store().await; + let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await; - WorkerGroup::start(config, Arc::new(log_store), object_store) + WorkerGroup::start(config, Arc::new(log_store), Arc::new(object_store_manager)) } - async fn create_log_and_object_store(&self) -> (RaftEngineLogStore, ObjectStore) { + async fn create_log_and_object_store_manager( + &self, + ) -> (RaftEngineLogStore, ObjectStoreManager) { let data_home = self.data_home.path(); let wal_path = data_home.join("wal"); let data_path = data_home.join("data").as_path().display().to_string(); @@ -168,8 +175,8 @@ impl TestEnv { let mut builder = Fs::default(); builder.root(&data_path); let object_store = ObjectStore::new(builder).unwrap().finish(); - - (log_store, object_store) + let object_store_manager = ObjectStoreManager::new("default", object_store); + (log_store, object_store_manager) } /// If `initial_metadata` is `Some`, creates a new manifest. If `initial_metadata` diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 232e85010246..b6a906a41ebc 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -33,7 +33,7 @@ use std::time::Duration; use common_runtime::JoinHandle; use common_telemetry::{error, info, warn}; use futures::future::try_join_all; -use object_store::ObjectStore; +use object_store::manager::ObjectStoreManagerRef; use snafu::{ensure, ResultExt}; use store_api::logstore::LogStore; use store_api::storage::RegionId; @@ -112,7 +112,7 @@ impl WorkerGroup { pub(crate) fn start( config: MitoConfig, log_store: Arc, - object_store: ObjectStore, + object_store_manager: ObjectStoreManagerRef, ) -> WorkerGroup { assert!(config.num_workers.is_power_of_two()); let config = Arc::new(config); @@ -131,7 +131,7 @@ impl WorkerGroup { id: id as WorkerId, config: config.clone(), log_store: log_store.clone(), - object_store: object_store.clone(), + object_store_manager: object_store_manager.clone(), write_buffer_manager: write_buffer_manager.clone(), scheduler: scheduler.clone(), listener: WorkerListener::default(), @@ -206,7 +206,7 @@ impl WorkerGroup { pub(crate) fn start_for_test( config: MitoConfig, log_store: Arc, - object_store: ObjectStore, + object_store_manager: ObjectStoreManagerRef, write_buffer_manager: Option, listener: Option, ) -> WorkerGroup { @@ -229,7 +229,7 @@ impl WorkerGroup { id: id as WorkerId, config: config.clone(), log_store: log_store.clone(), - object_store: object_store.clone(), + object_store_manager: object_store_manager.clone(), write_buffer_manager: write_buffer_manager.clone(), scheduler: scheduler.clone(), listener: WorkerListener::new(listener.clone()), @@ -256,7 +256,7 @@ struct WorkerStarter { id: WorkerId, config: Arc, log_store: Arc, - object_store: ObjectStore, + object_store_manager: ObjectStoreManagerRef, write_buffer_manager: WriteBufferManagerRef, scheduler: SchedulerRef, listener: WorkerListener, @@ -278,7 +278,7 @@ impl WorkerStarter { sender: sender.clone(), receiver, wal: Wal::new(self.log_store), - object_store: self.object_store, + object_store_manager: self.object_store_manager.clone(), running: running.clone(), memtable_builder: Arc::new(TimeSeriesMemtableBuilder::new(Some( self.write_buffer_manager.clone(), @@ -426,8 +426,8 @@ struct RegionWorkerLoop { receiver: Receiver, /// WAL of the engine. wal: Wal, - /// Object store for manifest and SSTs. - object_store: ObjectStore, + /// Manages object stores for manifest and SSTs. + object_store_manager: ObjectStoreManagerRef, /// Whether the worker thread is still running. running: Arc, /// Memtable builder for each region. diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index 34df7a1ad8d6..e983f300bc80 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -62,7 +62,7 @@ impl RegionWorkerLoop { region_id, &request.region_dir, self.memtable_builder.clone(), - self.object_store.clone(), + self.object_store_manager.default_object_store().clone(), self.scheduler.clone(), ) .metadata(metadata) diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index ce4d5cdd0a66..cc9542bfdd90 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -43,7 +43,9 @@ impl RegionWorkerLoop { // write dropping marker let marker_path = join_path(region.access_layer.region_dir(), DROPPING_MARKER_FILE); - self.object_store + region + .access_layer + .object_store() .write(&marker_path, vec![]) .await .context(OpenDalSnafu)?; @@ -68,7 +70,7 @@ impl RegionWorkerLoop { // detach a background task to delete the region dir let region_dir = region.access_layer.region_dir().to_owned(); - let object_store = self.object_store.clone(); + let object_store = region.access_layer.object_store().clone(); let dropping_regions = self.dropping_regions.clone(); let listener = self.listener.clone(); common_runtime::spawn_bg(async move { diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index 18b226bacd7d..da8a1fc04b0e 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -44,12 +44,17 @@ impl RegionWorkerLoop { // Check if this region is pending drop. And clean the entire dir if so. if !self.dropping_regions.is_region_exists(region_id) && self - .object_store + .object_store_manager + .default_object_store() .is_exist(&join_path(&request.region_dir, DROPPING_MARKER_FILE)) .await .context(OpenDalSnafu)? { - let result = remove_region_dir_once(&request.region_dir, &self.object_store).await; + let result = remove_region_dir_once( + &request.region_dir, + self.object_store_manager.default_object_store(), + ) + .await; info!("Region {} is dropped, result: {:?}", region_id, result); return RegionNotFoundSnafu { region_id }.fail(); } @@ -61,7 +66,7 @@ impl RegionWorkerLoop { region_id, &request.region_dir, self.memtable_builder.clone(), - self.object_store.clone(), + self.object_store_manager.default_object_store().clone(), self.scheduler.clone(), ) .options(request.options) diff --git a/src/object-store/src/error.rs b/src/object-store/src/error.rs deleted file mode 100644 index 8ea360d11683..000000000000 --- a/src/object-store/src/error.rs +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::any::Any; - -use common_error::ext::ErrorExt; -use common_error::status_code::StatusCode; -use common_macro::stack_trace_debug; -use snafu::{Location, Snafu}; - -#[derive(Snafu)] -#[snafu(visibility(pub))] -#[stack_trace_debug] -pub enum Error { - #[snafu(display("Default storage not found: {}", default_object_store))] - DefaultStorageNotFound { - location: Location, - default_object_store: String, - }, -} - -pub type Result = std::result::Result; - -impl ErrorExt for Error { - fn status_code(&self) -> StatusCode { - match self { - Error::DefaultStorageNotFound { .. } => StatusCode::InvalidArguments, - } - } - - fn as_any(&self) -> &dyn Any { - self - } -} diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index 9623ef9a4ec6..5e738a00e5ad 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -19,7 +19,6 @@ pub use opendal::{ Operator as ObjectStore, Reader, Result, Writer, }; -pub mod error; pub mod layers; pub mod manager; mod metrics; diff --git a/src/object-store/src/manager.rs b/src/object-store/src/manager.rs index d7cb323057cf..138e56b1d5d1 100644 --- a/src/object-store/src/manager.rs +++ b/src/object-store/src/manager.rs @@ -13,12 +13,12 @@ // limitations under the License. use std::collections::HashMap; +use std::sync::Arc; -use snafu::OptionExt; - -use crate::error::{DefaultStorageNotFoundSnafu, Result}; use crate::ObjectStore; +pub type ObjectStoreManagerRef = Arc; + /// Manages multiple object stores so that users can configure a storage for each table. /// This struct certainly have one default object store, and can have zero or more custom object stores. pub struct ObjectStoreManager { @@ -27,23 +27,20 @@ pub struct ObjectStoreManager { } impl ObjectStoreManager { - /// Creates a new manager with specific object stores. Returns an error if `stores` doesn't contain the default object store. - pub fn try_new( - stores: HashMap, - default_object_store: &str, - ) -> Result { - let default_object_store = stores - .get(default_object_store) - .context(DefaultStorageNotFoundSnafu { - default_object_store, - })? - .clone(); - Ok(ObjectStoreManager { - stores, - default_object_store, - }) + /// Creates a new manager from the object store used as a default one. + pub fn new(name: &str, object_store: ObjectStore) -> Self { + ObjectStoreManager { + stores: [(name.to_string(), object_store.clone())].into(), + default_object_store: object_store, + } + } + + /// Adds an object store to the manager. + pub fn add(&mut self, name: &str, object_store: ObjectStore) { + self.stores.insert(name.to_string(), object_store); } + /// Finds an object store corresponding to the name. pub fn find(&self, name: &str) -> Option<&ObjectStore> { self.stores.get(name) } @@ -55,12 +52,9 @@ impl ObjectStoreManager { #[cfg(test)] mod tests { - use std::collections::HashMap; - use common_test_util::temp_dir::{create_temp_dir, TempDir}; use super::ObjectStoreManager; - use crate::error::Error; use crate::services::Fs as Builder; use crate::ObjectStore; @@ -72,36 +66,18 @@ mod tests { } #[test] - fn test_new_returns_err_when_global_store_not_exist() { - let dir = create_temp_dir("new"); - let object_store = new_object_store(&dir); - let stores: HashMap = vec![ - ("File".to_string(), object_store.clone()), - ("S3".to_string(), object_store.clone()), - ] - .into_iter() - .collect(); + fn test_manager_behavior() { + let dir = create_temp_dir("default"); + let mut manager = ObjectStoreManager::new("default", new_object_store(&dir)); - assert!(matches!( - ObjectStoreManager::try_new(stores, "Gcs"), - Err(Error::DefaultStorageNotFound { .. }) - )); - } + assert!(manager.find("default").is_some()); + assert!(manager.find("Gcs").is_none()); - #[test] - fn test_new_returns_ok() { - let dir = create_temp_dir("new"); - let object_store = new_object_store(&dir); - let stores: HashMap = vec![ - ("File".to_string(), object_store.clone()), - ("S3".to_string(), object_store.clone()), - ] - .into_iter() - .collect(); - let object_store_manager = ObjectStoreManager::try_new(stores, "File").unwrap(); - assert_eq!(object_store_manager.stores.len(), 2); - assert!(object_store_manager.find("File").is_some()); - assert!(object_store_manager.find("S3").is_some()); - assert!(object_store_manager.find("Gcs").is_none()); + let dir = create_temp_dir("default"); + manager.add("Gcs", new_object_store(&dir)); + + // Should not overwrite the default object store with the new one. + assert!(manager.find("default").is_some()); + assert!(manager.find("Gcs").is_some()); } }