Skip to content

Commit

Permalink
feat: make mito2 have ObjectStoreManager(initial) (#2643)
Browse files Browse the repository at this point in the history
* feat: make mito2 have object_store_manager(initial)

* chore: address review

* refactor: Arc<ObjectStoreManager> to ObjectStoreManagerRef and replace Vec with tuple

* fix: add ObjectStoreManager::from_default

* fix: remove cfg(test)

* fix: remove try_new from ObjectStoreManager
  • Loading branch information
NiwakaDev authored Oct 30, 2023
1 parent d0ff8ab commit 000e147
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 141 deletions.
22 changes: 17 additions & 5 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use futures_util::StreamExt;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::client::MetaClient;
use mito2::engine::MitoEngine;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::normalize_dir;
use query::QueryEngineFactory;
use servers::Mode;
Expand Down Expand Up @@ -354,7 +355,12 @@ impl DatanodeBuilder {
let mut region_server =
RegionServer::new(query_engine.clone(), runtime.clone(), event_listener);
let object_store = store::new_object_store(opts).await?;
let engines = Self::build_store_engines(opts, log_store, object_store).await?;
let object_store_manager = ObjectStoreManager::new(
"default", // TODO: use a name which is set in the configuration when #919 is done.
object_store,
);
let engines =
Self::build_store_engines(opts, log_store, Arc::new(object_store_manager)).await?;
for engine in engines {
region_server.register_engine(engine);
}
Expand Down Expand Up @@ -392,7 +398,7 @@ impl DatanodeBuilder {
async fn build_store_engines<S>(
opts: &DatanodeOptions,
log_store: Arc<S>,
object_store: object_store::ObjectStore,
object_store_manager: ObjectStoreManagerRef,
) -> Result<Vec<RegionEngineRef>>
where
S: LogStore,
Expand All @@ -401,12 +407,18 @@ impl DatanodeBuilder {
for engine in &opts.region_engine {
match engine {
RegionEngineConfig::Mito(config) => {
let engine: MitoEngine =
MitoEngine::new(config.clone(), log_store.clone(), object_store.clone());
let engine: MitoEngine = MitoEngine::new(
config.clone(),
log_store.clone(),
object_store_manager.clone(),
);
engines.push(Arc::new(engine) as _);
}
RegionEngineConfig::File(config) => {
let engine = FileRegionEngine::new(config.clone(), object_store.clone());
let engine = FileRegionEngine::new(
config.clone(),
object_store_manager.default_object_store().clone(), // TODO: implement custom storage for file engine
);
engines.push(Arc::new(engine) as _);
}
}
Expand Down
1 change: 0 additions & 1 deletion src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,6 @@ impl ErrorExt for Error {
ColumnNotFound { .. } => StatusCode::TableColumnNotFound,

ParseSql { source, .. } => source.status_code(),

DeleteExprToRequest { source, .. } | InsertData { source, .. } => source.status_code(),

ColumnValuesNumberMismatch { .. }
Expand Down
14 changes: 7 additions & 7 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use common_error::ext::BoxedError;
use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::timer;
use object_store::ObjectStore;
use object_store::manager::ObjectStoreManagerRef;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
Expand All @@ -72,12 +72,12 @@ impl MitoEngine {
pub fn new<S: LogStore>(
mut config: MitoConfig,
log_store: Arc<S>,
object_store: ObjectStore,
object_store_manager: ObjectStoreManagerRef,
) -> MitoEngine {
config.sanitize();

MitoEngine {
inner: Arc::new(EngineInner::new(config, log_store, object_store)),
inner: Arc::new(EngineInner::new(config, log_store, object_store_manager)),
}
}

Expand Down Expand Up @@ -108,10 +108,10 @@ impl EngineInner {
fn new<S: LogStore>(
config: MitoConfig,
log_store: Arc<S>,
object_store: ObjectStore,
object_store_manager: ObjectStoreManagerRef,
) -> EngineInner {
EngineInner {
workers: WorkerGroup::start(config, log_store, object_store),
workers: WorkerGroup::start(config, log_store, object_store_manager),
}
}

Expand Down Expand Up @@ -235,7 +235,7 @@ impl MitoEngine {
pub fn new_for_test<S: LogStore>(
mut config: MitoConfig,
log_store: Arc<S>,
object_store: ObjectStore,
object_store_manager: ObjectStoreManagerRef,
write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
listener: Option<crate::engine::listener::EventListenerRef>,
) -> MitoEngine {
Expand All @@ -246,7 +246,7 @@ impl MitoEngine {
workers: WorkerGroup::start_for_test(
config,
log_store,
object_store,
object_store_manager,
write_buffer_manager,
listener,
),
Expand Down
41 changes: 24 additions & 17 deletions src/mito2/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::test_util::log_store_util;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::services::Fs;
use object_store::ObjectStore;
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
Expand Down Expand Up @@ -71,7 +72,7 @@ pub struct TestEnv {
/// Path to store data.
data_home: TempDir,
logstore: Option<Arc<RaftEngineLogStore>>,
object_store: Option<ObjectStore>,
object_store_manager: Option<ObjectStoreManagerRef>,
}

impl Default for TestEnv {
Expand All @@ -86,7 +87,7 @@ impl TestEnv {
TestEnv {
data_home: create_temp_dir(""),
logstore: None,
object_store: None,
object_store_manager: None,
}
}

Expand All @@ -95,7 +96,7 @@ impl TestEnv {
TestEnv {
data_home: create_temp_dir(prefix),
logstore: None,
object_store: None,
object_store_manager: None,
}
}

Expand All @@ -104,7 +105,7 @@ impl TestEnv {
TestEnv {
data_home,
logstore: None,
object_store: None,
object_store_manager: None,
}
}

Expand All @@ -113,17 +114,20 @@ impl TestEnv {
}

pub fn get_object_store(&self) -> Option<ObjectStore> {
self.object_store.clone()
self.object_store_manager
.as_ref()
.map(|manager| manager.default_object_store().clone())
}

/// Creates a new engine with specific config under this env.
pub async fn create_engine(&mut self, config: MitoConfig) -> MitoEngine {
let (log_store, object_store) = self.create_log_and_object_store().await;
let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await;

let logstore = Arc::new(log_store);
let object_store_manager = Arc::new(object_store_manager);
self.logstore = Some(logstore.clone());
self.object_store = Some(object_store.clone());
MitoEngine::new(config, logstore, object_store)
self.object_store_manager = Some(object_store_manager.clone());
MitoEngine::new(config, logstore, object_store_manager)
}

/// Creates a new engine with specific config and manager/listener under this env.
Expand All @@ -133,12 +137,13 @@ impl TestEnv {
manager: Option<WriteBufferManagerRef>,
listener: Option<EventListenerRef>,
) -> MitoEngine {
let (log_store, object_store) = self.create_log_and_object_store().await;
let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await;

let logstore = Arc::new(log_store);
let object_store_manager = Arc::new(object_store_manager);
self.logstore = Some(logstore.clone());
self.object_store = Some(object_store.clone());
MitoEngine::new_for_test(config, logstore, object_store, manager, listener)
self.object_store_manager = Some(object_store_manager.clone());
MitoEngine::new_for_test(config, logstore, object_store_manager, manager, listener)
}

/// Reopen the engine.
Expand All @@ -148,18 +153,20 @@ impl TestEnv {
MitoEngine::new(
config,
self.logstore.clone().unwrap(),
self.object_store.clone().unwrap(),
self.object_store_manager.clone().unwrap(),
)
}

/// Creates a new [WorkerGroup] with specific config under this env.
pub(crate) async fn create_worker_group(&self, config: MitoConfig) -> WorkerGroup {
let (log_store, object_store) = self.create_log_and_object_store().await;
let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await;

WorkerGroup::start(config, Arc::new(log_store), object_store)
WorkerGroup::start(config, Arc::new(log_store), Arc::new(object_store_manager))
}

async fn create_log_and_object_store(&self) -> (RaftEngineLogStore, ObjectStore) {
async fn create_log_and_object_store_manager(
&self,
) -> (RaftEngineLogStore, ObjectStoreManager) {
let data_home = self.data_home.path();
let wal_path = data_home.join("wal");
let data_path = data_home.join("data").as_path().display().to_string();
Expand All @@ -168,8 +175,8 @@ impl TestEnv {
let mut builder = Fs::default();
builder.root(&data_path);
let object_store = ObjectStore::new(builder).unwrap().finish();

(log_store, object_store)
let object_store_manager = ObjectStoreManager::new("default", object_store);
(log_store, object_store_manager)
}

/// If `initial_metadata` is `Some`, creates a new manifest. If `initial_metadata`
Expand Down
18 changes: 9 additions & 9 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use std::time::Duration;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
use object_store::ObjectStore;
use object_store::manager::ObjectStoreManagerRef;
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
Expand Down Expand Up @@ -112,7 +112,7 @@ impl WorkerGroup {
pub(crate) fn start<S: LogStore>(
config: MitoConfig,
log_store: Arc<S>,
object_store: ObjectStore,
object_store_manager: ObjectStoreManagerRef,
) -> WorkerGroup {
assert!(config.num_workers.is_power_of_two());
let config = Arc::new(config);
Expand All @@ -131,7 +131,7 @@ impl WorkerGroup {
id: id as WorkerId,
config: config.clone(),
log_store: log_store.clone(),
object_store: object_store.clone(),
object_store_manager: object_store_manager.clone(),
write_buffer_manager: write_buffer_manager.clone(),
scheduler: scheduler.clone(),
listener: WorkerListener::default(),
Expand Down Expand Up @@ -206,7 +206,7 @@ impl WorkerGroup {
pub(crate) fn start_for_test<S: LogStore>(
config: MitoConfig,
log_store: Arc<S>,
object_store: ObjectStore,
object_store_manager: ObjectStoreManagerRef,
write_buffer_manager: Option<WriteBufferManagerRef>,
listener: Option<crate::engine::listener::EventListenerRef>,
) -> WorkerGroup {
Expand All @@ -229,7 +229,7 @@ impl WorkerGroup {
id: id as WorkerId,
config: config.clone(),
log_store: log_store.clone(),
object_store: object_store.clone(),
object_store_manager: object_store_manager.clone(),
write_buffer_manager: write_buffer_manager.clone(),
scheduler: scheduler.clone(),
listener: WorkerListener::new(listener.clone()),
Expand All @@ -256,7 +256,7 @@ struct WorkerStarter<S> {
id: WorkerId,
config: Arc<MitoConfig>,
log_store: Arc<S>,
object_store: ObjectStore,
object_store_manager: ObjectStoreManagerRef,
write_buffer_manager: WriteBufferManagerRef,
scheduler: SchedulerRef,
listener: WorkerListener,
Expand All @@ -278,7 +278,7 @@ impl<S: LogStore> WorkerStarter<S> {
sender: sender.clone(),
receiver,
wal: Wal::new(self.log_store),
object_store: self.object_store,
object_store_manager: self.object_store_manager.clone(),
running: running.clone(),
memtable_builder: Arc::new(TimeSeriesMemtableBuilder::new(Some(
self.write_buffer_manager.clone(),
Expand Down Expand Up @@ -426,8 +426,8 @@ struct RegionWorkerLoop<S> {
receiver: Receiver<WorkerRequest>,
/// WAL of the engine.
wal: Wal<S>,
/// Object store for manifest and SSTs.
object_store: ObjectStore,
/// Manages object stores for manifest and SSTs.
object_store_manager: ObjectStoreManagerRef,
/// Whether the worker thread is still running.
running: Arc<AtomicBool>,
/// Memtable builder for each region.
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/worker/handle_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region_id,
&request.region_dir,
self.memtable_builder.clone(),
self.object_store.clone(),
self.object_store_manager.default_object_store().clone(),
self.scheduler.clone(),
)
.metadata(metadata)
Expand Down
6 changes: 4 additions & 2 deletions src/mito2/src/worker/handle_drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ impl<S> RegionWorkerLoop<S> {

// write dropping marker
let marker_path = join_path(region.access_layer.region_dir(), DROPPING_MARKER_FILE);
self.object_store
region
.access_layer
.object_store()
.write(&marker_path, vec![])
.await
.context(OpenDalSnafu)?;
Expand All @@ -68,7 +70,7 @@ impl<S> RegionWorkerLoop<S> {

// detach a background task to delete the region dir
let region_dir = region.access_layer.region_dir().to_owned();
let object_store = self.object_store.clone();
let object_store = region.access_layer.object_store().clone();
let dropping_regions = self.dropping_regions.clone();
let listener = self.listener.clone();
common_runtime::spawn_bg(async move {
Expand Down
11 changes: 8 additions & 3 deletions src/mito2/src/worker/handle_open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,17 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Check if this region is pending drop. And clean the entire dir if so.
if !self.dropping_regions.is_region_exists(region_id)
&& self
.object_store
.object_store_manager
.default_object_store()
.is_exist(&join_path(&request.region_dir, DROPPING_MARKER_FILE))
.await
.context(OpenDalSnafu)?
{
let result = remove_region_dir_once(&request.region_dir, &self.object_store).await;
let result = remove_region_dir_once(
&request.region_dir,
self.object_store_manager.default_object_store(),
)
.await;
info!("Region {} is dropped, result: {:?}", region_id, result);
return RegionNotFoundSnafu { region_id }.fail();
}
Expand All @@ -61,7 +66,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region_id,
&request.region_dir,
self.memtable_builder.clone(),
self.object_store.clone(),
self.object_store_manager.default_object_store().clone(),
self.scheduler.clone(),
)
.options(request.options)
Expand Down
Loading

0 comments on commit 000e147

Please sign in to comment.