Skip to content

Commit

Permalink
remove bg processor reference
Browse files Browse the repository at this point in the history
Signed-off-by: Jin Hai <[email protected]>
  • Loading branch information
JinHai-CN committed Oct 20, 2024
1 parent 84cf2eb commit b0e2dca
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import wal_entry;
import third_party;
import build_fast_rough_filter_task;
import catalog_delta_entry;
import infinity_context;

namespace infinity {

Expand All @@ -39,7 +40,7 @@ void UpdateSegmentBloomFilterTask::CreateAndSubmitTask(SegmentEntry *segment_ent
}
LOG_TRACE(fmt::format("UpdateSegmentBloomFilterTask: create task for segment: {}", segment_entry->segment_id()));
auto update_bloom_filter_task = MakeShared<UpdateSegmentBloomFilterTask>(segment_entry, table_entry, txn_mgr);
auto bg_processor = txn_mgr->bg_task_processor();
BGTaskProcessor *bg_processor = InfinityContext::instance().storage()->bg_processor();
bg_processor->Submit(std::move(update_bloom_filter_task));
}

Expand Down
93 changes: 92 additions & 1 deletion src/storage/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ void Storage::SetStorageMode(StorageMode target_mode) {
}
txn_mgr_ = MakeUnique<TxnManager>(new_catalog_.get(),
buffer_mgr_.get(),
bg_processor_.get(),
wal_mgr_.get(),
new_catalog_->next_txn_id(),
system_start_ts);
Expand Down Expand Up @@ -423,6 +422,98 @@ void Storage::SetStorageMode(StorageMode target_mode) {
current_storage_mode_ = target_mode;
}

Status Storage::SetReaderStorageContinue() {
// StorageMode current_mode = GetStorageMode();
// if (current_mode != StorageMode::kReadable) {
// UnrecoverableError(fmt::format("Expect current storage mode is READER, but it is {}", ToString(current_mode)));
// }
//
// if (system_start_ts == 0) {
// // Init database, need to create default_db
// LOG_INFO(fmt::format("Init a new catalog"));
// new_catalog_ = Catalog::NewCatalog();
// }
//
// BuiltinFunctions builtin_functions(new_catalog_);
// builtin_functions.Init();
// // Catalog finish init here.
// if (bg_processor_ != nullptr) {
// UnrecoverableError("Background processor was initialized before.");
// }
// bg_processor_ = MakeUnique<BGTaskProcessor>(wal_mgr_.get(), new_catalog_.get());
//
// // Construct txn manager
// if (txn_mgr_ != nullptr) {
// UnrecoverableError("Transaction manager was initialized before.");
// }
// txn_mgr_ = MakeUnique<TxnManager>(new_catalog_.get(),
// buffer_mgr_.get(),
// bg_processor_.get(),
// wal_mgr_.get(),
// new_catalog_->next_txn_id(),
// system_start_ts);
// txn_mgr_->Start();
//
// // start WalManager after TxnManager since it depends on TxnManager.
// wal_mgr_->Start();
//
// if (system_start_ts == 0 && target_mode == StorageMode::kWritable) {
// CreateDefaultDB();
// }
//
// if (memory_index_tracer_ != nullptr) {
// UnrecoverableError("Memory index tracer was initialized before.");
// }
// memory_index_tracer_ = MakeUnique<BGMemIndexTracer>(config_ptr_->MemIndexMemoryQuota(), new_catalog_.get(), txn_mgr_.get());
//
// new_catalog_->StartMemoryIndexCommit();
// new_catalog_->MemIndexRecover(buffer_mgr_.get(), system_start_ts);
//
// bg_processor_->Start();
//
// if (target_mode == StorageMode::kWritable) {
// if (compact_processor_ != nullptr) {
// UnrecoverableError("compact processor was initialized before.");
// }
//
// compact_processor_ = MakeUnique<CompactionProcessor>(new_catalog_.get(), txn_mgr_.get());
// compact_processor_->Start();
// }
//
// if (periodic_trigger_thread_ != nullptr) {
// UnrecoverableError("periodic trigger was initialized before.");
// }
// periodic_trigger_thread_ = MakeUnique<PeriodicTriggerThread>();
//
// if (target_mode == StorageMode::kWritable) {
// periodic_trigger_thread_->full_checkpoint_trigger_ =
// MakeShared<CheckpointPeriodicTrigger>(full_checkpoint_interval_sec, wal_mgr_.get(), true);
// periodic_trigger_thread_->delta_checkpoint_trigger_ =
// MakeShared<CheckpointPeriodicTrigger>(delta_checkpoint_interval_sec, wal_mgr_.get(), false);
// periodic_trigger_thread_->compact_segment_trigger_ =
// MakeShared<CompactSegmentPeriodicTrigger>(compact_interval, compact_processor_.get());
// periodic_trigger_thread_->optimize_index_trigger_ =
// MakeShared<OptimizeIndexPeriodicTrigger>(optimize_interval, compact_processor_.get());
// }
//
// periodic_trigger_thread_->cleanup_trigger_ =
// MakeShared<CleanupPeriodicTrigger>(cleanup_interval, bg_processor_.get(), new_catalog_.get(), txn_mgr_.get());
// bg_processor_->SetCleanupTrigger(periodic_trigger_thread_->cleanup_trigger_);
//
// if (target_mode == StorageMode::kWritable) {
// auto txn = txn_mgr_->BeginTxn(MakeUnique<String>("ForceCheckpointTask"));
// auto force_ckp_task = MakeShared<ForceCheckpointTask>(txn, true, system_start_ts);
// bg_processor_->Submit(force_ckp_task);
// force_ckp_task->Wait();
// txn->SetReaderAllowed(true);
// txn_mgr_->CommitTxn(txn);
// }
//
// periodic_trigger_thread_->Start();
//
return Status::OK();
}

void Storage::AttachCatalog(const FullCatalogFileInfo &full_ckp_info, const Vector<DeltaCatalogFileInfo> &delta_ckp_infos) {
new_catalog_ = Catalog::LoadFromFiles(full_ckp_info, delta_ckp_infos, buffer_mgr_.get());
}
Expand Down
2 changes: 2 additions & 0 deletions src/storage/storage.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import log_file;
import memindex_tracer;
import persistence_manager;
import virtual_store;
import status;

export module storage;

Expand Down Expand Up @@ -68,6 +69,7 @@ public:

StorageMode GetStorageMode() const;
void SetStorageMode(StorageMode mode);
Status SetReaderStorageContinue();

void AttachCatalog(const FullCatalogFileInfo &full_ckp_info, const Vector<DeltaCatalogFileInfo> &delta_ckp_infos);

Expand Down
5 changes: 2 additions & 3 deletions src/storage/txn/txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,10 @@ namespace infinity {
Txn::Txn(TxnManager *txn_manager,
BufferManager *buffer_manager,
Catalog *catalog,
BGTaskProcessor *bg_task_processor,
TransactionID txn_id,
TxnTimeStamp begin_ts,
SharedPtr<String> txn_text)
: txn_mgr_(txn_manager), buffer_mgr_(buffer_manager), bg_task_processor_(bg_task_processor), catalog_(catalog), txn_store_(this, catalog),
: txn_mgr_(txn_manager), buffer_mgr_(buffer_manager), catalog_(catalog), txn_store_(this, catalog),
txn_id_(txn_id), txn_context_(begin_ts), wal_entry_(MakeShared<WalEntry>()), txn_delta_ops_entry_(MakeUnique<CatalogDeltaEntry>()),
txn_text_(std::move(txn_text)) {}

Expand Down Expand Up @@ -519,7 +518,7 @@ TxnTimeStamp Txn::Commit() {
txn_store_.MaintainCompactionAlg();

if (!txn_delta_ops_entry_->operations().empty()) {
txn_mgr_->AddDeltaEntry(std::move(txn_delta_ops_entry_));
InfinityContext::instance().storage()->bg_processor()->Submit(MakeShared<AddDeltaEntryTask>(std::move(txn_delta_ops_entry_)));
}

return commit_ts;
Expand Down
3 changes: 0 additions & 3 deletions src/storage/txn/txn.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ struct ScanParam {

class TxnManager;
struct Catalog;
class BGTaskProcessor;
struct TableEntry;
struct DBEntry;
struct BaseEntry;
Expand All @@ -70,7 +69,6 @@ public:
explicit Txn(TxnManager *txn_manager,
BufferManager *buffer_manager,
Catalog *catalog,
BGTaskProcessor *bg_task_processor,
TransactionID txn_id,
TxnTimeStamp begin_ts,
SharedPtr<String> txn_text);
Expand Down Expand Up @@ -244,7 +242,6 @@ private:
// Reference to external class
TxnManager *txn_mgr_{};
BufferManager *buffer_mgr_{}; // This BufferManager ptr Only for replaying wal
BGTaskProcessor *bg_task_processor_{};
Catalog *catalog_{};

TxnStore txn_store_; // this has this ptr, so txn cannot be moved.
Expand Down
14 changes: 2 additions & 12 deletions src/storage/txn/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,10 @@ namespace infinity {

TxnManager::TxnManager(Catalog *catalog,
BufferManager *buffer_mgr,
BGTaskProcessor *bg_task_processor,
WalManager *wal_mgr,
TransactionID start_txn_id,
TxnTimeStamp start_ts)
: catalog_(catalog), buffer_mgr_(buffer_mgr), bg_task_processor_(bg_task_processor), wal_mgr_(wal_mgr), start_ts_(start_ts), is_running_(false) {}
: catalog_(catalog), buffer_mgr_(buffer_mgr), wal_mgr_(wal_mgr), start_ts_(start_ts), is_running_(false) {}

Txn *TxnManager::BeginTxn(UniquePtr<String> txn_text, bool ckp_txn) {
// Check if the is_running_ is true
Expand All @@ -69,7 +68,7 @@ Txn *TxnManager::BeginTxn(UniquePtr<String> txn_text, bool ckp_txn) {
}

// Create txn instance
auto new_txn = SharedPtr<Txn>(new Txn(this, buffer_mgr_, catalog_, bg_task_processor_, new_txn_id, begin_ts, std::move(txn_text)));
auto new_txn = SharedPtr<Txn>(new Txn(this, buffer_mgr_, catalog_, new_txn_id, begin_ts, std::move(txn_text)));

// Storage txn in txn manager
txn_map_[new_txn_id] = new_txn;
Expand Down Expand Up @@ -208,15 +207,6 @@ void TxnManager::SendToWAL(Txn *txn) {
}
}

void TxnManager::AddDeltaEntry(UniquePtr<CatalogDeltaEntry> delta_entry) {
// Check if the is_running_ is true
if (is_running_.load() == false) {
String error_message = "TxnManager is not running, cannot add delta entry";
UnrecoverableError(error_message);
}
bg_task_processor_->Submit(MakeShared<AddDeltaEntryTask>(std::move(delta_entry)));
}

void TxnManager::Start() {
is_running_.store(true, std::memory_order::relaxed);
LOG_INFO("TxnManager is started.");
Expand Down
7 changes: 1 addition & 6 deletions src/storage/txn/txn_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ export class TxnManager {
public:
explicit TxnManager(Catalog *catalog,
BufferManager *buffer_mgr,
BGTaskProcessor *task_processor,
WalManager *wal_mgr,
TransactionID start_txn_id,
TxnTimeStamp start_ts);
Expand All @@ -62,8 +61,6 @@ public:

Catalog *GetCatalog() const { return catalog_; }

BGTaskProcessor *bg_task_processor() const { return bg_task_processor_; }

TxnTimeStamp GetCommitTimeStampR(Txn *txn);

TxnTimeStamp GetCommitTimeStampW(Txn *txn);
Expand All @@ -72,8 +69,6 @@ public:

void SendToWAL(Txn *txn);

void AddDeltaEntry(UniquePtr<CatalogDeltaEntry> delta_entry);

void Start();

void Stop();
Expand Down Expand Up @@ -118,7 +113,7 @@ private:
Catalog *catalog_{};
mutable std::mutex locker_{};
BufferManager *buffer_mgr_{};
BGTaskProcessor *bg_task_processor_{};

HashMap<TransactionID, SharedPtr<Txn>> txn_map_{};
WalManager *wal_mgr_;

Expand Down
24 changes: 23 additions & 1 deletion src/storage/wal/wal_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,32 @@ export enum class StorageMode {
kWritable,
};

String ToString(StorageMode storage_mode) {
switch (storage_mode) {
case StorageMode::kUnInitialized: {
return "Uninitialized";
}
case StorageMode::kAdmin: {
return "Admin";
}
case StorageMode::kReadable: {
return "Readable";
}
case StorageMode::kWritable: {
return "Writable";
}
}
}

export class WalManager {
public:
WalManager(Storage *storage, String wal_dir, u64 wal_size_threshold, u64 delta_checkpoint_interval_wal_bytes, FlushOptionType flush_option);
WalManager(Storage *storage, String wal_dir, String data_dir, u64 wal_size_threshold, u64 delta_checkpoint_interval_wal_bytes, FlushOptionType flush_option);
WalManager(Storage *storage,
String wal_dir,
String data_dir,
u64 wal_size_threshold,
u64 delta_checkpoint_interval_wal_bytes,
FlushOptionType flush_option);

~WalManager();

Expand Down

0 comments on commit b0e2dca

Please sign in to comment.