Skip to content

Commit

Permalink
Fix.
Browse files Browse the repository at this point in the history
  • Loading branch information
small-turtle-1 committed Jan 7, 2025
1 parent 08de24e commit d3d2582
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 7 deletions.
5 changes: 3 additions & 2 deletions src/storage/txn/txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -687,11 +687,12 @@ void Txn::Rollback() {
LOG_TRACE(fmt::format("Txn: {} is dropped.", txn_context_ptr_->txn_id_));
}

void Txn::PutDeltaOps() {
SharedPtr<AddDeltaEntryTask> Txn::MakeAddDeltaEntryTask() {
if (!txn_delta_ops_entry_->operations().empty()) {
LOG_TRACE(txn_delta_ops_entry_->ToStringSimple());
InfinityContext::instance().storage()->bg_processor()->Submit(MakeShared<AddDeltaEntryTask>(std::move(txn_delta_ops_entry_)));
return MakeShared<AddDeltaEntryTask>(std::move(txn_delta_ops_entry_));
}
return nullptr;
}

void Txn::AddWalCmd(const SharedPtr<WalCmd> &cmd) {
Expand Down
3 changes: 2 additions & 1 deletion src/storage/txn/txn.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class CatalogDeltaOperation;
class BaseTableRef;
enum class CompactStatementType;
struct SegmentIndexEntry;
struct AddDeltaEntryTask;

export class Txn : public EnableSharedFromThis<Txn> {
public:
Expand Down Expand Up @@ -101,7 +102,7 @@ public:

void Rollback();

void PutDeltaOps();
SharedPtr<AddDeltaEntryTask> MakeAddDeltaEntryTask();

// Database OPs
Status CreateDatabase(const SharedPtr<String> &db_name, ConflictType conflict_type, const SharedPtr<String> &comment);
Expand Down
9 changes: 5 additions & 4 deletions src/storage/txn/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import wal_manager;
import defer_op;
import infinity_context;
import global_resource_usage;
import bg_task;

namespace infinity {

Expand Down Expand Up @@ -363,6 +364,7 @@ void TxnManager::CleanupTxn(Txn *txn, bool commit) {
UnrecoverableError(error_message);
}
}
SharedPtr<AddDeltaEntryTask> add_delta_entry_task = txn->MakeAddDeltaEntryTask();
{
// cleanup the txn from committing_txn and txm_map
auto commit_ts = txn->CommitTS();
Expand All @@ -381,13 +383,12 @@ void TxnManager::CleanupTxn(Txn *txn, bool commit) {
String error_message = fmt::format("Txn: {} not found in txn map", txn_id);
UnrecoverableError(error_message);
}

if (committing_txns_.empty() || committing_txns_.begin()->first > commit_ts) {
max_committed_ts_ = commit_ts;
}
if (commit) {
txn->PutDeltaOps();
}
}
if (commit && add_delta_entry_task) {
InfinityContext::instance().storage()->bg_processor()->Submit(std::move(add_delta_entry_task));
}
break;
}
Expand Down

0 comments on commit d3d2582

Please sign in to comment.