Skip to content

Commit

Permalink
Add merge local catalog delta ops to global catalog delta entry method (
Browse files Browse the repository at this point in the history
infiniflow#455)

* Refactor constructors and add transaction handlers in storage code

The commit modifies the Txn constructor to include buffer manager and background task processor. Additionally, instances of 'catalog_delta_entry' are now created only when there are operations, reducing redundancy. These changes make the constructor more organized, efficient and improve transaction handling.

* Refactor content and input changes in transaction handlers

This revision introduces significant adjustments in transaction handlers for the application. Constructs for 'CatalogDeltaOperation' and 'CatalogDeltaEntry' have been streamlined and erroneous type returns fixed. An explicit constructor for 'CatalogDeltaEntry' has been removed, minimizing redundancy while reducing the need to create instances when no operations are in queue. The changes ensure efficient transaction handling and organization in code structure.

* Optimize transaction types and operations

The commit focuses on refining the constructs of CatalogDeltaOperation and CatalogDeltaEntry in the transaction handlers. An undue CatalogDeltaEntry constructor has been eliminated leading to better performance when there are no operations in the queue. This revamp of transaction handling provides a more efficient and organized code structure.

* Refactor CatalogDeltaEntry merge operations

The Merge method has been moved from CatalogDeltaEntry to GlobalCatalogDeltaEntry and changed to take unique pointers instead of shared pointers, which increases efficiency. Additionally, some CatalogDeltaEntry objects were changed from shared pointers to unique pointers, optimizing memory usage and ownership. This commit provides a more efficient transaction handling process and optimized memory usage.

* Refactor string formatting and logging in CatalogDeltaEntry

Removed unused includes and updated string formatting in catalog_delta_entry.cppm. Changed logging messages and method of producing strings from string concatenation to string stream in catalog_delta_entry.cpp for improved efficiency and readability. Additionally, updated usage of underscore "_" to hash "#" in EncodeIndex methods, providing better consistency in encoding index patterns.

---------

Co-authored-by: Jin Hai <[email protected]>
  • Loading branch information
loloxwg and JinHai-CN authored Jan 22, 2024
1 parent 2ecd55d commit 6c9b45e
Show file tree
Hide file tree
Showing 25 changed files with 363 additions and 103 deletions.
10 changes: 9 additions & 1 deletion src/storage/background_process.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import logger;
import blocking_queue;
import infinity_exception;
import wal;
import third_party;

namespace infinity {

Expand Down Expand Up @@ -51,14 +52,21 @@ void BGTaskProcessor::Process() {
break;
}
case BGTaskType::kForceCheckpoint: {
ForceCheckpointTask *force_ckp_task = (ForceCheckpointTask *)(bg_task.get());
ForceCheckpointTask *force_ckp_task = static_cast<ForceCheckpointTask *>(bg_task.get());
wal_manager_->Checkpoint(force_ckp_task);
break;
}
case BGTaskType::kStopProcessor: {
running = false;
break;
}
case BGTaskType::kCatalogDeltaOpsMerge: {
CatalogDeltaOpsMergeTask *task = static_cast<CatalogDeltaOpsMergeTask *>(bg_task.get());
auto &local_catalog_ops = task->local_catalog_delta_entry_;
auto *catalog = task->catalog_;
catalog->global_catalog_delta_entry_->Merge(std::move(local_catalog_ops));
break;
}
case BGTaskType::kInvalid: {
UnrecoverableError("Invalid background task");
break;
Expand Down
16 changes: 16 additions & 0 deletions src/storage/bg_task.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ module;

import stl;
import txn;
import catalog;
import catalog_delta_entry;

export module bg_task;

Expand All @@ -25,6 +27,7 @@ export enum class BGTaskType {
kTryCheckpoint, // Periodically triggered by timer
kForceCheckpoint, // Manually triggered by PhysicalImport
kStopProcessor,
kCatalogDeltaOpsMerge, // Merge
kInvalid
};

Expand Down Expand Up @@ -84,4 +87,17 @@ export struct ForceCheckpointTask final : public BGTask {
Txn *txn_{};
};

export struct CatalogDeltaOpsMergeTask final : public BGTask {

explicit CatalogDeltaOpsMergeTask(UniquePtr<CatalogDeltaEntry> local_catalog_delta_entry, NewCatalog *catalog)
: BGTask(BGTaskType::kCatalogDeltaOpsMerge, true), local_catalog_delta_entry_(std::move(local_catalog_delta_entry)), catalog_(catalog) {}

~CatalogDeltaOpsMergeTask() = default;

String ToString() const final { return "Catalog delta operation merge task"; }

UniquePtr<CatalogDeltaEntry> local_catalog_delta_entry_{};
NewCatalog *catalog_{};
};

} // namespace infinity
2 changes: 1 addition & 1 deletion src/storage/meta/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import table_detail;
import index_def;
import txn_store;
import data_access_state;
import wal;
import catalog_delta_entry;

namespace infinity {

Expand Down
6 changes: 3 additions & 3 deletions src/storage/meta/catalog.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ public:
}
};

class CatalogDeltaOperation;
class GlobalCatalogDeltaEntry;
class CatalogDeltaEntry;
export struct NewCatalog {
public:
explicit NewCatalog(SharedPtr<String> dir, bool create_default_db = false);
Expand Down Expand Up @@ -249,8 +250,7 @@ public:

ProfileHistory history{DEFAULT_PROFILER_HISTORY_SIZE};

// Global physical wal log
SharedPtr<Vector<SharedPtr<CatalogDeltaOperation>>> global_catalog_delta_ops_{};
UniquePtr<GlobalCatalogDeltaEntry> global_catalog_delta_entry_{MakeUnique<GlobalCatalogDeltaEntry>()};
};

} // namespace infinity
2 changes: 1 addition & 1 deletion src/storage/meta/db_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import third_party;
import buffer_manager;
import txn_state;
import status;
import wal;
import catalog_delta_entry;
import infinity_exception;

namespace infinity {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/meta/entry/block_column_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import infinity_exception;
import varchar_layout;
import logger;
import data_file_worker;
import wal;
import catalog_delta_entry;

namespace infinity {

Expand Down
2 changes: 1 addition & 1 deletion src/storage/meta/entry/block_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import third_party;
import defer_op;
import local_file_system;
import serialize;
import wal;
import catalog_delta_entry;

import infinity_exception;
import parser;
Expand Down
2 changes: 1 addition & 1 deletion src/storage/meta/entry/column_index_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import buffer_manager;
import infinity_exception;
import index_file_worker;
import parser;
import wal;
import catalog_delta_entry;
import annivfflat_index_file_worker;
import hnsw_file_worker;
import logger;
Expand Down
2 changes: 1 addition & 1 deletion src/storage/meta/entry/db_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import logger;
import third_party;
import infinity_exception;
import status;
import wal;
import catalog_delta_entry;

namespace infinity {

Expand Down
2 changes: 1 addition & 1 deletion src/storage/meta/entry/irs_index_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import iresearch_datastore;
import index_base;
import index_full_text;
import logger;
import wal;
import catalog_delta_entry;

namespace infinity {

Expand Down
2 changes: 1 addition & 1 deletion src/storage/meta/entry/segment_column_index_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import dist_func_ip;
import hnsw_alg;
import lvq_store;
import plain_store;
import wal;
import catalog_delta_entry;

namespace infinity {
SegmentColumnIndexEntry::SegmentColumnIndexEntry(ColumnIndexEntry *column_index_entry, SegmentID segment_id, BufferObj *buffer)
Expand Down
22 changes: 11 additions & 11 deletions src/storage/meta/entry/segment_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import hnsw_alg;
import lvq_store;
import plain_store;
import segment_iter;
import wal;
import catalog_delta_entry;

namespace infinity {

Expand Down Expand Up @@ -386,17 +386,17 @@ nlohmann::json SegmentEntry::Serialize(TxnTimeStamp max_commit_ts, bool is_full_
}
}
for (BlockEntry *block_entry : block_entries) {
LOG_TRACE(fmt::format("Before Flush: block_entry checkpoint ts: {}, min_row_ts: {}, max_row_ts: {} || max_commit_ts: {}",
block_entry->checkpoint_ts(),
block_entry->min_row_ts(),
block_entry->max_row_ts(),
max_commit_ts));
// LOG_TRACE(fmt::format("Before Flush: block_entry checkpoint ts: {}, min_row_ts: {}, max_row_ts: {} || max_commit_ts: {}",
// block_entry->checkpoint_ts(),
// block_entry->min_row_ts(),
// block_entry->max_row_ts(),
// max_commit_ts));
block_entry->Flush(max_commit_ts);
LOG_TRACE(fmt::format("Finish Flush: block_entry checkpoint ts: {}, min_row_ts: {}, max_row_ts: {} || max_commit_ts: {}",
block_entry->checkpoint_ts(),
block_entry->min_row_ts(),
block_entry->max_row_ts(),
max_commit_ts));
// LOG_TRACE(fmt::format("Finish Flush: block_entry checkpoint ts: {}, min_row_ts: {}, max_row_ts: {} || max_commit_ts: {}",
// block_entry->checkpoint_ts(),
// block_entry->min_row_ts(),
// block_entry->max_row_ts(),
// max_commit_ts));
// WARNING: this operation may influence data visibility
// if (!is_full_checkpoint && block_entry->checkpoint_ts_ != max_commit_ts) {
// continue;
Expand Down
3 changes: 1 addition & 2 deletions src/storage/meta/entry/table_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import txn_manager;
import iresearch_datastore;
import index_base;
import index_full_text;
import wal;
import catalog_delta_entry;

namespace infinity {

Expand Down Expand Up @@ -100,7 +100,6 @@ Tuple<TableIndexEntry *, Status> TableEntry::CreateIndex(const SharedPtr<IndexDe
{ //
if (txn_mgr != nullptr) {
auto operation = MakeUnique<AddIndexMetaOperation>(new_table_index_meta.get());
LOG_TRACE(fmt::format("Add new AddDatabaseMeta Operation: {}", operation->ToString()));
txn_mgr->GetTxn(txn_id)->AddCatalogDeltaOperation(std::move(operation));
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/meta/entry/table_index_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import index_base;
import parser;
import infinity_exception;
import index_full_text;
import wal;
import catalog_delta_entry;

namespace infinity {

Expand Down
2 changes: 1 addition & 1 deletion src/storage/meta/table_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import third_party;
import txn_state;
import txn_manager;
import buffer_manager;
import wal;
import catalog_delta_entry;

import third_party;
import status;
Expand Down
9 changes: 6 additions & 3 deletions src/storage/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,21 @@ void Storage::Init() {
// Must init catalog before txn manager.
// Replay wal file wrap init catalog
i64 system_start_ts = wal_mgr_->ReplayWalFile();
++ system_start_ts;
++system_start_ts;

bg_processor_ = MakeUnique<BGTaskProcessor>(wal_mgr_.get());
// Construct txn manager
txn_mgr_ = MakeUnique<TxnManager>(new_catalog_.get(),
buffer_mgr_.get(),
bg_processor_.get(),
std::bind(&WalManager::PutEntry, wal_mgr_.get(), std::placeholders::_1),
0,
system_start_ts);

txn_mgr_->Start();
// start WalManager after TxnManager since it depends on TxnManager.
wal_mgr_->Start();

bg_processor_ = MakeUnique<BGTaskProcessor>(wal_mgr_.get());
bg_processor_->Start();

BuiltinFunctions builtin_functions(new_catalog_);
Expand All @@ -87,6 +89,7 @@ void Storage::UnInit() {
wal_mgr_->Stop();

txn_mgr_.reset();
bg_processor_.reset();
wal_mgr_.reset();

// Buffer Manager need to be destroyed before catalog. since buffer manage hold the raw pointer owned by catalog:
Expand Down Expand Up @@ -130,7 +133,7 @@ void Storage::InitCatalog(NewCatalog *, TxnManager *txn_mgr) {
Txn *new_txn = txn_mgr->CreateTxn();
new_txn->Begin();
Status status = new_txn->CreateDatabase("default", ConflictType::kError);
if(status.ok()) {
if (status.ok()) {
txn_mgr->CommitTxn(new_txn);
} else {
txn_mgr->RollBackTxn(new_txn);
Expand Down
22 changes: 15 additions & 7 deletions src/storage/txn/txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
module;

#include <string>
#include <vector>
#include <tuple>
#include <vector>

module txn;

import stl;

import infinity_exception;

import txn_manager;
import buffer_manager;
import wal;
import third_party;
import logger;
Expand All @@ -33,7 +33,6 @@ import txn_store;
import txn_state;
import parser;
import meta_state;
import buffer_manager;
import data_access_state;
import status;
import table_detail;
Expand All @@ -43,16 +42,19 @@ import database_detail;
import status;
import table_def;
import index_def;
import catalog_delta_entry;
import bg_task;
import backgroud_process;

namespace infinity {

Txn::Txn(TxnManager *txn_mgr, NewCatalog *catalog, TransactionID txn_id)
: txn_mgr_(txn_mgr), catalog_(catalog), txn_id_(txn_id), wal_entry_(MakeShared<WalEntry>()),
local_catalog_delta_ops_entry_(MakeShared<CatalogDeltaEntry>()) {}
Txn::Txn(TxnManager *txn_manager, BufferManager *buffer_manager, NewCatalog *catalog, BGTaskProcessor *bg_task_processor, TransactionID txn_id)
: txn_mgr_(txn_manager), buffer_mgr_(buffer_manager), bg_task_processor_(bg_task_processor), catalog_(catalog), txn_id_(txn_id),
wal_entry_(MakeShared<WalEntry>()), local_catalog_delta_ops_entry_(MakeUnique<CatalogDeltaEntry>()) {}

Txn::Txn(BufferManager *buffer_mgr, TxnManager *txn_mgr, NewCatalog *catalog, TransactionID txn_id)
: txn_mgr_(txn_mgr), buffer_mgr_(buffer_mgr), catalog_(catalog), txn_id_(txn_id), wal_entry_(MakeShared<WalEntry>()),
local_catalog_delta_ops_entry_(MakeShared<CatalogDeltaEntry>()) {}
local_catalog_delta_ops_entry_(MakeUnique<CatalogDeltaEntry>()) {}

UniquePtr<Txn> Txn::NewReplayTxn(BufferManager *buffer_mgr, TxnManager *txn_mgr, NewCatalog *catalog, TransactionID txn_id) {
auto txn = MakeUnique<Txn>(buffer_mgr, txn_mgr, catalog, txn_id);
Expand Down Expand Up @@ -473,6 +475,12 @@ void Txn::CommitBottom() {
// Snapshot the physical operations in one txn
local_catalog_delta_ops_entry_->SaveState(txn_id_, commit_ts);

if (!local_catalog_delta_ops_entry_->operations().empty()) {
// Don't need to write empty CatalogDeltaEntry (read-only transactions).
auto catalog_delta_ops_merge_task = MakeShared<CatalogDeltaOpsMergeTask>(std::move(local_catalog_delta_ops_entry_), catalog_);
bg_task_processor_->Submit(catalog_delta_ops_merge_task);
}

LOG_INFO(fmt::format("Txn: {} is committed.", txn_id_));

// Notify the top half
Expand Down
13 changes: 9 additions & 4 deletions src/storage/txn/txn.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ struct ScanParam {

class TxnManager;
struct NewCatalog;
class BGTaskProcessor;
struct TableEntry;
struct DBEntry;
struct BaseEntry;
Expand All @@ -57,7 +58,11 @@ class CatalogDeltaOperation;

export class Txn {
public:
explicit Txn(TxnManager *txn_mgr, NewCatalog *catalog, TransactionID txn_id);
explicit Txn(TxnManager *txn_manager,
BufferManager *buffer_manager,
NewCatalog *catalog,
BGTaskProcessor *bg_task_processor,
TransactionID txn_id);

explicit Txn(BufferManager *buffer_mgr, TxnManager *txn_mgr, NewCatalog *catalog, TransactionID txn_id);

Expand Down Expand Up @@ -156,10 +161,10 @@ public:
void Checkpoint(const TxnTimeStamp max_commit_ts, bool is_full_checkpoint);

private:
// Txn Manager
TxnManager *txn_mgr_{};
// This BufferManager ptr Only for replaying wal
BufferManager *buffer_mgr_{};
BGTaskProcessor *bg_task_processor_{};
NewCatalog *catalog_{};
TransactionID txn_id_{};

Expand All @@ -184,8 +189,8 @@ private:
/// LOG
// WalEntry
SharedPtr<WalEntry> wal_entry_{};
// Physical log entry
SharedPtr<CatalogDeltaEntry> local_catalog_delta_ops_entry_{};

UniquePtr<CatalogDeltaEntry> local_catalog_delta_ops_entry_{};

// WalManager notify the commit bottom half is done
std::mutex lock_{};
Expand Down
Loading

0 comments on commit 6c9b45e

Please sign in to comment.